import math
import os
import re
import threading
import time
from typing import List, Optional

from PyQt6.QtGui import QBrush, QColor, QPen

from highlight_regex import HighlightRegex
from highlighting import Highlighting
from line import Line
from settings import Settings


class LogFileModel:
    """Model of a single log file on disk.

    Provides byte-range reads and copies, line-window reads around a byte
    offset, appending and truncation, plus the highlight for the current
    search query. Read operations are serialized through a re-entrant lock.
    """

    # Highlight for the current search query; None while no query is set.
    _query_highlight: Optional[HighlightRegex] = None

    # Number of bytes copied per iteration in write_range.
    _COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, file: str, settings: Settings):
        """Create a model for *file*.

        Args:
            file: Path of the log file; stored with symlinks resolved.
            settings: Settings object used for highlighters and for the
                maximum line length.
        """
        self.settings = settings
        self._file = os.path.realpath(file)
        self._lock = threading.RLock()

    def highlighters(self):
        """Return the highlighters read from the current settings."""
        return Highlighting.read_config(self.settings)

    def get_file(self):
        """Return the resolved path of the log file."""
        return self._file

    def __str__(self):
        return self._file

    def get_query_highlight(self):
        """Return the current query highlight, or None if none is set."""
        return self._query_highlight

    def clear_query_highlight(self):
        """Remove the current query highlight."""
        self._query_highlight = None

    def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
        """Replace the query highlight with one built from *query*."""
        self._query_highlight = HighlightRegex(
            query=query,
            ignore_case=ignore_case,
            is_regex=is_regex,
            hit_background_color="ffff00")

    def get_tab_name(self):
        """Return the file's base name, shortened if longer than 35 chars.

        Long names keep their first and last 15 characters, joined by "...".
        """
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int):
        """Return the bytes in [start_byte, end_byte) decoded as UTF-8.

        Undecodable bytes are dropped. An empty or inverted range yields an
        empty string.
        """
        # A negative size must never reach read(): -1 means "read to EOF"
        # and any other negative value raises ValueError.
        length = max(0, end_byte - start_byte)
        with self._lock, open(self._file, 'rb') as f:
            f.seek(start_byte)
            raw = f.read(length)
        return raw.decode("utf8", errors="ignore")

    def write_range(self, start_byte: int, end_byte: int, file: str):
        """Copy the bytes in [start_byte, end_byte) into *file*.

        The target is created or overwritten. Data is copied in chunks so
        large ranges are not held in memory at once.
        """
        print("write range: %d - %d -> %s" % (start_byte, end_byte, file))
        with self._lock, open(self._file, 'rb') as source, open(file, "w+b") as target:
            offset = start_byte
            source.seek(offset)
            while offset < end_byte:
                new_offset = min(offset + self._COPY_CHUNK_SIZE, end_byte)
                buffer = source.read(new_offset - offset)
                if not buffer:
                    # The source ended before end_byte; nothing left to copy.
                    break
                target.write(buffer)
                offset = new_offset

    def data(self, byte_offset, scroll_lines, lines) -> List[Line]:
        """Return a window of lines around *byte_offset*.

        Reading starts settings.max_line_length() bytes before the offset
        (clamped to the file start), so lines preceding the offset are
        available when scrolling backwards.

        Args:
            byte_offset: Byte position the window is anchored at.
            scroll_lines: Number of lines to shift the window start relative
                to the first line at or after *byte_offset* (may be negative).
            lines: Number of lines requested; rounded up for the result size.

        Returns:
            The selected Line objects, with line-break characters removed
            from their text.
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)

        with self._lock:
            # TODO handle lines longer than 4096 bytes
            # TODO abort file open after a few seconds: https://docs.python.org/3/library/signal.html#example
            with open(self._file, 'rb') as f:
                offset = min(byte_offset, self.byte_count())
                offset = max(0, offset - self.settings.max_line_length())
                f.seek(offset)
                while raw := f.readline():
                    new_offset = f.tell()
                    text = raw.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")
                    line = Line(offset, new_offset, text)
                    # Lines that start before the requested offset are kept
                    # separately so scroll_lines can index relative to it.
                    if offset < byte_offset:
                        lines_before_offset.append(line)
                    else:
                        lines_after_offset.append(line)
                    offset = new_offset
                    if len(lines_after_offset) >= lines_to_find:
                        break

        all_lines = lines_before_offset + lines_after_offset
        start = max(0, len(lines_before_offset) + scroll_lines)
        if start + lines_to_return - 1 < len(all_lines):
            result = all_lines[start:start + lines_to_return]
        else:
            # Not enough lines after start: fall back to the tail.
            # NOTE(review): this yields lines_to_return - 1 lines, and every
            # collected line when lines_to_return is 1 -- confirm intent.
            result = all_lines[-lines_to_return + 1:]
        return result

    def byte_count(self) -> int:
        """Return the current size of the file in bytes."""
        return os.stat(self._file).st_size

    def write_line(self, line: str):
        """Append *line* to the file, adding a newline if it lacks one."""
        if not line.endswith("\n"):
            line += "\n"
        with open(self._file, 'a+b') as f:
            # One write call per line instead of a separate one for the newline.
            f.write(line.encode("utf8"))

    def truncate(self):
        """Discard the entire content of the file."""
        with open(self._file, 'a') as f:
            f.truncate(0)