import math
import os
import re
import threading
import time
from typing import List, Optional

from PyQt6.QtCore import pyqtSignal

from highlight_regex import HighlightRegex
from highlighting import Highlighting
from line import Line
from settings import Settings


class LogFileModel:
    """Byte-range and line oriented access to a single log file.

    The file is re-opened for every operation, so each call sees the file's
    current content. Text is decoded as UTF-8; undecodable bytes are dropped.
    """

    # The highlight for the current search query, or None if no query is set.
    _query_highlight: Optional[HighlightRegex] = None

    # NOTE(review): pyqtSignal normally has to be declared on a QObject
    # subclass to be usable; this class is a plain object and the only emit
    # call (in byte_count) is disabled -- confirm before re-enabling it.
    file_size_changed = pyqtSignal(str)
    """Fires when the file size changed. **Note:** uses strings,
    because int in Qt signal are limited to 32bit."""

    # Last size seen by byte_count(); -1 until the first call.
    _file_size = -1

    def __init__(self, file: str, settings: Settings):
        """Create a model for ``file``; the path is resolved with realpath."""
        self.settings = settings
        self._file = os.path.realpath(file)
        # NOTE: file access is not serialized; no lock is created here.

    def highlighters(self):
        """Return the highlighters read from the current settings."""
        return Highlighting.read_config(self.settings)

    def get_file(self):
        """Return the resolved path of the underlying file."""
        return self._file

    def __str__(self):
        return self._file

    def get_query_highlight(self):
        """Return the current query highlight, or None if none is set."""
        return self._query_highlight

    def clear_query_highlight(self):
        """Remove the current query highlight."""
        self._query_highlight = None

    def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
        """Replace the query highlight with one built from ``query``."""
        self._query_highlight = HighlightRegex(
            query=query,
            ignore_case=ignore_case,
            is_regex=is_regex,
            hit_background_color="ffff00")

    def get_tab_name(self):
        """Return the file's base name, shortened if longer than 35 chars.

        Long names keep their first and last 15 characters, joined by "...".
        """
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int):
        """Return the bytes in ``[start_byte, end_byte)`` decoded as UTF-8.

        An empty or inverted range yields an empty string, consistent with
        write_range(), which copies nothing in that case.
        """
        # Clamp so a negative length is never passed to read(), where it
        # would not mean "read nothing".
        length = max(0, end_byte - start_byte)
        with open(self._file, 'rb') as f:
            f.seek(start_byte)
            data = f.read(length)
        return data.decode("utf8", errors="ignore")

    def write_range(self, start_byte: int, end_byte: int, file: str):
        """Copy the bytes in ``[start_byte, end_byte)`` into ``file``.

        The target is created or overwritten. Data is copied in chunks of at
        most 1 MiB so large ranges do not have to fit into memory.
        """
        chunk_size = 1024 * 1024
        with open(self._file, 'rb') as source, open(file, "w+b") as target:
            offset = start_byte
            source.seek(offset)
            while offset < end_byte:
                new_offset = min(offset + chunk_size, end_byte)
                buffer = source.read(new_offset - offset)
                if not buffer:
                    # Source ended before end_byte; nothing more to copy.
                    break
                target.write(buffer)
                offset = new_offset

    def data(self, byte_offset, scroll_lines, lines) -> List[Line]:
        """Return the lines to display around ``byte_offset``.

        Reading starts ``settings.max_line_length()`` bytes before the
        (size-clamped) offset so that lines preceding the offset are
        available for scrolling backwards. Lines are collected until
        ``lines + abs(scroll_lines)`` lines starting at or after
        ``byte_offset`` were found, or the end of the file is reached.

        :param byte_offset: byte position the view is anchored at.
        :param scroll_lines: number of lines to move relative to the anchor;
            negative values move towards the start of the file.
        :param lines: number of lines requested; rounded up with math.ceil
            to determine how many lines are returned.
        :return: at most ``ceil(lines)`` Line objects in the regular case,
            see the note at the end of the method for the tail case.
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)

        # TODO handle lines longer than 4096 bytes
        # TODO abort file open after a few seconds:
        #  https://docs.python.org/3/library/signal.html#example
        with open(self._file, 'rb') as f:
            offset = min(byte_offset, self.byte_count())
            offset = max(0, offset - self.settings.max_line_length())
            f.seek(offset)
            while raw := f.readline():
                new_offset = f.tell()
                text = raw.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")
                line = Line(offset, new_offset, text)
                # Lines are classified by where they start, compared against
                # the caller's (unclamped) byte_offset.
                if offset < byte_offset:
                    lines_before_offset.append(line)
                else:
                    lines_after_offset.append(line)
                offset = new_offset
                if len(lines_after_offset) >= lines_to_find:
                    break

        all_lines = lines_before_offset + lines_after_offset
        first = max(0, len(lines_before_offset) + scroll_lines)
        if first + lines_to_return - 1 < len(all_lines):
            return all_lines[first:first + lines_to_return]
        # Not enough lines after `first`: fall back to the tail of what was
        # collected.
        # NOTE(review): for lines_to_return == 1 this slice is all_lines[0:],
        # i.e. every collected line -- confirm whether that is intended.
        return all_lines[-lines_to_return + 1:]

    def byte_count(self) -> int:
        """Return the current file size in bytes and remember it."""
        size = os.stat(self._file).st_size
        if self._file_size != size:
            # Emitting file_size_changed is currently disabled.
            self._file_size = size
        return size

    def write_line(self, line: str):
        """Append ``line`` to the file, adding a trailing newline if missing."""
        payload = line if line.endswith("\n") else line + "\n"
        with open(self._file, 'a+b') as f:
            f.write(payload.encode("utf8"))

    def truncate(self):
        """Discard the entire content of the file."""
        with open(self._file, 'a') as f:
            f.truncate(0)