import argparse import enum import signal, os import logging import signal import time from io import TextIOWrapper from typing import List from PySide6 import QtCore, QtGui from PySide6.QtGui import QPaintEvent, QPainter, QFont, QFontMetrics, QColor, QBrush, QWheelEvent from PySide6.QtWidgets import QApplication, QMainWindow, QWidget, QStatusBar, QGridLayout, QSizePolicy, QScrollBar from PySide6.QtCore import QTimer, QPoint, Qt, QRect, QLine, Slot import sys from src.pluginregistry import PluginRegistry import gettext __version__ = '0.2.1' from src.i18n import _ from src.ui.bigtext.BigScrollBar import BigScrollBar from src.ui.bigtext.bigtext import InnerBigText gettext.install('krowlog', 'locale') logging.basicConfig(level=logging.INFO) log = logging.getLogger("main") def register_signal_handler(): signal.signal(signal.SIGINT, stop_signal) signal.signal(signal.SIGTERM, stop_signal) def stop_signal(signum, _stackframe): """ Handle terminate signal """ try: log.info("Terminate signal received. %s", signum) QtCore.QCoreApplication.quit() except Exception: log.exception("Exception occurred while terminating") sys.exit(1) sys.exit(0) MAX_LINE_LENGTH = 4096 logging.basicConfig(level=logging.INFO) log = logging.getLogger("main") class FileWithTimeout: def __init__(self): pass def handler(signum, frame): signame = signal.Signals(signum).name print(f'Signal handler called with signal {signame} ({signum})') raise OSError("Couldn't open device!") def open(path: str, timeout_in_seconds: int = 10, mode="rb", encoding=None) -> TextIOWrapper: # Set the signal handler and a 5-second alarm signal.signal(signal.SIGALRM, FileWithTimeout.handler) signal.alarm(timeout_in_seconds) fd = open(path, mode=mode, encoding=encoding) signal.alarm(0) # Disable the alarm return fd class MainWindow(QMainWindow): def __init__(self, *args, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.setWindowTitle(_("KrowLog")) self.setMinimumWidth(800) self.setMinimumHeight(880) bigger_text = BiggerText(FileModel("testdata/testset.txt")) self.setCentralWidget(bigger_text) self.status_bar = QStatusBar(self) self.setStatusBar(self.status_bar) self.status_bar.setAutoFillBackground(True) self.status_bar.showMessage("bla blub") def apply_time_workaround(): # workaround to make signals work in QT apps. # They do not work out of the box, because the main thread # is running in C++ code once app.exec() is executed # Forcing an empty lambda to be executed periodically gives # control back to python and allows python to react to signals timer = QTimer() timer.timeout.connect(lambda: None) timer.start(100) class LineType(enum.IntEnum): Full = 1 Begin = 2 Middle = 3 End = 4 class Line: def __init__(self, byte_offset: int, byte_end: int, text: str, bytes: str, type: LineType = LineType.Full): """ :type byte_offset: int the offset of the first byte of this line :type byte_end: int the offset of the last byte of this line :type text: str the decoded text :type continued: bool True if the previous line was too long and has been split """ self._byte_offset = byte_offset self._byte_end = byte_end self._text = text self._bytes = bytes self._type = type def byte_offset(self) -> int: return self._byte_offset def byte_end(self) -> int: return self._byte_end def text(self) -> str: return self._text def bytes(self) -> str: return self._bytes def type(self) -> LineType: return self._type def __str__(self): return "%s (%d->%d %s)" % (self._text, self._byte_offset, self._byte_end, self._type.name) def __repr__(self): return "%s (%d->%d %s)" % (repr(self._text), self._byte_offset, self._byte_end, self._type.name) def __eq__(self, other): if not isinstance(other, Line): # don't attempt to compare against unrelated types return NotImplemented return (self._byte_offset == other._byte_offset and self._byte_end == other._byte_end and self._text == other._text and self._type == other._type) def __hash__(self): # necessary for instances to behave sanely in dicts and sets. return hash((self._byte_offset, self._byte_end, self._text, self._type)) class FileModel: BUFFER_SIZE = 2 ** 16 # for production use this should be at least 4096, better 64kb def __init__(self, file: str): self.file = file def _read_line(self, file: TextIOWrapper, read_offset: int, max_line_length: int, encoding: str, previous_line_type: LineType) -> Line | None: file.seek(read_offset) buffer: str = file.read(self.BUFFER_SIZE) if not buffer: # end of file reached return None while True: pos_of_newline = buffer.find(b"\n") if pos_of_newline > max_line_length: start_of_line = read_offset end_of_line = read_offset + max_line_length - 1 line = buffer[0:max_line_length] decoded_line = line.decode(encoding, errors="replace") line_type = LineType.Begin if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.Middle return Line(start_of_line, end_of_line, decoded_line, line, line_type) elif pos_of_newline >= 0: start_of_line = read_offset end_of_line = read_offset + pos_of_newline line = buffer[0:pos_of_newline + 1] decoded_line = line.decode(encoding, errors="replace") line_type = LineType.Full if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.End return Line(start_of_line, end_of_line, decoded_line, line, line_type) else: # line does not end in this buffer # read the next chunk and stitch the line together raise "not yet implemented" def read(self, file_offset: int, lines_to_read: int, max_line_length=512, encoding="utf8") -> [Line]: lines: list[Line] = [] with FileWithTimeout.open(self.file, 5, 'rb') as f: read_offset = max(0, file_offset - max_line_length * 6) # factor 6 is due to multibyte characters in utf8 f.seek(read_offset) previous_line_type = LineType.Full while len(lines) < lines_to_read: l: Line | None = self._read_line(f, read_offset, max_line_length, encoding, previous_line_type) if l is None: break if file_offset <= l.byte_end(): lines.append(l) read_offset = l.byte_end() + 1 previous_line_type = l.type() return lines def get_selection(self, byte_start: int, byte_end: int): with FileWithTimeout.open(self.file, 5, 'rb') as f: start = min(byte_start, byte_end) end = max(byte_start, byte_end) f.seek(start) b = f.read(end - start) # print(f"read {end - start } bytes -> {b}") return b.decode("utf-8", errors="replace") class SelectionPos: def __init__(self, index: int, is_in_left_half: bool, num_bytes_of_char: int): self.index = index self.is_in_left_half = is_in_left_half self.num_bytes_of_char = num_bytes_of_char def __repr__(self): return f"{self.index}{'🞀' if self.is_in_left_half else '🞂'}({self.num_bytes_of_char})" def pos(self): return self.index + (0 if self.is_in_left_half else self.num_bytes_of_char) class Selection: def __init__(self, start: SelectionPos = SelectionPos(0, False, 0), end: SelectionPos = SelectionPos(0, False, 0)): self.start = start self.end = end def __repr__(self): return f"{self.start}:{self.end}" def min_byte(self) -> int: return min(self.start.pos(), self.end.pos()) def max_byte(self) -> int: return max(self.start.pos(), self.end.pos()) class BiggerText(QWidget): def __init__(self, model: FileModel): super(BiggerText, self).__init__() self._model = model self.grid = QGridLayout() self.grid.setContentsMargins(0, 0, 0, 0) self.grid.setHorizontalSpacing(0) self.grid.setVerticalSpacing(0) self.setLayout(self.grid) self.v_scroll_bar = BigScrollBar() self.big_text_area = BiggerTextArea(self, model, self.v_scroll_bar) self.big_text_area.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)) self.h_scroll_bar = QScrollBar(Qt.Orientation.Horizontal) self.h_scroll_bar.setMinimum(0) self.h_scroll_bar.setMaximum(1) self.h_scroll_bar.valueChanged.connect(self.big_text_area.h_scroll_event) # self.v_scroll_bar.value_changed.connect(self.big_text_area.v_scroll_value_changed) # self.v_scroll_bar.scroll_event.connect(self.big_text_area.v_scroll_event) self.grid.addWidget(self.big_text_area, 0, 1) self.grid.addWidget(self.h_scroll_bar, 1, 1) self.grid.addWidget(self.v_scroll_bar, 0, 2) class BiggerTextArea(QWidget): def __init__(self, parent: BiggerText, model: FileModel, v_scroll_bar: BigScrollBar): super(BiggerTextArea, self).__init__() self.parent = parent self._v_scroll_bar = v_scroll_bar self._left_offset = 0 self.longest_line = 1 self._font_size = 20 self.selection = Selection() self.mouse_pressed = False self._encoding = "utf8" self.file_model: FileModel = model # font ="Andale Mono" # font = "JetBrains Mono" # font = "Monospace" # not found # font = "ZedMono" # is not found #font = "Noto Sans Mono" font = "Noto Color Emoji" qfont = QFont(font, self._font_size) # qfont.setStyleHint(QFont.StyleHint.Monospace) self.qfont = qfont def mousePressEvent(self, e: QtGui.QMouseEvent) -> None: if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier: self.selection.start = self.to_byte_offset(e.position()) self.selection.end = self.selection.start self.mouse_pressed = True self.update() def mouseReleaseEvent(self, event): self.mouse_pressed = False def mouseMoveEvent(self, event): if self.mouse_pressed: self.selection.end = self.to_byte_offset(event.position()) #print(f"selection: {self.selection} -> {self.file_model.get_selection(self.selection.min_byte(), self.selection.max_byte())}") self.update() def wheelEvent(self, event: QWheelEvent): direction = 1 if event.angleDelta().y() < 0 else -1 if event.modifiers() == Qt.KeyboardModifier.ControlModifier: self._font_size = max(4, min(50, self._font_size - direction)) self.update() else: # print("wheel event fired :) %s" % (direction)) self.scroll_by_lines(direction * 3) @Slot() def h_scroll_event(self, left_offset: int): self._left_offset = left_offset # print("left_offset: %d" % left_offset) self.update() def to_byte_offset(self, pos: QPoint) -> SelectionPos: line_number = self.y_pos_to_line_number_on_screen(pos.y()) line = self.lines_to_render[line_number] text: str = line.text() text = text.replace("\n", "").replace("\r", "") elided_text = self.elided_text(text, pos.x()) byte_offset = line.byte_offset() + len(elided_text.encode("utf8")) left_x_offset = self.font_metric.horizontalAdvance(elided_text) next_char = "" pos_is_in_left_half = False bytes_of_char = 0 if len(text) > len(elided_text): # has another character next_char = text[len(elided_text)] char_with = self.font_metric.horizontalAdvance(next_char) pos_is_in_left_half = pos.x() < (left_x_offset + char_with / 2) bytes_of_char = len(next_char.encode("utf8")) else: # the position is after the last character / behind the end of the line pass print( f"to_byte_offset({pos.x()}, {pos.y()}) -> {left_x_offset} -- elided_text '{elided_text}' next_char '{next_char}' -> byte_offset {byte_offset} pos_is_in_left_half: {pos_is_in_left_half}") return SelectionPos(byte_offset, pos_is_in_left_half, bytes_of_char) def elided_text(self, text: str, width: int): w = width + self.font_metric.horizontalAdvance("…") elided_text = self.font_metric.elidedText(text + "…", Qt.TextElideMode.ElideRight, w, Qt.TextFlag.TextWrapAnywhere) elided_text = elided_text[0:-1] if elided_text.endswith('…') else elided_text # remove the trailing '…' return elided_text def y_pos_to_line_number_on_screen(self, y: int) -> int: return int(y / self.char_height) def update_longest_line(self, lines: [Line]): for line in lines: width_for_full_line = self.font_metric.horizontalAdvance(line.text()) # print("width_in_chars: %d" % width_in_chars) if self.longest_line < width_for_full_line: self.longest_line = width_for_full_line maximum = max(0, self.longest_line - self.width() + 1) self.parent.h_scroll_bar.setMaximum(round(maximum)) def paintEvent(self, event: QPaintEvent) -> None: start_ns = time.process_time_ns() painter = QPainter(self) font = painter.font() font.setPointSize(self._font_size) painter.setFont(font) # --- self.font_metric = painter.fontMetrics() self.char_height = self.font_metric.height() lines_to_read = self.height() / self.char_height + 1 self.lines_to_render: [Line] = self.file_model.read(0, lines_to_read, 200, self._encoding) self.update_longest_line(self.lines_to_render) painter.setPen(QColor(0, 0, 0)) line_on_screen = 1 for line in self.lines_to_render: x_start = -1 x_end = -1 # selection starts before line if self.selection.min_byte() < line.byte_offset(): x_start = 0 # selection starts in line if line.byte_offset() <= self.selection.min_byte() <= line.byte_end(): left_offset_in_bytes = self.selection.min_byte() - line.byte_offset() bytes = line.bytes()[0:left_offset_in_bytes] chars = bytes.decode(self._encoding, errors="replace") x_start = self.font_metric.horizontalAdvance(chars) #print(f"width({chars}) -> bounding_rect={self.font_metric.boundingRect(chars).width()}px or horizontalAdvance={self.font_metric.horizontalAdvance(chars)}") # selection ends after line if self.selection.max_byte() > line.byte_end(): x_end = self.width() # selection ends in line if line.byte_offset() <= self.selection.max_byte() <= line.byte_end(): left_offset_in_bytes = self.selection.max_byte() - line.byte_offset() bytes = line.bytes()[0:left_offset_in_bytes] x_end = self.font_metric.horizontalAdvance(bytes.decode(self._encoding, errors="replace")) - x_start if x_start >= 0 and x_end >= 0: #print(f"highlighting in line {line_on_screen} -- x_start: {x_start} -> x_end: {x_end}") prev_brush = painter.brush() prev_pen = painter.pen() painter.setBrush(QBrush(QColor(0, 255, 255))) painter.setPen(QColor(0, 0, 0, 0)) painter.drawRect( QRect(x_start - self._left_offset, int(line_on_screen * self.char_height + int(self.char_height * 0.1)), x_end, -self.char_height)) painter.setBrush(prev_brush) painter.setPen(prev_pen) painter.drawText(QPoint(-self._left_offset, line_on_screen * self.char_height), line.text()) line_on_screen = line_on_screen + 1 painter.end() if __name__ == "__main__": app = QApplication(sys.argv) apply_time_workaround() window = MainWindow() window.show() register_signal_handler() app.exec()