import math
import os
import re
import threading
import time
from typing import List, Optional

from PyQt6.QtGui import QBrush, QColor, QPen

from highlight_regex import HighlightRegex
from line import Line
from settings import Settings


class LogFileModel:
    """Model of a single log file on disk.

    Provides windowed, line-oriented reads of the file, simple append and
    truncate operations, and the highlight rules associated with the file.
    """

    # Highlight for the current search query; None when no query is active.
    _query_highlight: Optional[HighlightRegex] = None

    def __init__(self, file: str, settings: Settings):
        """Create a model for *file*.

        Args:
            file: Path of the log file; stored after ``os.path.realpath``.
            settings: Settings object; ``max_line_length()`` is read by
                :meth:`data`.
        """
        self.settings = settings
        self._file = os.path.realpath(file)
        # Held by read_range() and data() while they read the file.
        self._lock = threading.RLock()
        self.highlights = [
            HighlightRegex(
                re.compile("ERROR"),
                brush=QBrush(QColor(220, 112, 122)),
                pen=QPen(QColor(0, 0, 0)),
                brush_full_line=QBrush(QColor(255, 112, 122))
            )
        ]

    def get_file(self) -> str:
        """Return the resolved path of the log file."""
        return self._file

    def __str__(self) -> str:
        return self._file

    def get_query_highlight(self) -> Optional[HighlightRegex]:
        """Return the highlight for the current query, or None."""
        return self._query_highlight

    def set_query_highlight(self, regex: Optional[re.Pattern] = None):
        """Set the query highlight from *regex*, or clear it when falsy."""
        if regex:
            self._query_highlight = HighlightRegex(
                regex,
                brush=QBrush(QColor(255, 255, 0))
            )
        else:
            self._query_highlight = None

    def get_tab_name(self) -> str:
        """Return the file's base name, shortened for display.

        Names longer than 35 characters are reduced to their first 15 and
        last 15 characters joined by ``"..."``.
        """
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int) -> str:
        """Return the file content in ``[start_byte, end_byte)`` as text.

        The bytes are decoded as UTF-8; undecodable bytes are dropped.
        """
        with self._lock:
            with open(self._file, 'rb') as f:
                f.seek(start_byte)
                raw = f.read(end_byte - start_byte)
                return raw.decode("utf8", errors="ignore")

    def data(self, byte_offset: int, scroll_lines: int, lines: float) -> List[Line]:
        """Return a window of lines around *byte_offset*.

        Reading starts ``settings.max_line_length()`` bytes before
        *byte_offset* (clamped to the start of the file) so that lines in
        front of the offset are available for scrolling backwards.

        Args:
            byte_offset: Byte position the window is anchored to; clamped
                to the current file size.
            scroll_lines: Number of lines to shift the window relative to
                the first line at or after *byte_offset*; negative values
                shift it towards the start of the file.
            lines: Number of lines wanted; rounded up with ``math.ceil``,
                so a fractional value is accepted.

        Returns:
            At most ``ceil(lines)`` ``Line`` objects. When the window would
            run past the lines that were read, the last ``ceil(lines) - 1``
            lines are returned instead.
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)

        with self._lock:
            # TODO handle lines longer than 4096 bytes
            # TODO abort file open after a few seconds:
            #  https://docs.python.org/3/library/signal.html#example
            with open(self._file, 'rb') as f:
                offset = min(byte_offset, self.byte_count())
                # NOTE(review): stepping back by a fixed byte count can land
                # in the middle of a line, so the first line read here may be
                # a partial one - confirm whether callers rely on that.
                offset = max(0, offset - self.settings.max_line_length())

                f.seek(offset)
                while raw_line := f.readline():
                    new_offset = f.tell()
                    text = raw_line.decode("utf8", errors="ignore") \
                        .replace("\r", "").replace("\n", "")
                    line = Line(offset, new_offset, text)
                    if offset < byte_offset:
                        lines_before_offset.append(line)
                    else:
                        lines_after_offset.append(line)
                    offset = new_offset
                    if len(lines_after_offset) >= lines_to_find:
                        break

        all_lines = lines_before_offset + lines_after_offset
        start = max(0, len(lines_before_offset) + scroll_lines)
        if start + lines_to_return - 1 < len(all_lines):
            return all_lines[start:start + lines_to_return]

        # Not enough lines after `start`: fall back to the tail of what was
        # read. The tail length is computed explicitly because a slice like
        # `all_lines[-0:]` would return the whole list rather than nothing.
        tail_length = max(0, lines_to_return - 1)
        return all_lines[max(0, len(all_lines) - tail_length):]

    def byte_count(self) -> int:
        """Return the current size of the file in bytes."""
        return os.stat(self._file).st_size

    def write_line(self, line: str):
        """Append *line* to the file, adding a trailing newline if missing."""
        if not line.endswith("\n"):
            line += "\n"
        with open(self._file, 'a+b') as f:
            f.write(line.encode("utf8"))

    def truncate(self):
        """Discard the entire content of the file."""
        with open(self._file, 'a') as f:
            f.truncate(0)