Files
krowlog/logFileModel.py
2021-10-26 18:51:32 +02:00

77 lines
2.7 KiB
Python

import math
import threading
from typing import List
from line import Line
import os
from settings import Settings
class LogFileModel:
_lock = threading.RLock()
def __init__(self, file:str, settings: Settings):
self.settings = settings
self._file = os.path.realpath(file)
def get_file(self):
return self._file
def __str__(self):
return self._file
def get_tab_name(self):
file_name = os.path.basename(self._file)
if len(file_name) > 35:
file_name = file_name[:15] + "..." + file_name[-15:]
return file_name
def read_range(self, start_byte: int, end_byte: int):
with self._lock:
with open(self._file, 'rb') as f:
f.seek(start_byte)
bytes = f.read(end_byte - start_byte)
return bytes.decode("utf8", errors="ignore")
def data(self, byte_offset, scroll_lines, lines) -> List[Line]:
#print("data(%s, %s, %s)" % (byte_offset, scroll_lines, lines))
lines_before_offset: List[Line] = []
lines_after_offset: List[Line] = []
result: List[Line] = []
lines_to_find = lines + abs(scroll_lines)
lines_to_return = math.ceil(lines)
with self._lock:
# TODO handle lines longer than 4096 bytes
with open(self._file, 'rb') as f:
offset = min(byte_offset, self.byte_count())
offset = max(0, offset - self.settings.max_line_length())
#print("offset: %s" % (offset))
eof_reached = True
f.seek(offset)
while l := f.readline():
new_offset = f.tell()
line = Line(offset, new_offset, l.decode("utf8", errors="ignore").replace("\r", "").replace("\n", ""))
# print("%s %s" %(line.byte_offset(), line.line()))
if offset < byte_offset:
lines_before_offset.append(line)
else:
lines_after_offset.append(line)
offset = f.tell()
if len(lines_after_offset) >= lines_to_find:
eof_reached = False
break
all_lines = lines_before_offset + lines_after_offset
start = max(0, len(lines_before_offset) + scroll_lines)
if start + lines_to_return-1 < len(all_lines):
result = all_lines[start:start+lines_to_return]
else:
result = all_lines[-lines_to_return+1:]
#print("returning %s lines" % (len(result)))
return result
def byte_count(self) -> int:
return os.stat(self._file).st_size