import math import threading from typing import List from line import Line import os from settings import Settings class LogFileModel: _lock = threading.RLock() def __init__(self, file): self._file = file def data(self, byte_offset, scroll_lines, lines) -> List[Line]: #print("data(%s, %s, %s)" % (byte_offset, scroll_lines, lines)) lines_before_offset: List[Line] = [] lines_after_offset: List[Line] = [] result: List[Line] = [] lines_to_find = lines + abs(scroll_lines) lines_to_return = math.ceil(lines) with self._lock: # TODO handle lines longer than 4096 bytes with open(self._file, 'rb') as f: offset = min(byte_offset, self.byte_count()) offset = max(0, offset - Settings.max_line_length()) #print("offset: %s" % (offset)) eof_reached = True f.seek(offset) while l := f.readline(): new_offset = f.tell() line = Line(offset, new_offset, l.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")) # print("%s %s" %(line.byte_offset(), line.line())) if offset < byte_offset: lines_before_offset.append(line) else: lines_after_offset.append(line) offset = f.tell() if len(lines_after_offset) >= lines_to_find: eof_reached = False break all_lines = lines_before_offset + lines_after_offset start = max(0, len(lines_before_offset) + scroll_lines) if start + lines_to_return-1 < len(all_lines): result = all_lines[start:start+lines_to_return] else: result = all_lines[-lines_to_return+1:] #print("returning %s lines" % (len(result))) return result def byte_count(self) -> int: return os.stat(self._file).st_size