import os
import threading
from typing import List

from line import Line
from settings import Settings


class LogFileModel:
    """Windowed, line-oriented read access to a log file on disk.

    The file is re-opened on every `data` call, so each call sees the
    file's current content and size.
    """

    # Class attribute: one re-entrant lock shared by every LogFileModel
    # instance, so reads are serialized across all models.
    _lock = threading.RLock()

    def __init__(self, file):
        """Store the file location.

        Args:
            file: Passed unchanged to `open()` and `os.stat()`.
        """
        self._file = file

    def data(self, byte_offset: int, scroll_lines: int, lines: int) -> List[Line]:
        """Return the lines around `byte_offset`.

        Reading starts up to `Settings.max_line_length()` bytes before
        `byte_offset` so that lines preceding the offset are available.
        Lines whose start offset is below `byte_offset` count as "before",
        all others as "after". Reading stops once
        `lines + abs(scroll_lines)` "after" lines were collected or the
        end of the file is reached.

        Args:
            byte_offset: Position in the file, in bytes. Values beyond the
                end of the file are clamped to the file size when choosing
                where to start reading.
            scroll_lines: Number of lines to scroll. If positive, the last
                `scroll_lines` "before" lines are put in front of the
                collected "after" lines. Otherwise the first
                `-scroll_lines` "after" lines are skipped (unless the end
                of the file was reached, see below).
            lines: Number of lines requested.

        Returns:
            A list of `Line` objects, each built from the line's start
            offset, its end offset and its text decoded as UTF-8 (invalid
            bytes dropped, all CR/LF characters removed). When the end of
            the file is reached and `scroll_lines` is not positive, the
            result is topped up with "before" lines so that, if enough
            lines are available, the requested count is still returned.
        """
        before: List[Line] = []
        after: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        eof_reached = True

        with self._lock:
            # TODO handle lines longer than Settings.max_line_length() bytes
            with open(self._file, 'rb') as f:
                offset = min(byte_offset, self.byte_count())
                offset = max(0, offset - Settings.max_line_length())

                if offset > 0:
                    # The look-behind position usually falls into the middle
                    # of a line. Step back one byte and drop one readline():
                    # if that byte is a newline we were already on a line
                    # start and lose nothing, otherwise the partial line is
                    # skipped. Either way every Line built below starts at a
                    # real line boundary.
                    f.seek(offset - 1)
                    f.readline()
                    offset = f.tell()

                while raw := f.readline():
                    end_offset = f.tell()
                    text = raw.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")
                    line = Line(offset, end_offset, text)
                    if offset < byte_offset:
                        before.append(line)
                    else:
                        after.append(line)
                    offset = end_offset
                    if len(after) >= lines_to_find:
                        eof_reached = False
                        break

        if scroll_lines > 0:
            return before[-scroll_lines:] + after

        if eof_reached:
            # we always return the number of requested lines even if we
            # reach the end of the file
            missing = lines_to_find - len(after)
            if missing > 0:
                # Guarded because before[-0:] would return *all* lines.
                return before[-missing:] + after
            return after

        # scroll_lines <= 0 here, so this drops the first -scroll_lines lines.
        return after[-scroll_lines:]

    def byte_count(self) -> int:
        """Return the current size of the file in bytes (via `os.stat`)."""
        return os.stat(self._file).st_size