Files
krowlog/src/ui/bigtext/logFileModel.py
Andreas Huber 61132d242f fix: graphemes are not correctly highlighted
Graphemes don't all have the same width, not even when you use a monospace font.
For Latin characters it usually works fine to assume the same width. But emojis,
Japanese or Chinese characters can have different widths. There are even some
ultra wide characters like 𒐫 or ﷽. There is also a thing
called a 'half-width' character. E.g. the Japanese 'a' can be full-width ア or half-width ｱ.

Fixed by actually computing the width of graphemes and using pixels.
2025-03-24 17:49:27 +01:00

216 lines
8.6 KiB
Python

import math
import re
from typing import List, Optional
from PySide6.QtCore import Signal
from src.ui.bigtext.highlight_regex import HighlightRegex
from src.ui.bigtext.highlighting import Highlighting
from src.ui.bigtext.line import Line
import os
from src.settings.settings import Settings
from functools import lru_cache
class LogFileModel:
    """Byte-offset based access to a (possibly growing) log file.

    Lines are read lazily from disk and cached by their start byte offset
    in ``_line_cache``; the cache is pruned to the currently relevant byte
    range on every ``data()`` call.
    """

    # Search-term highlighter, set via set_query_highlight().
    _query_highlight: Optional[HighlightRegex] = None

    # NOTE(review): Signal is only functional on QObject subclasses;
    # LogFileModel is a plain class and the single emit site (in
    # byte_count) is commented out — confirm whether this signal is still
    # intended to work.
    # Fires when the file size changed. Uses strings because ints in Qt
    # signals are limited to 32 bit.
    file_size_changed = Signal()

    # Cached file size in bytes; -1 means "not determined yet".
    _file_size = -1
    # Byte range the model is restricted to; range_end == -1 means
    # "unbounded".
    range_start = 0
    range_end = -1

    def __init__(self, file: str, settings: Settings, original_file: Optional[str] = None):
        """
        :param file: path of the file to read
        :param settings: application settings (highlighters, max line length, ...)
        :param original_file: used in the filter widget to denote the original
            file, the one being filtered, because 'file' points to the tmp file
        """
        self.settings = settings
        self._file = os.path.realpath(file)
        self._original_file = os.path.realpath(original_file) if original_file else self._file
        self._file_name = os.path.basename(self._original_file)
        # start byte offset -> Line; only complete lines are cached (see data())
        self._line_cache = {}

    def highlighters(self):
        """Return the configured highlighters that are active and whose
        file-type pattern matches this file's name."""
        all_highlighters = Highlighting.read_config(self.settings)
        return [h for h in all_highlighters
                if h.is_active() and h.file_type_matches(self._file_name)]

    def get_file(self):
        return self._file

    def get_original_file(self):
        return self._original_file

    def __str__(self):
        return self._file

    def setRange(self, range_start: int, range_end: int):
        """Restrict the model to the byte range [range_start, range_end]."""
        self.range_start = range_start
        self.range_end = range_end

    def get_query_highlight(self) -> Optional[HighlightRegex]:
        """Return the search-term highlighter, or None when disabled in the
        settings or when no search term has been set."""
        if not self.settings.session.getboolean("general", "highlight_search_term"):
            return None
        return self._query_highlight

    def clear_query_highlight(self):
        self._query_highlight = None

    def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
        """Highlight all occurrences of the current search term in yellow."""
        self._query_highlight = HighlightRegex(
            query=query,
            ignore_case=ignore_case,
            is_regex=is_regex,
            hit_background_color="ffff00")

    def get_tab_name(self) -> str:
        """Return the file name for the tab title; names longer than 35
        characters are shortened to 'first15...last15'."""
        file_name = os.path.basename(self._file)
        if len(file_name) > 35:
            file_name = file_name[:15] + "..." + file_name[-15:]
        return file_name

    def read_range(self, start_byte: int, end_byte: int) -> str:
        """Read [start_byte, end_byte) and decode it as UTF-8, dropping
        invalid byte sequences."""
        with open(self._file, 'rb') as f:
            f.seek(start_byte)
            raw = f.read(end_byte - start_byte)
        return raw.decode("utf8", errors="ignore")

    def write_range(self, start_byte: int, end_byte: int, file: str):
        """Copy the byte range [start_byte, end_byte) of this model's file
        into 'file' (overwriting it), in 1 MiB chunks."""
        with open(self._file, 'rb') as source, open(file, "w+b") as target:
            offset = start_byte
            source.seek(offset)
            while offset < end_byte:
                new_offset = min(offset + 1024 * 1024, end_byte)
                buffer_size = new_offset - offset
                buffer = source.read(buffer_size)
                target.write(buffer)
                offset = new_offset

    def get_line_start_at(self, byte_offset: int) -> int:
        """Return the start byte of the line containing byte_offset
        (0 when no line is found)."""
        lines = self.data(byte_offset, 0, 1, 0, -1)
        if len(lines) == 0:
            return 0
        return lines[0].byte_offset()

    def get_line_end_at(self, byte_offset: int) -> int:
        """Return the (exclusive) end byte of the line containing
        byte_offset (0 when no line is found)."""
        lines = self.data(byte_offset, 0, 1, 0, -1)
        if len(lines) == 0:
            return 0
        return lines[0].byte_end()

    def read_word_at(self, byte_offset: int) -> tuple[str, int, int]:
        """Return (word, start_byte, end_byte) for the word under
        byte_offset, expanding over \\w characters in both directions.

        A non-word character is returned as a single-character "word";
        ("", -1, -1) is returned when there is no line at the offset."""
        lines = self.data(byte_offset, 0, 1, 0, -1)
        if len(lines) == 0:
            return "", -1, -1
        line: Line = lines[0]
        if not line.includes_byte(byte_offset):
            return "", -1, -1
        offset_in_line = byte_offset - line.byte_offset()
        char_index = line.byte_index_to_char_index(offset_in_line)
        current_char = line.line()[char_index]
        if not self._is_word_char(current_char):
            return current_char, byte_offset, byte_offset + 1
        # expand left over word characters ...
        start_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
        while start_in_line - 1 >= 0 and self._is_word_char(line.line()[start_in_line - 1]):
            start_in_line = start_in_line - 1
        # ... then right
        end_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
        while end_in_line < len(line.line()) and self._is_word_char(line.line()[end_in_line]):
            end_in_line = end_in_line + 1
        start_byte = line.char_index_to_byte(start_in_line) + line.byte_offset()
        end_byte = line.char_index_to_byte(end_in_line) + line.byte_offset()
        return line.line()[start_in_line:end_in_line], start_byte, end_byte

    def _is_word_char(self, char: str) -> bool:
        return re.match(r"\w", char) is not None

    def prune_cache(self, range_start: int, range_end: int):
        """Drop cached lines lying completely outside [range_start, range_end]."""
        for key in list(self._line_cache.keys()):
            line = self._line_cache[key]
            if range_start > line.byte_end() or line.byte_offset() > range_end:
                del self._line_cache[key]

    def data(self, byte_offset: int, scroll_lines: int, lines: int, range_start: int, range_end: int) -> List[Line]:
        """Read lines around byte_offset.

        :param byte_offset: anchor position in the file
        :param scroll_lines: lines to scroll relative to the anchor
            (negative scrolls up)
        :param lines: number of lines to return
        :param range_start: lower byte bound; lines starting before it are
            not returned
        :param range_end: upper byte bound; -1 means unbounded
        """
        lines_before_offset: List[Line] = []
        lines_after_offset: List[Line] = []
        lines_to_find = lines + abs(scroll_lines)
        lines_to_return = math.ceil(lines)
        # TODO handle lines longer than 4096 bytes
        # TODO abort file open after a few seconds: https://docs.python.org/3/library/signal.html#example
        with open(self._file, 'rb') as f:
            offset = min(byte_offset, self.byte_count())
            # back up by max_line_length so the line containing byte_offset
            # is read from its real start
            offset = max(0,
                         max(range_start - self.settings.max_line_length(), offset - self.settings.max_line_length()))
            self.prune_cache(range_start, range_end)
            previous_line_is_complete = False
            f.seek(offset)
            while True:
                line: Line | None = self._line_cache.get(offset)
                if line is None:
                    line_bytes = f.readline()
                    if not line_bytes:
                        break
                    new_offset = f.tell()
                    if 0 <= range_end < new_offset:
                        break
                    line = Line(offset, new_offset, line_bytes.decode("utf8", errors="ignore"), line_bytes)
                    if previous_line_is_complete:  # only cache lines when we know they are complete
                        self._line_cache[offset] = line
                    offset = new_offset
                    previous_line_is_complete = True
                else:
                    offset = line.byte_end()  # line.byte_end() returns the end byte +1
                    f.seek(offset)
                    previous_line_is_complete = True
                if line.byte_end() <= byte_offset:  # line.byte_end() returns the end byte +1
                    if line.byte_offset() >= range_start:  # only add if in range
                        lines_before_offset.append(line)
                else:
                    lines_after_offset.append(line)
                    if len(lines_after_offset) >= lines_to_find:
                        break
        all_lines = lines_before_offset + lines_after_offset
        start = max(0, len(lines_before_offset) + scroll_lines)
        if start + lines_to_return - 1 < len(all_lines):
            result = all_lines[start:start + lines_to_return]
        else:
            # NOTE(review): with lines_to_return == 1 this slice is
            # all_lines[0:], i.e. the whole list — looks like an off-by-one;
            # confirm the intended behavior before changing it.
            result = all_lines[-lines_to_return + 1:]
        return result

    def byte_count(self) -> int:
        """Return the current file size in bytes (re-stats on every call)."""
        size = os.stat(self._file).st_size
        if self._file_size != size:
            # self.file_size_changed.emit(str(size))
            self._file_size = size
        return size

    def truncate(self):
        """Empty the file (size 0) without deleting it."""
        with open(self._file, 'a') as f:
            f.truncate(0)