Files
krowlog/filterwidget.py
Andreas Huber f0555df0a9 switch from PyQt6 to PySide6
PySide6 uses LGPL instead of GPL, which is much nicer to work with.
2022-01-30 10:50:31 +01:00

189 lines
6.8 KiB
Python

import os
import re
import tempfile
import threading
from typing import Optional, Callable
from PySide6.QtCore import QRunnable, QThreadPool
from PySide6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox, QPushButton
from bigtext import BigText
from logFileModel import LogFileModel
from ravenui import RavenUI
class FilterTask(QRunnable):
aborted = False
def __init__(
self,
source_model: LogFileModel,
filter_model: LogFileModel,
regex: re.Pattern,
lock: threading.RLock,
filter_match_found_listeners: Callable[[int], None],
on_before: Callable[[], None],
on_finish: Callable[[], None]
):
super(FilterTask, self).__init__()
self.source_model = source_model
self.filter_model = filter_model
self.regex = regex
self.on_before = on_before
self.on_finish = on_finish
self.lock = lock
self.filter_match_found_listeners = filter_match_found_listeners
def run(self):
# print("writing to tmp file", self.filter_model.get_file())
# the lock ensures that we only start a new search when the previous search already ended
with self.lock:
# print("starting thread ", threading.currentThread())
self.on_before()
if self.aborted:
self.on_finish()
for listener in self.filter_match_found_listeners:
listener(-1, -1) # notify listeners that a new search started
try:
with open(self.source_model.get_file(), "rb") as source:
with open(self.filter_model.get_file(), "w+b") as target:
line_count = 0
lines_written = 0
while l := source.readline():
line_count = line_count + 1
line = l.decode("utf8", errors="ignore")
if self.regex.findall(line):
# time.sleep(0.5)
lines_written = lines_written + 1
source_line_offset = source.tell() - len(l)
target_line_offset = target.tell()
for listener in self.filter_match_found_listeners:
listener(target_line_offset, source_line_offset)
target.write(l)
# sometime buffering can hide results for a while
# We force a flush periodically.
if line_count % 10000 == 0 and lines_written > 0:
target.flush()
lines_written = 0
if self.aborted:
# print("aborted ", time.time())
break
finally:
self.on_finish()
# print("dome thread ", threading.currentThread())
class FilterWidget(QWidget):
filter_model: LogFileModel
filter_task: Optional[FilterTask] = None
def __init__(self, source_model: LogFileModel):
super(FilterWidget, self).__init__()
self.source_model = source_model
self._lock = threading.RLock()
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.query_field = QLineEdit()
self.query_field.textChanged.connect(self.filter_changed)
self.btn_cancel_search = QPushButton(self.tr("Cancel"))
self.btn_cancel_search.setVisible(False)
self.btn_cancel_search.pressed.connect(self._cancel_search)
self.ignore_case = QCheckBox(self.tr("ignore case"))
self.ignore_case.setChecked(True)
self.ignore_case.stateChanged.connect(self.filter_changed)
self.is_regex = QCheckBox(self.tr("regex"))
self.is_regex.setChecked(True)
self.is_regex.stateChanged.connect(self.filter_changed)
filter_bar = QWidget()
filter_bar.layout = QHBoxLayout(filter_bar)
filter_bar.layout.setContentsMargins(0, 0, 0, 0)
filter_bar.layout.addWidget(self.query_field)
filter_bar.layout.addWidget(self.btn_cancel_search)
filter_bar.layout.addWidget(self.ignore_case)
filter_bar.layout.addWidget(self.is_regex)
(handle, self.tmpfilename) = tempfile.mkstemp()
os.close(handle)
self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
self.hits_view = BigText(self.filter_model)
self.layout.addWidget(filter_bar)
self.layout.addWidget(self.hits_view)
self.filter_match_found_listeners: [Callable[[int], None]] = []
def add_line_click_listener(self, listener: Callable[[int], None]):
self.hits_view.add_line_click_listener(listener)
def add_filter_match_found_listener(self, listener: Callable[[int], None]):
self.filter_match_found_listeners.append(listener)
def destruct(self):
# print("cleanup: ", self.tmpfilename)
os.remove(self.tmpfilename)
def _cancel_search(self):
if self.filter_task:
# print("cancel started ", time.time())
self.filter_task.aborted = True
# wait until the previous search is aborted
with self._lock:
pass
def reset_filter(self):
self.filter_model.truncate()
self.source_model.clear_query_highlight()
self.filter_model.clear_query_highlight()
RavenUI.update_ui()
def filter_changed(self):
query = self.query_field.text()
ignore_case = self.ignore_case.isChecked()
is_regex = self.is_regex.isChecked()
# cancel previous search
self._cancel_search()
if len(query) == 0:
self.reset_filter()
return
try:
flags = re.IGNORECASE if ignore_case else 0
if is_regex:
regex = re.compile(query, flags=flags)
else:
regex = re.compile(re.escape(query), flags=flags)
except:
# query was not a valid regex -> clear search hits, then abort
self.filter_model.truncate()
return
self.source_model.set_query_highlight(query, ignore_case, is_regex)
self.filter_model.set_query_highlight(query, ignore_case, is_regex)
self.filter_task = FilterTask(
self.source_model,
self.filter_model,
regex,
self._lock,
self.filter_match_found_listeners,
lambda: self.btn_cancel_search.setVisible(True),
lambda: self.btn_cancel_search.setVisible(False)
)
QThreadPool.globalInstance().start(self.filter_task)