Compare commits

...

2 Commits

4 changed files with 328 additions and 2 deletions

View File

View File

@@ -0,0 +1,273 @@
import argparse
import enum
import signal, os
import logging
import signal
import time
from io import TextIOWrapper
from typing import List
from PySide6 import QtCore, QtGui
from PySide6.QtGui import QPaintEvent, QPainter, QFont, QFontMetrics, QColor
from PySide6.QtWidgets import QApplication, QMainWindow, QWidget, QStatusBar
from PySide6.QtCore import QTimer, QPoint, Qt
import sys
from src.pluginregistry import PluginRegistry
import gettext
__version__ = '0.2.1'
from src.i18n import _
gettext.install('krowlog', 'locale')
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("main")
def register_signal_handler():
signal.signal(signal.SIGINT, stop_signal)
signal.signal(signal.SIGTERM, stop_signal)
def stop_signal(signum, _stackframe):
""" Handle terminate signal """
try:
log.info("Terminate signal received. %s", signum)
QtCore.QCoreApplication.quit()
except Exception:
log.exception("Exception occurred while terminating")
sys.exit(1)
sys.exit(0)
MAX_LINE_LENGTH = 4096
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("main")
class FileWithTimeout:
def __init__(self):
pass
def handler(signum, frame):
signame = signal.Signals(signum).name
print(f'Signal handler called with signal {signame} ({signum})')
raise OSError("Couldn't open device!")
def open(path: str, timeout_in_seconds: int = 10, mode="rb", encoding=None) -> TextIOWrapper:
# Set the signal handler and a 5-second alarm
signal.signal(signal.SIGALRM, FileWithTimeout.handler)
signal.alarm(timeout_in_seconds)
fd = open(path, mode=mode, encoding=encoding)
signal.alarm(0) # Disable the alarm
return fd
class MainWindow(QMainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
self.setWindowTitle(_("KrowLog"))
self.setMinimumWidth(600)
self.setMinimumHeight(480)
bigger_text = BiggerText()
self.setCentralWidget(bigger_text)
self.status_bar = QStatusBar(self)
self.setStatusBar(self.status_bar)
self.status_bar.setAutoFillBackground(True)
self.status_bar.showMessage("bla blub")
def apply_time_workaround():
# workaround to make signals work in QT apps.
# They do not work out of the box, because the main thread
# is running in C++ code once app.exec() is executed
# Forcing an empty lambda to be executed periodically gives
# control back to python and allows python to react to signals
timer = QTimer()
timer.timeout.connect(lambda: None)
timer.start(100)
class LineType(enum.IntEnum):
Full = 1
Begin = 2
Middle = 3
End = 4
class Line:
def __init__(self, byte_offset: int, byte_end: int, text: str, type: LineType = LineType.Full):
"""
:type byte_offset: int the offset of the first byte of this line
:type byte_end: int the offset of the last byte of this line
:type text: str the decoded text
:type continued: bool True if the previous line was too long and has been split
"""
self._byte_offset = byte_offset
self._byte_end = byte_end
self._text = text
self._type = type
def byte_offset(self) -> int:
return self._byte_offset
def byte_end(self) -> int:
return self._byte_end
def text(self) -> str:
return self._text
def type(self) -> LineType:
return self._type
def __str__(self):
return "%s (%d->%d %s)" % (self._text, self._byte_offset, self._byte_end, self._type.name)
def __repr__(self):
return "%s (%d->%d %s)" % (repr(self._text), self._byte_offset, self._byte_end, self._type.name)
def __eq__(self, other):
if not isinstance(other, Line):
# don't attempt to compare against unrelated types
return NotImplemented
return (self._byte_offset == other._byte_offset
and self._byte_end == other._byte_end
and self._text == other._text
and self._type == other._type)
def __hash__(self):
# necessary for instances to behave sanely in dicts and sets.
return hash((self._byte_offset, self._byte_end, self._text, self._type))
class FileModel:
BUFFER_SIZE = 2 ** 16 # for production use this should be at least 4096, better 64kb
def __init__(self, file: str):
self.file = file
def _read_line(self, file: TextIOWrapper, read_offset: int, max_line_length: int, encoding: str,
previous_line_type: LineType) -> Line | None:
file.seek(read_offset)
buffer: str = file.read(self.BUFFER_SIZE)
if not buffer:
# end of file reached
return None
while True:
pos_of_newline = buffer.find(b"\n")
if pos_of_newline > max_line_length:
start_of_line = read_offset
end_of_line = read_offset + max_line_length - 1
line = buffer[0:max_line_length]
decoded_line = line.decode(encoding, errors="replace")
line_type = LineType.Begin if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.Middle
return Line(start_of_line, end_of_line, decoded_line, line_type)
elif pos_of_newline >= 0:
start_of_line = read_offset
end_of_line = read_offset + pos_of_newline
line = buffer[0:pos_of_newline + 1]
decoded_line = line.decode(encoding, errors="replace")
line_type = LineType.Full if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.End
return Line(start_of_line, end_of_line, decoded_line, line_type)
else:
# line does not end in this buffer
# read the next chunk and stitch the line together
raise "not yet implemented"
def read(self, file_offset: int, lines_to_read: int, max_line_length=512, encoding="utf8") -> [Line]:
lines: list[Line] = []
with FileWithTimeout.open(self.file, 5, 'rb') as f:
read_offset = max(0, file_offset - max_line_length * 6) # factor 6 is due to multibyte characters in utf8
f.seek(read_offset)
previous_line_type = LineType.Full
while len(lines) < lines_to_read:
l: Line | None = self._read_line(f, read_offset, max_line_length, encoding, previous_line_type)
if l is None:
break
if file_offset <= l.byte_end():
lines.append(l)
read_offset = l.byte_end() + 1
previous_line_type = l.type()
return lines
class BiggerText(QWidget):
def __init__(self, ):
super(BiggerText, self).__init__()
# font ="Andale Mono"
# font = "JetBrains Mono"
# font = "Monospace" # not found
# font = "ZedMono" # is not found
# font = "Noto Sans Mono"
font = "Noto Color Emoji"
font_size = 20
qfont = QFont(font, font_size)
# qfont.setStyleHint(QFont.StyleHint.Monospace)
self.qfont = qfont
def mousePressEvent(self, e: QtGui.QMouseEvent) -> None:
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier:
offset = self.to_byte_offset(e.pos())
return
def to_byte_offset(self, pos: QPoint):
line = self.y_pos_to_line(pos.y())
def y_pos_to_line(self, y: int) -> int:
return int(y / self.char_height)
def paintEvent(self, event: QPaintEvent) -> None:
start_ns = time.process_time_ns()
painter = QPainter(self)
font = painter.font()
font.setPointSize(20)
painter.setFont(font)
font_metric: QFontMetrics = painter.fontMetrics()
self.char_height = font_metric.height()
lines_to_render = [
"im0 ひらがな 王 フーバー 🔴🟢 7²",
"iiiiiiiiii",
"",
"12345678",
"nonspacing marks:",
"next line consists of a%CC%88",
"äääääääääääääääääääääääääääääää|",
"アンドレアス",
"アンドレアス"
]
painter.setPen(QColor(0, 0, 0))
line_on_screen = 1
for line in lines_to_render:
painter.drawText(QPoint(0, line_on_screen * char_height), line)
line_on_screen = line_on_screen + 1
painter.end()
if __name__ == "__main__":
app = QApplication(sys.argv)
apply_time_workaround()
window = MainWindow()
window.show()
register_signal_handler()
app.exec()

View File

@@ -0,0 +1,53 @@
import unittest
from src.new_big_text.bigger_text import FileModel, Line, LineType
class FileModelTestCase(unittest.TestCase):
def test_read_lines_with_offset_0(self):
fm = FileModel("testdata/readlines.txt")
actual_lines = fm.read(file_offset=0, lines_to_read=5, max_line_length=16, encoding="utf8")
self.assertEqual([
Line(0, 1, '1\n'),
Line(2, 4, '12\n'),
Line(5, 8, '123\n'),
Line(9, 13, '1234\n'),
Line(14, 19, '12345\n')
],
actual_lines)
def test_read_lines_with_offset_in_middle_of_line(self):
fm = FileModel("testdata/readlines.txt")
file_offset = "1\n12\n123\n".find("123") + 1
actual_lines = fm.read(
file_offset=file_offset, # at char 2 in line 3
lines_to_read=5,
max_line_length=16,
encoding="utf8")
self.assertEqual([
Line(5, 8, '123\n'),
Line(9, 13, '1234\n'),
Line(14, 19, '12345\n'),
Line(20, 26, '123456\n'),
Line(27, 34, '1234567\n')
],
actual_lines)
def test_read_long_line__buffer_larger_than_line(self):
fm = FileModel("testdata/longLines.txt")
fm.BUFFER_SIZE = 512
actual_lines = fm.read(
file_offset=0,
lines_to_read=3,
max_line_length=10,
encoding="utf8"
)
self.assertEqual([
Line(0, 9, "1aaaaaaaa-", LineType.Begin),
Line(10, 19, "bbbbbbbbb-", LineType.Middle),
Line(20, 27, "ccccccc\n", LineType.End),
], actual_lines)
if __name__ == '__main__':
unittest.main()

View File

@@ -22,8 +22,6 @@ class LogFileModel:
range_start = 0 range_start = 0
range_end = -1 range_end = -1
_line_cache = {}
def __init__(self, file: str, settings: Settings, original_file: str = False): def __init__(self, file: str, settings: Settings, original_file: str = False):
""" """
:param file: :param file:
@@ -34,6 +32,7 @@ class LogFileModel:
self._file = os.path.realpath(file) self._file = os.path.realpath(file)
self._original_file = os.path.realpath(original_file) if original_file else self._file self._original_file = os.path.realpath(original_file) if original_file else self._file
self._file_name = os.path.basename(self._original_file) self._file_name = os.path.basename(self._original_file)
self._line_cache = {}
def highlighters(self): def highlighters(self):
all_highlighters = Highlighting.read_config(self.settings) all_highlighters = Highlighting.read_config(self.settings)
@@ -137,6 +136,7 @@ class LogFileModel:
return re.match(r"\w", char) is not None return re.match(r"\w", char) is not None
def prune_cache(self, range_start: int, range_end: int): def prune_cache(self, range_start: int, range_end: int):
print(f"cache size: {len(self._line_cache.keys())}")
for key in list(self._line_cache.keys()): for key in list(self._line_cache.keys()):
line = self._line_cache[key] line = self._line_cache[key]
if range_start > line.byte_end() or line.byte_offset() > range_end: if range_start > line.byte_end() or line.byte_offset() > range_end: