import math
import os
import re
from typing import List, Optional, Tuple

from PyQt6.QtCore import pyqtSignal

from highlight_regex import HighlightRegex
from highlighting import Highlighting
from line import Line
from settings import Settings

# Compiled once; matches a single Unicode "word" character (see ``re`` docs for ``\w``).
_WORD_CHAR = re.compile(r"\w")

# Chunk size (in bytes) used when copying a byte range to another file.
_COPY_CHUNK_SIZE = 1024 * 1024


class LogFileModel:
    """File-backed model that reads, appends to and truncates a single log file.

    All offsets handled by this class are *byte* offsets into the file; text is
    decoded as UTF-8 with undecodable bytes dropped.
    """

    _query_highlight: Optional[HighlightRegex] = None

    # Fires when the file size changed.
    # **Note:** uses strings, because int in Qt signal are limited to 32bit.
    # NOTE(review): this class has no QObject base and the only ``emit`` call
    # (in ``byte_count``) is commented out - confirm whether the signal is
    # still meant to be used.
    file_size_changed = pyqtSignal(str)

    # Last file size observed by ``byte_count``; -1 until the first call.
    _file_size = -1

    def __init__(self, file: str, settings: Settings):
        """Create a model for ``file`` (resolved to its real path)."""
        self.settings = settings
        self._file = os.path.realpath(file)
        # Locking is currently disabled: self._lock = threading.RLock()

    def highlighters(self):
        """Return the highlighters read from the current settings."""
        return Highlighting.read_config(self.settings)

    def get_file(self) -> str:
        """Return the resolved path of the underlying file."""
        return self._file

    def __str__(self) -> str:
        return self._file

    def get_query_highlight(self) -> Optional[HighlightRegex]:
        """Return the search-term highlighter, or ``None`` if disabled or unset.

        The session option ``general/highlight_search_term`` switches the
        feature on and off.
        """
        if not self.settings.session.getboolean("general", "highlight_search_term"):
            return None
        return self._query_highlight

    def clear_query_highlight(self) -> None:
        """Forget the current search-term highlighter."""
        self._query_highlight = None

    def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool) -> None:
        """Install a highlighter for ``query`` with a fixed ``ffff00`` hit background."""
        self._query_highlight = HighlightRegex(
            query=query,
            ignore_case=ignore_case,
            is_regex=is_regex,
            hit_background_color="ffff00")

    def get_tab_name(self) -> str:
        """Return the file's base name, shortened in the middle if longer than 35 chars."""
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int) -> str:
        """Return the bytes in ``[start_byte, end_byte)`` decoded as UTF-8.

        Bytes that cannot be decoded (e.g. a multi-byte character cut at the
        range boundary) are silently dropped.
        """
        with open(self._file, 'rb') as f:
            f.seek(start_byte)
            raw = f.read(end_byte - start_byte)
        return raw.decode("utf8", errors="ignore")

    def write_range(self, start_byte: int, end_byte: int, file: str) -> None:
        """Copy the bytes in ``[start_byte, end_byte)`` into ``file``.

        ``file`` is opened with mode ``w+b``, so existing content is replaced.
        The copy is done in chunks to bound memory usage.
        """
        with open(self._file, 'rb') as source, open(file, "w+b") as target:
            offset = start_byte
            source.seek(offset)
            while offset < end_byte:
                new_offset = min(offset + _COPY_CHUNK_SIZE, end_byte)
                buffer = source.read(new_offset - offset)
                if not buffer:
                    # Source ended before end_byte; nothing more to copy.
                    break
                target.write(buffer)
                offset = new_offset

    def read_word_at(self, byte_offset: int) -> Tuple[str, int, int]:
        """Return the word at ``byte_offset`` and its byte range.

        Returns:
            ``(word, start_byte, end_byte)``. If the character at the offset is
            not a word character, that single character is returned with the
            range ``(byte_offset, byte_offset + 1)``. If no line contains the
            offset, or the offset does not map to a character of the line's
            text, ``("", -1, -1)`` is returned.
        """
        lines = self.data(byte_offset, 0, 1)
        if not lines:
            return "", -1, -1

        line: Line = lines[0]
        if not line.includes_byte(byte_offset):
            return "", -1, -1

        line_start = line.byte_offset()
        text = line.line()
        char_index = line.byte_index_to_char_index(byte_offset - line_start)

        # The line text has its terminator stripped, so an offset that falls on
        # the terminator can map past the end of the text. Treat that as "no word"
        # instead of raising IndexError.
        if not 0 <= char_index < len(text):
            return "", -1, -1

        current_char = text[char_index]
        if not self._is_word_char(current_char):
            return current_char, byte_offset, byte_offset + 1

        # Grow the selection to the left and right while word characters continue.
        start_in_line = char_index
        while start_in_line - 1 >= 0 and self._is_word_char(text[start_in_line - 1]):
            start_in_line -= 1

        end_in_line = char_index
        while end_in_line < len(text) and self._is_word_char(text[end_in_line]):
            end_in_line += 1

        start_byte = line.char_index_to_byte(start_in_line) + line_start
        end_byte = line.char_index_to_byte(end_in_line) + line_start
        return text[start_in_line:end_in_line], start_byte, end_byte

    def _is_word_char(self, char: str) -> bool:
        """Return ``True`` if ``char`` starts with a regex word character (``\\w``)."""
        return _WORD_CHAR.match(char) is not None

    def data(self, byte_offset: int, scroll_lines: int, lines: int) -> List[Line]:
        """Read lines around ``byte_offset``.

        Reading starts ``settings.max_line_length()`` bytes before the offset
        (clamped to the start of the file), so the first line read may be a
        partial line. Lines ending at or before ``byte_offset`` count as
        "before"; reading stops at EOF or once ``lines + abs(scroll_lines)``
        lines after the offset were collected.

        Args:
            byte_offset: Byte position the result is anchored at.
            scroll_lines: Number of lines to shift the start of the result
                relative to the first line after ``byte_offset`` (negative
                values move towards the start of the file).
            lines: Number of lines wanted; rounded up with ``math.ceil``.

        Returns:
            The selected ``Line`` objects, with ``\\r`` and ``\\n`` removed from
            their text.
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)

        # TODO handle lines longer than 4096 bytes
        # TODO abort file open after a few seconds: https://docs.python.org/3/library/signal.html#example
        with open(self._file, 'rb') as f:
            offset = min(byte_offset, self.byte_count())
            offset = max(0, offset - self.settings.max_line_length())

            f.seek(offset)
            while raw_line := f.readline():
                new_offset = f.tell()
                text = raw_line.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")
                line = Line(offset, new_offset, text)
                if line.byte_end() <= byte_offset:  # line.byte_end() returns the end byte +1
                    lines_before_offset.append(line)
                else:
                    lines_after_offset.append(line)
                offset = new_offset
                if len(lines_after_offset) >= lines_to_find:
                    break

        all_lines = lines_before_offset + lines_after_offset
        start = max(0, len(lines_before_offset) + scroll_lines)
        if start + lines_to_return - 1 < len(all_lines):
            return all_lines[start:start + lines_to_return]
        # Not enough lines from ``start`` on: fall back to the tail of what was read.
        # NOTE(review): this slice yields ``lines_to_return - 1`` lines, and *all*
        # collected lines when ``lines_to_return`` is 1 - confirm this is intended.
        return all_lines[-lines_to_return + 1:]

    def byte_count(self) -> int:
        """Return the current file size in bytes and remember it in ``_file_size``."""
        size = os.stat(self._file).st_size
        if self._file_size != size:
            # self.file_size_changed.emit(str(size))
            self._file_size = size
        return size

    def write_line(self, line: str) -> None:
        """Append ``line`` to the file as UTF-8, adding a trailing newline if missing."""
        with open(self._file, 'a+b') as f:
            f.write(line.encode("utf8"))
            if not line.endswith("\n"):
                f.write("\n".encode("utf8"))

    def truncate(self) -> None:
        """Discard the file's entire content (truncate to zero bytes)."""
        with open(self._file, 'a') as f:
            f.truncate(0)