import os
import re
import tempfile
import threading
import time
from typing import Optional, Callable, List

from PyQt6.QtCore import QRunnable, QThreadPool
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox, QPushButton

from bigtext import BigText
from logFileModel import LogFileModel
from ravenui import RavenUI


class FilterTask(QRunnable):
    """Runnable that copies every line matching a regex from one file to another.

    The file of ``source_model`` is read line by line; each line in which
    ``regex`` matches is appended to the file of ``filter_model``. Listeners
    are told the byte offsets of every hit so they can map lines of the
    filtered file back to the source file.
    """

    # Set to True on an instance to make its run() loop stop after the line
    # it is currently processing.
    aborted = False

    def __init__(
            self,
            source_model: LogFileModel,
            filter_model: LogFileModel,
            regex: re.Pattern,
            lock: threading.RLock,
            filter_match_found_listeners: List[Callable[[int, int], None]],
            on_before: Callable[[], None],
            on_finish: Callable[[], None]
    ):
        """Store the collaborators; no work happens until run() is called.

        :param source_model: model whose file is searched
        :param filter_model: model whose file receives the matching lines
        :param regex: compiled pattern that is searched for in every line
        :param lock: lock held for the whole duration of run()
        :param filter_match_found_listeners: callables invoked as
            ``listener(target_offset, source_offset)`` for every hit and as
            ``listener(-1, -1)`` once when the search starts
        :param on_before: called at the start of run(), after the lock was acquired
        :param on_finish: called at the end of run(), even if the search raised
        """
        super().__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        self.on_before = on_before
        self.on_finish = on_finish
        self.lock = lock
        self.filter_match_found_listeners = filter_match_found_listeners

    def run(self):
        """Search the source file and write all matching lines to the filter file."""
        # the lock ensures that we only start a new search when the previous search already ended
        with self.lock:
            # NOTE(review): on_before/on_finish and the listeners are invoked on
            # whatever thread executes this runnable. FilterWidget passes
            # callbacks that touch a widget - confirm that this is safe.
            self.on_before()

            for listener in self.filter_match_found_listeners:
                listener(-1, -1)  # notify listeners that a new search started

            try:
                with open(self.source_model.get_file(), "rb") as source:
                    # "w+b" truncates the file, i.e. hits of an earlier search are discarded
                    with open(self.filter_model.get_file(), "w+b") as target:
                        line_count = 0
                        lines_written = 0  # hits written since the last forced flush
                        while raw_line := source.readline():
                            line_count += 1
                            line = raw_line.decode("utf8", errors="ignore")
                            # search() stops at the first match; we only need to know whether there is one
                            if self.regex.search(line):
                                lines_written += 1
                                # readline() already advanced the position, so step back to the line start
                                source_line_offset = source.tell() - len(raw_line)
                                target_line_offset = target.tell()
                                for listener in self.filter_match_found_listeners:
                                    listener(target_line_offset, source_line_offset)
                                target.write(raw_line)

                            # Buffering can hide results for a while, so we force a
                            # flush every 10000 source lines if there is something new.
                            if line_count % 10000 == 0 and lines_written > 0:
                                target.flush()
                                lines_written = 0

                            if self.aborted:
                                break
            finally:
                self.on_finish()


class FilterWidget(QWidget):
    """Widget with a query field, search options and a view of the matching lines.

    Every change of the query or of an option starts a FilterTask on the
    global thread pool that writes the matching lines of the source model's
    file into a temporary file, which is shown in a BigText view.
    """

    filter_model: LogFileModel
    # the most recently started search, None as long as no search was started
    filter_task: Optional[FilterTask] = None

    def __init__(self, source_model: LogFileModel):
        """Build the UI and create the temporary file that receives the hits.

        :param source_model: model of the file that is searched
        """
        super().__init__()
        self.source_model = source_model

        # shared by all tasks of this widget, so that only one search runs at a time
        self._lock = threading.RLock()

        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        # only visible while a search is running, see filter_changed()
        self.btn_cancel_search = QPushButton(self.tr("Cancel"))
        self.btn_cancel_search.setVisible(False)
        self.btn_cancel_search.pressed.connect(self._cancel_search)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        filter_bar.layout = QHBoxLayout(filter_bar)
        filter_bar.layout.setContentsMargins(0, 0, 0, 0)
        filter_bar.layout.addWidget(self.query_field)
        filter_bar.layout.addWidget(self.btn_cancel_search)
        filter_bar.layout.addWidget(self.ignore_case)
        filter_bar.layout.addWidget(self.is_regex)

        # mkstemp() returns an open handle, but we only need the file name
        (handle, self.tmpfilename) = tempfile.mkstemp()
        os.close(handle)
        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

        self.filter_match_found_listeners: List[Callable[[int, int], None]] = []

    def add_line_click_listener(self, listener: Callable[[int], None]):
        """Register ``listener`` as line click listener of the hits view."""
        self.hits_view.add_line_click_listener(listener)

    def add_filter_match_found_listener(self, listener: Callable[[int, int], None]):
        """Register a listener that is informed about the hits of all future searches.

        The listener is called as ``listener(target_offset, source_offset)``
        for every hit and as ``listener(-1, -1)`` when a new search starts.
        """
        self.filter_match_found_listeners.append(listener)

    def destruct(self):
        """Stop a running search and delete the temporary file with the hits."""
        # ask a running search to stop, so that it does not continue to write into the file we delete
        self._cancel_search()
        os.remove(self.tmpfilename)

    def _cancel_search(self):
        """Ask the most recently started search, if there is one, to stop."""
        if self.filter_task:
            self.filter_task.aborted = True

    def reset_filter(self):
        """Remove all hits and all query highlights, then refresh the UI."""
        self.filter_model.truncate()
        self.source_model.clear_query_highlight()
        self.filter_model.clear_query_highlight()
        RavenUI.update_ui()

    def filter_changed(self):
        """Start a new search for the current query and options.

        A search that is still running is cancelled first. An empty query
        resets the filter; a query that cannot be compiled only clears the
        hits.
        """
        query = self.query_field.text()
        ignore_case = self.ignore_case.isChecked()
        is_regex = self.is_regex.isChecked()

        # Cancel the previous search before anything else. Otherwise a search
        # that is still running would keep adding hits after the query was cleared.
        self._cancel_search()

        if not query:
            self.reset_filter()
            return

        flags = re.IGNORECASE if ignore_case else 0
        pattern = query if is_regex else re.escape(query)
        try:
            regex = re.compile(pattern, flags=flags)
        except (re.error, OverflowError, RecursionError):
            # query was not a valid regex -> clear search hits, then abort
            self.filter_model.truncate()
            return

        self.source_model.set_query_highlight(query, ignore_case, is_regex)
        self.filter_model.set_query_highlight(query, ignore_case, is_regex)

        self.filter_task = FilterTask(
            self.source_model,
            self.filter_model,
            regex,
            self._lock,
            self.filter_match_found_listeners,
            lambda: self.btn_cancel_search.setVisible(True),
            lambda: self.btn_cancel_search.setVisible(False)
        )
        QThreadPool.globalInstance().start(self.filter_task)