Files
krowlog/logFileModel.py

164 lines
6.2 KiB
Python

import math
import os
import re
import threading
import time
from typing import List, Optional, Tuple

from PyQt6.QtCore import pyqtSignal

from highlight_regex import HighlightRegex
from highlighting import Highlighting
from line import Line
from settings import Settings
class LogFileModel:
    """Byte-offset based access to a single log file on disk.

    Every read re-opens the file, so the model always reflects the current
    on-disk content. No locking is performed by this class.
    """

    # Highlighter for the active search query; None when no query is set.
    _query_highlight: Optional[HighlightRegex] = None

    # NOTE(review): PyQt only binds signals declared on QObject subclasses and
    # this class does not derive from QObject. The only emit() call (in
    # byte_count) is commented out, so the signal is currently unused.
    file_size_changed = pyqtSignal(str)
    """Fires when the file size changed. **Note:** uses strings,
    because ints in Qt signals are limited to 32 bits."""

    # Last file size observed by byte_count(); -1 until the first call.
    _file_size = -1

    # Precompiled once: matches a single "word" character.
    _WORD_CHAR = re.compile(r"\w")

    # Chunk size used by write_range() when copying bytes.
    _COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, file: str, settings: Settings):
        """Create a model for ``file``.

        :param file: path of the log file; stored as its resolved real path.
        :param settings: application settings, used for the highlighter
            configuration and for ``max_line_length()``.
        """
        self.settings = settings
        self._file = os.path.realpath(file)

    def highlighters(self):
        """Return the highlighters read from the current settings."""
        return Highlighting.read_config(self.settings)

    def get_file(self) -> str:
        """Return the resolved path of the underlying file."""
        return self._file

    def __str__(self) -> str:
        return self._file

    def get_query_highlight(self) -> Optional[HighlightRegex]:
        """Return the highlighter of the active query, or None."""
        return self._query_highlight

    def clear_query_highlight(self) -> None:
        """Remove the active query highlighter."""
        self._query_highlight = None

    def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool) -> None:
        """Install a highlighter for ``query`` with hit background ``ffff00``.

        :param query: the search text or regular expression.
        :param ignore_case: passed through to ``HighlightRegex``.
        :param is_regex: passed through to ``HighlightRegex``.
        """
        self._query_highlight = HighlightRegex(
            query=query,
            ignore_case=ignore_case,
            is_regex=is_regex,
            hit_background_color="ffff00")

    def get_tab_name(self) -> str:
        """Return the file's base name, shortened if longer than 35 chars.

        Long names are abbreviated to the first 15 and the last 15 characters
        joined by ``...``.
        """
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int) -> str:
        """Return the bytes ``[start_byte, end_byte)`` decoded as UTF-8.

        Undecodable bytes are dropped. An empty or reversed range yields an
        empty string.
        """
        # Clamp the length: a negative size would make read() either read to
        # EOF (-1) or raise, neither of which is a sensible "range" result.
        length = max(0, end_byte - start_byte)
        with open(self._file, 'rb') as f:
            f.seek(start_byte)
            raw = f.read(length)
        return raw.decode("utf8", errors="ignore")

    def write_range(self, start_byte: int, end_byte: int, file: str) -> None:
        """Copy the bytes ``[start_byte, end_byte)`` into ``file``.

        ``file`` is created or truncated. The copy is done in chunks so that
        large ranges are never held in memory at once.
        """
        with open(self._file, 'rb') as source, open(file, "w+b") as target:
            source.seek(start_byte)
            remaining = end_byte - start_byte
            while remaining > 0:
                buffer = source.read(min(self._COPY_CHUNK_SIZE, remaining))
                if not buffer:
                    # Reached EOF before end_byte; nothing more to copy.
                    break
                target.write(buffer)
                remaining -= len(buffer)

    def read_word_at(self, byte_offset: int) -> Tuple[str, int, int]:
        """Return the word located at ``byte_offset``.

        :returns: ``(text, start_byte, end_byte)``. If the character at the
            offset is not a word character, that single character is returned
            with the range ``(byte_offset, byte_offset + 1)``. If no line
            contains the offset, ``("", -1, -1)`` is returned.
        """
        nothing = ("", -1, -1)

        lines = self.data(byte_offset, 0, 1)
        if not lines:
            return nothing
        line: Line = lines[0]
        if not line.includes_byte(byte_offset):
            return nothing

        text = line.line()
        base = line.byte_offset()
        char_index = line.byte_index_to_char_index(byte_offset - base)
        # data() strips "\r"/"\n" from the text while the line's byte range
        # still covers them, so the offset may have no character in `text`.
        if not 0 <= char_index < len(text):
            return nothing

        current_char = text[char_index]
        if not self._is_word_char(current_char):
            return (current_char, byte_offset, byte_offset + 1)

        # Scan in character space; string indices are characters, not bytes.
        start = char_index
        while start > 0 and self._is_word_char(text[start - 1]):
            start -= 1
        end = char_index
        while end < len(text) and self._is_word_char(text[end]):
            end += 1

        # Convert character positions back to byte offsets by re-encoding the
        # prefix. Equal to the character index for pure-ASCII lines.
        # NOTE(review): approximate if the raw line contained bytes dropped
        # during decoding (invalid UTF-8 or embedded "\r").
        start_byte = base + len(text[:start].encode("utf8"))
        end_byte = base + len(text[:end].encode("utf8"))
        return (text[start:end], start_byte, end_byte)

    def _is_word_char(self, char: str) -> bool:
        """Return True if ``char`` starts with a regex word character."""
        return self._WORD_CHAR.match(char) is not None

    def data(self, byte_offset: int, scroll_lines: int, lines: int) -> List[Line]:
        """Return lines around ``byte_offset``.

        Reading starts up to ``settings.max_line_length()`` bytes before the
        offset and stops once ``lines + abs(scroll_lines)`` lines that end
        after the offset were collected, or at end of file.

        :param byte_offset: reference position in the file.
        :param scroll_lines: number of lines to move the start of the result
            relative to the first line ending after ``byte_offset``.
        :param lines: number of lines wanted; rounded up for the result size.
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)

        # TODO handle lines longer than 4096 bytes
        # TODO abort file open after a few seconds: https://docs.python.org/3/library/signal.html#example
        with open(self._file, 'rb') as f:
            offset = min(byte_offset, self.byte_count())
            # Step back so that lines before the offset are available too.
            # The seek may land inside a line, so the first line read can be
            # a partial one.
            offset = max(0, offset - self.settings.max_line_length())
            f.seek(offset)
            while raw_line := f.readline():
                new_offset = f.tell()
                text = raw_line.decode("utf8", errors="ignore").replace("\r", "").replace("\n", "")
                line = Line(offset, new_offset, text)
                if line.byte_end() <= byte_offset:  # line.byte_end() returns the end byte +1
                    lines_before_offset.append(line)
                else:
                    lines_after_offset.append(line)
                offset = new_offset
                if len(lines_after_offset) >= lines_to_find:
                    break

        all_lines = lines_before_offset + lines_after_offset
        start = max(0, len(lines_before_offset) + scroll_lines)
        if start + lines_to_return - 1 < len(all_lines):
            result = all_lines[start:start + lines_to_return]
        else:
            # Not enough lines from `start` on: return the tail instead.
            # NOTE(review): this yields lines_to_return - 1 lines, and for
            # lines_to_return == 1 the slice is all_lines[0:], i.e. every
            # line that was read. Confirm this is intended.
            result = all_lines[-lines_to_return + 1:]
        return result

    def byte_count(self) -> int:
        """Return the current file size in bytes and remember it."""
        size = os.stat(self._file).st_size
        if self._file_size != size:
            # Signal emission is currently disabled:
            # self.file_size_changed.emit(str(size))
            self._file_size = size
        return size

    def write_line(self, line: str) -> None:
        """Append ``line`` to the file, adding a trailing newline if missing."""
        payload = line if line.endswith("\n") else line + "\n"
        with open(self._file, 'a+b') as f:
            f.write(payload.encode("utf8"))

    def truncate(self) -> None:
        """Empty the file, creating it if it does not exist."""
        with open(self._file, 'a') as f:
            f.truncate(0)