179 lines
6.9 KiB
Python
179 lines
6.9 KiB
Python
import math
|
|
import re
|
|
from typing import List, Optional
|
|
from PySide6.QtCore import Signal
|
|
from src.ui.bigtext.highlight_regex import HighlightRegex
|
|
from src.ui.bigtext.highlighting import Highlighting
|
|
from src.ui.bigtext.line import Line
|
|
import os
|
|
from src.settings.settings import Settings
|
|
|
|
|
|
class LogFileModel:
|
|
_query_highlight: Optional[HighlightRegex] = None
|
|
|
|
file_size_changed = Signal()
|
|
"""Fires when the file size changed. **Note:** uses strings,
|
|
because int in Qt signal are limited to 32bit."""
|
|
|
|
_file_size = -1
|
|
|
|
range_start = 0
|
|
range_end = -1
|
|
|
|
def __init__(self, file: str, settings: Settings):
|
|
self.settings = settings
|
|
self._file = os.path.realpath(file)
|
|
|
|
def highlighters(self):
|
|
all_highlighters = Highlighting.read_config(self.settings)
|
|
active_highlighters = []
|
|
for h in all_highlighters:
|
|
if h.is_active():
|
|
active_highlighters.append(h)
|
|
return active_highlighters
|
|
|
|
def get_file(self):
|
|
return self._file
|
|
|
|
def __str__(self):
|
|
return self._file
|
|
|
|
def setRange(self, range_start: int, range_end: int):
|
|
self.range_start = range_start
|
|
self.range_end = range_end
|
|
|
|
def get_query_highlight(self) -> Optional[HighlightRegex]:
|
|
if not self.settings.session.getboolean("general", "highlight_search_term"):
|
|
return None
|
|
return self._query_highlight
|
|
|
|
def clear_query_highlight(self):
|
|
self._query_highlight = None
|
|
|
|
def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
|
|
self._query_highlight = HighlightRegex(
|
|
query=query,
|
|
ignore_case=ignore_case,
|
|
is_regex=is_regex,
|
|
hit_background_color="ffff00")
|
|
|
|
def get_tab_name(self) -> str:
|
|
file_name = os.path.basename(self._file)
|
|
if len(file_name) > 35:
|
|
file_name = file_name[:15] + "..." + file_name[-15:]
|
|
return file_name
|
|
|
|
def read_range(self, start_byte: int, end_byte: int) -> str:
|
|
# with self._lock:
|
|
if True:
|
|
with open(self._file, 'rb') as f:
|
|
f.seek(start_byte)
|
|
bytes = f.read(end_byte - start_byte)
|
|
return bytes.decode("utf8", errors="ignore")
|
|
|
|
def write_range(self, start_byte: int, end_byte: int, file: str):
|
|
# print("write range: %d - %d -> %s" % (start_byte, end_byte, file))
|
|
with open(self._file, 'rb') as source, open(file, "w+b") as target:
|
|
offset = start_byte
|
|
source.seek(offset)
|
|
while offset < end_byte:
|
|
new_offset = min(offset + 1024 * 1024, end_byte)
|
|
buffer_size = new_offset - offset
|
|
buffer = source.read(buffer_size)
|
|
target.write(buffer)
|
|
offset = new_offset
|
|
|
|
def get_line_start_at(self, byte_offset: int) -> int:
|
|
lines = self.data(byte_offset, 0, 1, 0, -1);
|
|
if len(lines) == 0:
|
|
return 0
|
|
return lines[0].byte_offset()
|
|
|
|
def get_line_end_at(self, byte_offset: int) -> int:
|
|
lines = self.data(byte_offset, 0, 1, 0, -1);
|
|
if len(lines) == 0:
|
|
return 0
|
|
return lines[0].byte_end()
|
|
|
|
def read_word_at(self, byte_offset: int) -> (str, int, int):
|
|
lines = self.data(byte_offset, 0, 1, 0, -1)
|
|
if len(lines) == 0:
|
|
return "", -1, -1
|
|
line: Line = lines[0]
|
|
if not line.includes_byte(byte_offset):
|
|
return "", -1, -1
|
|
|
|
offset_in_line = byte_offset - line.byte_offset()
|
|
char_index = line.byte_index_to_char_index(offset_in_line)
|
|
current_char = line.line()[char_index]
|
|
# print("read_word: char_index=%s, current_char=%s, line=%s" %(char_index, current_char, line.line()))
|
|
if not self._is_word_char(current_char):
|
|
return current_char, byte_offset, byte_offset + 1
|
|
start_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
|
|
while start_in_line - 1 >= 0 and self._is_word_char(line.line()[start_in_line - 1]):
|
|
start_in_line = start_in_line - 1
|
|
end_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
|
|
while end_in_line < len(line.line()) and self._is_word_char(line.line()[end_in_line]):
|
|
end_in_line = end_in_line + 1
|
|
start_byte = line.char_index_to_byte(start_in_line) + line.byte_offset()
|
|
end_byte = line.char_index_to_byte(end_in_line) + line.byte_offset()
|
|
return line.line()[start_in_line:end_in_line], start_byte, end_byte
|
|
|
|
def _is_word_char(self, char: str) -> bool:
|
|
return re.match(r"\w", char) is not None
|
|
|
|
def data(self, byte_offset: int, scroll_lines: int, lines: int, range_start: int, range_end: int) -> List[Line]:
|
|
# print("data(%s, %s, %s)" % (byte_offset, scroll_lines, lines))
|
|
lines_before_offset: List[Line] = []
|
|
lines_after_offset: List[Line] = []
|
|
lines_to_find = lines + abs(scroll_lines)
|
|
lines_to_return = math.ceil(lines)
|
|
|
|
# TODO handle lines longer than 4096 bytes
|
|
# TODO abort file open after a few seconds: https://docs.python.org/3/library/signal.html#example
|
|
with open(self._file, 'rb') as f:
|
|
offset = min(byte_offset, self.byte_count())
|
|
# print("offset: %s byte_count: %d" % (offset, self.byte_count()))
|
|
offset = max(0,
|
|
max(range_start - self.settings.max_line_length(), offset - self.settings.max_line_length()))
|
|
|
|
f.seek(offset)
|
|
while l := f.readline():
|
|
new_offset = f.tell()
|
|
if 0 <= range_end < new_offset:
|
|
break
|
|
line = Line(offset, new_offset, l.decode("utf8", errors="ignore"))
|
|
|
|
if line.byte_end() <= byte_offset: # line.byte_end() returns the end byte +1
|
|
if line.byte_offset() >= range_start: # only add if in range
|
|
lines_before_offset.append(line)
|
|
else:
|
|
lines_after_offset.append(line)
|
|
offset = f.tell()
|
|
if len(lines_after_offset) >= lines_to_find:
|
|
break
|
|
|
|
all_lines = lines_before_offset + lines_after_offset
|
|
start = max(0, len(lines_before_offset) + scroll_lines)
|
|
if start + lines_to_return - 1 < len(all_lines):
|
|
result = all_lines[start:start + lines_to_return]
|
|
else:
|
|
result = all_lines[-lines_to_return + 1:]
|
|
|
|
# print("returning %s lines" % (len(result)))
|
|
# if len(result) > 0:
|
|
# print("returning %s %d -> %d" % (result[0].line(), result[0].byte_offset(), result[0].byte_end()))
|
|
return result
|
|
|
|
def byte_count(self) -> int:
|
|
size = os.stat(self._file).st_size
|
|
if self._file_size != size:
|
|
# self.file_size_changed.emit(str(size))
|
|
self._file_size = size
|
|
return size
|
|
|
|
def truncate(self):
|
|
with open(self._file, 'a') as f:
|
|
f.truncate(0)
|