From d561facb7e1013142239e179e1530d94144225ed Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Wed, 29 May 2024 19:17:30 +0200 Subject: [PATCH] starting to write a better version of bigtext --- src/new_big_text/__init__.py | 0 src/new_big_text/bigger_text.py | 273 ++++++++++++++++++++++++++++ src/new_big_text/test_file_model.py | 53 ++++++ 3 files changed, 326 insertions(+) create mode 100644 src/new_big_text/__init__.py create mode 100644 src/new_big_text/bigger_text.py create mode 100644 src/new_big_text/test_file_model.py diff --git a/src/new_big_text/__init__.py b/src/new_big_text/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/new_big_text/bigger_text.py b/src/new_big_text/bigger_text.py new file mode 100644 index 0000000..580b0ec --- /dev/null +++ b/src/new_big_text/bigger_text.py @@ -0,0 +1,273 @@ +import argparse +import enum + +import signal, os +import logging +import signal +import time +from io import TextIOWrapper +from typing import List + +from PySide6 import QtCore, QtGui +from PySide6.QtGui import QPaintEvent, QPainter, QFont, QFontMetrics, QColor +from PySide6.QtWidgets import QApplication, QMainWindow, QWidget, QStatusBar +from PySide6.QtCore import QTimer, QPoint, Qt +import sys +from src.pluginregistry import PluginRegistry +import gettext + +__version__ = '0.2.1' + +from src.i18n import _ + +gettext.install('krowlog', 'locale') + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("main") + + +def register_signal_handler(): + signal.signal(signal.SIGINT, stop_signal) + signal.signal(signal.SIGTERM, stop_signal) + + +def stop_signal(signum, _stackframe): + """ Handle terminate signal """ + try: + log.info("Terminate signal received. %s", signum) + QtCore.QCoreApplication.quit() + except Exception: + log.exception("Exception occurred while terminating") + sys.exit(1) + sys.exit(0) + + +MAX_LINE_LENGTH = 4096 + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("main") + + +class FileWithTimeout: + def __init__(self): + pass + + def handler(signum, frame): + signame = signal.Signals(signum).name + print(f'Signal handler called with signal {signame} ({signum})') + raise OSError("Couldn't open device!") + + def open(path: str, timeout_in_seconds: int = 10, mode="rb", encoding=None) -> TextIOWrapper: + # Set the signal handler and a 5-second alarm + signal.signal(signal.SIGALRM, FileWithTimeout.handler) + signal.alarm(timeout_in_seconds) + fd = open(path, mode=mode, encoding=encoding) + signal.alarm(0) # Disable the alarm + return fd + + +class MainWindow(QMainWindow): + def __init__(self, *args, **kwargs): + super(MainWindow, self).__init__(*args, **kwargs) + + self.setWindowTitle(_("KrowLog")) + self.setMinimumWidth(600) + self.setMinimumHeight(480) + bigger_text = BiggerText() + self.setCentralWidget(bigger_text) + self.status_bar = QStatusBar(self) + self.setStatusBar(self.status_bar) + self.status_bar.setAutoFillBackground(True) + self.status_bar.showMessage("bla blub") + + +def apply_time_workaround(): + # workaround to make signals work in QT apps. + # They do not work out of the box, because the main thread + # is running in C++ code once app.exec() is executed + # Forcing an empty lambda to be executed periodically gives + # control back to python and allows python to react to signals + timer = QTimer() + timer.timeout.connect(lambda: None) + timer.start(100) + + +class LineType(enum.IntEnum): + Full = 1 + Begin = 2 + Middle = 3 + End = 4 + + +class Line: + def __init__(self, byte_offset: int, byte_end: int, text: str, type: LineType = LineType.Full): + """ + :type byte_offset: int the offset of the first byte of this line + :type byte_end: int the offset of the last byte of this line + :type text: str the decoded text + :type continued: bool True if the previous line was too long and has been split + """ + self._byte_offset = byte_offset + self._byte_end = byte_end + self._text = text + self._type = type + + def byte_offset(self) -> int: + return self._byte_offset + + def byte_end(self) -> int: + return self._byte_end + + def text(self) -> str: + return self._text + + def type(self) -> LineType: + return self._type + + def __str__(self): + return "%s (%d->%d %s)" % (self._text, self._byte_offset, self._byte_end, self._type.name) + + def __repr__(self): + return "%s (%d->%d %s)" % (repr(self._text), self._byte_offset, self._byte_end, self._type.name) + + def __eq__(self, other): + if not isinstance(other, Line): + # don't attempt to compare against unrelated types + return NotImplemented + + return (self._byte_offset == other._byte_offset + and self._byte_end == other._byte_end + and self._text == other._text + and self._type == other._type) + + def __hash__(self): + # necessary for instances to behave sanely in dicts and sets. + return hash((self._byte_offset, self._byte_end, self._text, self._type)) + + +class FileModel: + BUFFER_SIZE = 2 ** 16 # for production use this should be at least 4096, better 64kb + + def __init__(self, file: str): + self.file = file + + def _read_line(self, file: TextIOWrapper, read_offset: int, max_line_length: int, encoding: str, + previous_line_type: LineType) -> Line | None: + file.seek(read_offset) + buffer: str = file.read(self.BUFFER_SIZE) + if not buffer: + # end of file reached + return None + + while True: + pos_of_newline = buffer.find(b"\n") + if pos_of_newline > max_line_length: + start_of_line = read_offset + end_of_line = read_offset + max_line_length - 1 + line = buffer[0:max_line_length] + + decoded_line = line.decode(encoding, errors="replace") + line_type = LineType.Begin if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.Middle + return Line(start_of_line, end_of_line, decoded_line, line_type) + elif pos_of_newline >= 0: + start_of_line = read_offset + end_of_line = read_offset + pos_of_newline + line = buffer[0:pos_of_newline + 1] + + decoded_line = line.decode(encoding, errors="replace") + line_type = LineType.Full if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.End + return Line(start_of_line, end_of_line, decoded_line, line_type) + + else: + # line does not end in this buffer + # read the next chunk and stitch the line together + raise "not yet implemented" + + def read(self, file_offset: int, lines_to_read: int, max_line_length=512, encoding="utf8") -> [Line]: + lines: list[Line] = [] + with FileWithTimeout.open(self.file, 5, 'rb') as f: + read_offset = max(0, file_offset - max_line_length * 6) # factor 6 is due to multibyte characters in utf8 + f.seek(read_offset) + + previous_line_type = LineType.Full + while len(lines) < lines_to_read: + l: Line | None = self._read_line(f, read_offset, max_line_length, encoding, previous_line_type) + if l is None: + break + if file_offset <= l.byte_end(): + lines.append(l) + read_offset = l.byte_end() + 1 + previous_line_type = l.type() + + return lines + + +class BiggerText(QWidget): + + def __init__(self, ): + super(BiggerText, self).__init__() + # font ="Andale Mono" + # font = "JetBrains Mono" + # font = "Monospace" # not found + # font = "ZedMono" # is not found + # font = "Noto Sans Mono" + font = "Noto Color Emoji" + font_size = 20 + + qfont = QFont(font, font_size) + # qfont.setStyleHint(QFont.StyleHint.Monospace) + self.qfont = qfont + + def mousePressEvent(self, e: QtGui.QMouseEvent) -> None: + if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier: + offset = self.to_byte_offset(e.pos()) + + return + + def to_byte_offset(self, pos: QPoint): + line = self.y_pos_to_line(pos.y()) + + def y_pos_to_line(self, y: int) -> int: + return int(y / self.char_height) + + def paintEvent(self, event: QPaintEvent) -> None: + start_ns = time.process_time_ns() + painter = QPainter(self) + font = painter.font() + font.setPointSize(20) + painter.setFont(font) + + font_metric: QFontMetrics = painter.fontMetrics() + self.char_height = font_metric.height() + + lines_to_render = [ + "im0 ひらがな 王 フーバー 🔴🟢 7²", + "iiiiiiiiii", + "12345678", + "12345678", + "nonspacing marks:", + "next line consists of a%CC%88", + "äääääääääääääääääääääääääääääää|", + "アンドレアス", + "アンドレアス" + ] + + painter.setPen(QColor(0, 0, 0)) + line_on_screen = 1 + for line in lines_to_render: + painter.drawText(QPoint(0, line_on_screen * char_height), line) + line_on_screen = line_on_screen + 1 + + painter.end() + + +if __name__ == "__main__": + app = QApplication(sys.argv) + + apply_time_workaround() + + window = MainWindow() + window.show() + register_signal_handler() + + app.exec() diff --git a/src/new_big_text/test_file_model.py b/src/new_big_text/test_file_model.py new file mode 100644 index 0000000..396d65c --- /dev/null +++ b/src/new_big_text/test_file_model.py @@ -0,0 +1,53 @@ +import unittest + +from src.new_big_text.bigger_text import FileModel, Line, LineType + + +class FileModelTestCase(unittest.TestCase): + def test_read_lines_with_offset_0(self): + fm = FileModel("testdata/readlines.txt") + actual_lines = fm.read(file_offset=0, lines_to_read=5, max_line_length=16, encoding="utf8") + self.assertEqual([ + Line(0, 1, '1\n'), + Line(2, 4, '12\n'), + Line(5, 8, '123\n'), + Line(9, 13, '1234\n'), + Line(14, 19, '12345\n') + ], + actual_lines) + + def test_read_lines_with_offset_in_middle_of_line(self): + fm = FileModel("testdata/readlines.txt") + file_offset = "1\n12\n123\n".find("123") + 1 + actual_lines = fm.read( + file_offset=file_offset, # at char 2 in line 3 + lines_to_read=5, + max_line_length=16, + encoding="utf8") + self.assertEqual([ + Line(5, 8, '123\n'), + Line(9, 13, '1234\n'), + Line(14, 19, '12345\n'), + Line(20, 26, '123456\n'), + Line(27, 34, '1234567\n') + ], + actual_lines) + + def test_read_long_line__buffer_larger_than_line(self): + fm = FileModel("testdata/longLines.txt") + fm.BUFFER_SIZE = 512 + actual_lines = fm.read( + file_offset=0, + lines_to_read=3, + max_line_length=10, + encoding="utf8" + ) + self.assertEqual([ + Line(0, 9, "1aaaaaaaa-", LineType.Begin), + Line(10, 19, "bbbbbbbbb-", LineType.Middle), + Line(20, 27, "ccccccc\n", LineType.End), + ], actual_lines) + + +if __name__ == '__main__': + unittest.main()