Compare commits
2 Commits
329775fb26
...
9c64acf77e
| Author | SHA1 | Date | |
|---|---|---|---|
| 9c64acf77e | |||
| d561facb7e |
0
src/new_big_text/__init__.py
Normal file
0
src/new_big_text/__init__.py
Normal file
273
src/new_big_text/bigger_text.py
Normal file
273
src/new_big_text/bigger_text.py
Normal file
@@ -0,0 +1,273 @@
|
|||||||
|
import argparse
|
||||||
|
import enum
|
||||||
|
|
||||||
|
import signal, os
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
from io import TextIOWrapper
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from PySide6 import QtCore, QtGui
|
||||||
|
from PySide6.QtGui import QPaintEvent, QPainter, QFont, QFontMetrics, QColor
|
||||||
|
from PySide6.QtWidgets import QApplication, QMainWindow, QWidget, QStatusBar
|
||||||
|
from PySide6.QtCore import QTimer, QPoint, Qt
|
||||||
|
import sys
|
||||||
|
from src.pluginregistry import PluginRegistry
|
||||||
|
import gettext
|
||||||
|
|
||||||
|
__version__ = '0.2.1'
|
||||||
|
|
||||||
|
from src.i18n import _
|
||||||
|
|
||||||
|
gettext.install('krowlog', 'locale')
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
log = logging.getLogger("main")
|
||||||
|
|
||||||
|
|
||||||
|
def register_signal_handler():
|
||||||
|
signal.signal(signal.SIGINT, stop_signal)
|
||||||
|
signal.signal(signal.SIGTERM, stop_signal)
|
||||||
|
|
||||||
|
|
||||||
|
def stop_signal(signum, _stackframe):
|
||||||
|
""" Handle terminate signal """
|
||||||
|
try:
|
||||||
|
log.info("Terminate signal received. %s", signum)
|
||||||
|
QtCore.QCoreApplication.quit()
|
||||||
|
except Exception:
|
||||||
|
log.exception("Exception occurred while terminating")
|
||||||
|
sys.exit(1)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
MAX_LINE_LENGTH = 4096
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
log = logging.getLogger("main")
|
||||||
|
|
||||||
|
|
||||||
|
class FileWithTimeout:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handler(signum, frame):
|
||||||
|
signame = signal.Signals(signum).name
|
||||||
|
print(f'Signal handler called with signal {signame} ({signum})')
|
||||||
|
raise OSError("Couldn't open device!")
|
||||||
|
|
||||||
|
def open(path: str, timeout_in_seconds: int = 10, mode="rb", encoding=None) -> TextIOWrapper:
|
||||||
|
# Set the signal handler and a 5-second alarm
|
||||||
|
signal.signal(signal.SIGALRM, FileWithTimeout.handler)
|
||||||
|
signal.alarm(timeout_in_seconds)
|
||||||
|
fd = open(path, mode=mode, encoding=encoding)
|
||||||
|
signal.alarm(0) # Disable the alarm
|
||||||
|
return fd
|
||||||
|
|
||||||
|
|
||||||
|
class MainWindow(QMainWindow):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(MainWindow, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
self.setWindowTitle(_("KrowLog"))
|
||||||
|
self.setMinimumWidth(600)
|
||||||
|
self.setMinimumHeight(480)
|
||||||
|
bigger_text = BiggerText()
|
||||||
|
self.setCentralWidget(bigger_text)
|
||||||
|
self.status_bar = QStatusBar(self)
|
||||||
|
self.setStatusBar(self.status_bar)
|
||||||
|
self.status_bar.setAutoFillBackground(True)
|
||||||
|
self.status_bar.showMessage("bla blub")
|
||||||
|
|
||||||
|
|
||||||
|
def apply_time_workaround():
|
||||||
|
# workaround to make signals work in QT apps.
|
||||||
|
# They do not work out of the box, because the main thread
|
||||||
|
# is running in C++ code once app.exec() is executed
|
||||||
|
# Forcing an empty lambda to be executed periodically gives
|
||||||
|
# control back to python and allows python to react to signals
|
||||||
|
timer = QTimer()
|
||||||
|
timer.timeout.connect(lambda: None)
|
||||||
|
timer.start(100)
|
||||||
|
|
||||||
|
|
||||||
|
class LineType(enum.IntEnum):
|
||||||
|
Full = 1
|
||||||
|
Begin = 2
|
||||||
|
Middle = 3
|
||||||
|
End = 4
|
||||||
|
|
||||||
|
|
||||||
|
class Line:
|
||||||
|
def __init__(self, byte_offset: int, byte_end: int, text: str, type: LineType = LineType.Full):
|
||||||
|
"""
|
||||||
|
:type byte_offset: int the offset of the first byte of this line
|
||||||
|
:type byte_end: int the offset of the last byte of this line
|
||||||
|
:type text: str the decoded text
|
||||||
|
:type continued: bool True if the previous line was too long and has been split
|
||||||
|
"""
|
||||||
|
self._byte_offset = byte_offset
|
||||||
|
self._byte_end = byte_end
|
||||||
|
self._text = text
|
||||||
|
self._type = type
|
||||||
|
|
||||||
|
def byte_offset(self) -> int:
|
||||||
|
return self._byte_offset
|
||||||
|
|
||||||
|
def byte_end(self) -> int:
|
||||||
|
return self._byte_end
|
||||||
|
|
||||||
|
def text(self) -> str:
|
||||||
|
return self._text
|
||||||
|
|
||||||
|
def type(self) -> LineType:
|
||||||
|
return self._type
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "%s (%d->%d %s)" % (self._text, self._byte_offset, self._byte_end, self._type.name)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "%s (%d->%d %s)" % (repr(self._text), self._byte_offset, self._byte_end, self._type.name)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, Line):
|
||||||
|
# don't attempt to compare against unrelated types
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
return (self._byte_offset == other._byte_offset
|
||||||
|
and self._byte_end == other._byte_end
|
||||||
|
and self._text == other._text
|
||||||
|
and self._type == other._type)
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
# necessary for instances to behave sanely in dicts and sets.
|
||||||
|
return hash((self._byte_offset, self._byte_end, self._text, self._type))
|
||||||
|
|
||||||
|
|
||||||
|
class FileModel:
|
||||||
|
BUFFER_SIZE = 2 ** 16 # for production use this should be at least 4096, better 64kb
|
||||||
|
|
||||||
|
def __init__(self, file: str):
|
||||||
|
self.file = file
|
||||||
|
|
||||||
|
def _read_line(self, file: TextIOWrapper, read_offset: int, max_line_length: int, encoding: str,
|
||||||
|
previous_line_type: LineType) -> Line | None:
|
||||||
|
file.seek(read_offset)
|
||||||
|
buffer: str = file.read(self.BUFFER_SIZE)
|
||||||
|
if not buffer:
|
||||||
|
# end of file reached
|
||||||
|
return None
|
||||||
|
|
||||||
|
while True:
|
||||||
|
pos_of_newline = buffer.find(b"\n")
|
||||||
|
if pos_of_newline > max_line_length:
|
||||||
|
start_of_line = read_offset
|
||||||
|
end_of_line = read_offset + max_line_length - 1
|
||||||
|
line = buffer[0:max_line_length]
|
||||||
|
|
||||||
|
decoded_line = line.decode(encoding, errors="replace")
|
||||||
|
line_type = LineType.Begin if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.Middle
|
||||||
|
return Line(start_of_line, end_of_line, decoded_line, line_type)
|
||||||
|
elif pos_of_newline >= 0:
|
||||||
|
start_of_line = read_offset
|
||||||
|
end_of_line = read_offset + pos_of_newline
|
||||||
|
line = buffer[0:pos_of_newline + 1]
|
||||||
|
|
||||||
|
decoded_line = line.decode(encoding, errors="replace")
|
||||||
|
line_type = LineType.Full if previous_line_type == LineType.End or previous_line_type == LineType.Full else LineType.End
|
||||||
|
return Line(start_of_line, end_of_line, decoded_line, line_type)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# line does not end in this buffer
|
||||||
|
# read the next chunk and stitch the line together
|
||||||
|
raise "not yet implemented"
|
||||||
|
|
||||||
|
def read(self, file_offset: int, lines_to_read: int, max_line_length=512, encoding="utf8") -> [Line]:
|
||||||
|
lines: list[Line] = []
|
||||||
|
with FileWithTimeout.open(self.file, 5, 'rb') as f:
|
||||||
|
read_offset = max(0, file_offset - max_line_length * 6) # factor 6 is due to multibyte characters in utf8
|
||||||
|
f.seek(read_offset)
|
||||||
|
|
||||||
|
previous_line_type = LineType.Full
|
||||||
|
while len(lines) < lines_to_read:
|
||||||
|
l: Line | None = self._read_line(f, read_offset, max_line_length, encoding, previous_line_type)
|
||||||
|
if l is None:
|
||||||
|
break
|
||||||
|
if file_offset <= l.byte_end():
|
||||||
|
lines.append(l)
|
||||||
|
read_offset = l.byte_end() + 1
|
||||||
|
previous_line_type = l.type()
|
||||||
|
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
|
class BiggerText(QWidget):
|
||||||
|
|
||||||
|
def __init__(self, ):
|
||||||
|
super(BiggerText, self).__init__()
|
||||||
|
# font ="Andale Mono"
|
||||||
|
# font = "JetBrains Mono"
|
||||||
|
# font = "Monospace" # not found
|
||||||
|
# font = "ZedMono" # is not found
|
||||||
|
# font = "Noto Sans Mono"
|
||||||
|
font = "Noto Color Emoji"
|
||||||
|
font_size = 20
|
||||||
|
|
||||||
|
qfont = QFont(font, font_size)
|
||||||
|
# qfont.setStyleHint(QFont.StyleHint.Monospace)
|
||||||
|
self.qfont = qfont
|
||||||
|
|
||||||
|
def mousePressEvent(self, e: QtGui.QMouseEvent) -> None:
|
||||||
|
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier:
|
||||||
|
offset = self.to_byte_offset(e.pos())
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def to_byte_offset(self, pos: QPoint):
|
||||||
|
line = self.y_pos_to_line(pos.y())
|
||||||
|
|
||||||
|
def y_pos_to_line(self, y: int) -> int:
|
||||||
|
return int(y / self.char_height)
|
||||||
|
|
||||||
|
def paintEvent(self, event: QPaintEvent) -> None:
|
||||||
|
start_ns = time.process_time_ns()
|
||||||
|
painter = QPainter(self)
|
||||||
|
font = painter.font()
|
||||||
|
font.setPointSize(20)
|
||||||
|
painter.setFont(font)
|
||||||
|
|
||||||
|
font_metric: QFontMetrics = painter.fontMetrics()
|
||||||
|
self.char_height = font_metric.height()
|
||||||
|
|
||||||
|
lines_to_render = [
|
||||||
|
"im0 ひらがな 王 フーバー 🔴🟢 7²",
|
||||||
|
"iiiiiiiiii",
|
||||||
|
"12345678",
|
||||||
|
"12345678",
|
||||||
|
"nonspacing marks:",
|
||||||
|
"next line consists of a%CC%88",
|
||||||
|
"äääääääääääääääääääääääääääääää|",
|
||||||
|
"アンドレアス",
|
||||||
|
"アンドレアス"
|
||||||
|
]
|
||||||
|
|
||||||
|
painter.setPen(QColor(0, 0, 0))
|
||||||
|
line_on_screen = 1
|
||||||
|
for line in lines_to_render:
|
||||||
|
painter.drawText(QPoint(0, line_on_screen * char_height), line)
|
||||||
|
line_on_screen = line_on_screen + 1
|
||||||
|
|
||||||
|
painter.end()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
app = QApplication(sys.argv)
|
||||||
|
|
||||||
|
apply_time_workaround()
|
||||||
|
|
||||||
|
window = MainWindow()
|
||||||
|
window.show()
|
||||||
|
register_signal_handler()
|
||||||
|
|
||||||
|
app.exec()
|
||||||
53
src/new_big_text/test_file_model.py
Normal file
53
src/new_big_text/test_file_model.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from src.new_big_text.bigger_text import FileModel, Line, LineType
|
||||||
|
|
||||||
|
|
||||||
|
class FileModelTestCase(unittest.TestCase):
|
||||||
|
def test_read_lines_with_offset_0(self):
|
||||||
|
fm = FileModel("testdata/readlines.txt")
|
||||||
|
actual_lines = fm.read(file_offset=0, lines_to_read=5, max_line_length=16, encoding="utf8")
|
||||||
|
self.assertEqual([
|
||||||
|
Line(0, 1, '1\n'),
|
||||||
|
Line(2, 4, '12\n'),
|
||||||
|
Line(5, 8, '123\n'),
|
||||||
|
Line(9, 13, '1234\n'),
|
||||||
|
Line(14, 19, '12345\n')
|
||||||
|
],
|
||||||
|
actual_lines)
|
||||||
|
|
||||||
|
def test_read_lines_with_offset_in_middle_of_line(self):
|
||||||
|
fm = FileModel("testdata/readlines.txt")
|
||||||
|
file_offset = "1\n12\n123\n".find("123") + 1
|
||||||
|
actual_lines = fm.read(
|
||||||
|
file_offset=file_offset, # at char 2 in line 3
|
||||||
|
lines_to_read=5,
|
||||||
|
max_line_length=16,
|
||||||
|
encoding="utf8")
|
||||||
|
self.assertEqual([
|
||||||
|
Line(5, 8, '123\n'),
|
||||||
|
Line(9, 13, '1234\n'),
|
||||||
|
Line(14, 19, '12345\n'),
|
||||||
|
Line(20, 26, '123456\n'),
|
||||||
|
Line(27, 34, '1234567\n')
|
||||||
|
],
|
||||||
|
actual_lines)
|
||||||
|
|
||||||
|
def test_read_long_line__buffer_larger_than_line(self):
|
||||||
|
fm = FileModel("testdata/longLines.txt")
|
||||||
|
fm.BUFFER_SIZE = 512
|
||||||
|
actual_lines = fm.read(
|
||||||
|
file_offset=0,
|
||||||
|
lines_to_read=3,
|
||||||
|
max_line_length=10,
|
||||||
|
encoding="utf8"
|
||||||
|
)
|
||||||
|
self.assertEqual([
|
||||||
|
Line(0, 9, "1aaaaaaaa-", LineType.Begin),
|
||||||
|
Line(10, 19, "bbbbbbbbb-", LineType.Middle),
|
||||||
|
Line(20, 27, "ccccccc\n", LineType.End),
|
||||||
|
], actual_lines)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
@@ -22,8 +22,6 @@ class LogFileModel:
|
|||||||
range_start = 0
|
range_start = 0
|
||||||
range_end = -1
|
range_end = -1
|
||||||
|
|
||||||
_line_cache = {}
|
|
||||||
|
|
||||||
def __init__(self, file: str, settings: Settings, original_file: str = False):
|
def __init__(self, file: str, settings: Settings, original_file: str = False):
|
||||||
"""
|
"""
|
||||||
:param file:
|
:param file:
|
||||||
@@ -34,6 +32,7 @@ class LogFileModel:
|
|||||||
self._file = os.path.realpath(file)
|
self._file = os.path.realpath(file)
|
||||||
self._original_file = os.path.realpath(original_file) if original_file else self._file
|
self._original_file = os.path.realpath(original_file) if original_file else self._file
|
||||||
self._file_name = os.path.basename(self._original_file)
|
self._file_name = os.path.basename(self._original_file)
|
||||||
|
self._line_cache = {}
|
||||||
|
|
||||||
def highlighters(self):
|
def highlighters(self):
|
||||||
all_highlighters = Highlighting.read_config(self.settings)
|
all_highlighters = Highlighting.read_config(self.settings)
|
||||||
@@ -137,6 +136,7 @@ class LogFileModel:
|
|||||||
return re.match(r"\w", char) is not None
|
return re.match(r"\w", char) is not None
|
||||||
|
|
||||||
def prune_cache(self, range_start: int, range_end: int):
|
def prune_cache(self, range_start: int, range_end: int):
|
||||||
|
print(f"cache size: {len(self._line_cache.keys())}")
|
||||||
for key in list(self._line_cache.keys()):
|
for key in list(self._line_cache.keys()):
|
||||||
line = self._line_cache[key]
|
line = self._line_cache[key]
|
||||||
if range_start > line.byte_end() or line.byte_offset() > range_end:
|
if range_start > line.byte_end() or line.byte_offset() > range_end:
|
||||||
|
|||||||
Reference in New Issue
Block a user