import os
import re
import tempfile
import threading
import time
from typing import Callable, List, Optional

from PyQt6.QtCore import QRunnable, QThreadPool
from PyQt6.QtWidgets import (
    QCheckBox,
    QHBoxLayout,
    QLineEdit,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

from bigtext import BigText
from logFileModel import LogFileModel
from ravenui import RavenUI


class FilterTask(QRunnable):
    """Runnable that copies all lines matching a regex from the source file into the filter file.

    For every matching line each listener is called with
    ``(offset_in_target_file, offset_in_source_file)``. Before a search starts
    each listener is called once with ``(-1, -1)``.
    """

    # Set to True (from another thread) to ask the task to stop as soon as possible.
    aborted = False

    # Seconds to wait before the filtering starts; see the comment in ``run``.
    START_DELAY_SECONDS = 0.5

    # The target file is flushed every this many source lines, provided that
    # matches were written since the previous flush.
    FLUSH_INTERVAL_LINES = 10000

    def __init__(
            self,
            source_model: LogFileModel,
            filter_model: LogFileModel,
            regex: re.Pattern,
            lock: threading.RLock,
            filter_match_found_listeners: List[Callable[[int, int], None]],
            on_before: Callable[[], None],
            on_finish: Callable[[], None]
    ):
        """Store the collaborators of the search.

        :param source_model: model whose ``get_file()`` names the file to read
        :param filter_model: model whose ``get_file()`` names the file the hits are written to
        :param regex: compiled pattern; a line is a hit if the pattern matches anywhere in it
        :param lock: lock held for the whole duration of ``run``
        :param filter_match_found_listeners: callables invoked as ``listener(target_offset, source_offset)``
        :param on_before: called once after the lock was acquired
        :param on_finish: called once before the lock is released
        """
        super().__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        self.on_before = on_before
        self.on_finish = on_finish
        self.lock = lock
        self.filter_match_found_listeners = filter_match_found_listeners

    def run(self):
        """Run the search while holding the lock.

        ``on_before`` is called first and ``on_finish`` is called exactly once
        at the end, no matter whether the search completed, was aborted or
        raised. If the task was aborted during the start delay, nothing else
        happens: no listener is notified and the target file is not touched.
        """
        # the lock ensures that we only start a new search when the previous search already ended
        with self.lock:
            self.on_before()
            try:
                # Delay the filtering. This works around a race condition. For some reason, even though we
                # have locks to ensure that only one thread writes to the target file, it happened that the
                # file was not correctly truncated and started with null bytes. Sometimes several MB of null
                # bytes. My guess is that this is caused by async write operations.
                time.sleep(self.START_DELAY_SECONDS)
                if self.aborted:
                    # Cancelled before the search started. Returning here keeps an already
                    # cancelled task from truncating the target file and from reporting hits.
                    return

                for listener in self.filter_match_found_listeners:
                    listener(-1, -1)  # notify listeners that a new search started

                self._copy_matching_lines()
            finally:
                self.on_finish()

    def _copy_matching_lines(self):
        """Write every matching source line to the (truncated) target file until EOF or abort."""
        with open(self.source_model.get_file(), "rb") as source:
            with open(self.filter_model.get_file(), "w+b") as target:
                line_count = 0
                lines_written = 0
                while raw_line := source.readline():
                    line_count += 1
                    line = raw_line.decode("utf8", errors="ignore")

                    # search() is enough: we only need to know whether there is a match
                    if self.regex.search(line):
                        lines_written += 1
                        # offsets of the start of the line in both files
                        source_line_offset = source.tell() - len(raw_line)
                        target_line_offset = target.tell()
                        for listener in self.filter_match_found_listeners:
                            listener(target_line_offset, source_line_offset)
                        target.write(raw_line)

                    # Sometimes buffering can hide results for a while.
                    # We force a flush periodically.
                    if line_count % self.FLUSH_INTERVAL_LINES == 0 and lines_written > 0:
                        target.flush()
                        lines_written = 0

                    if self.aborted:
                        break


class FilterWidget(QWidget):
    """Query bar plus a view showing the lines of the source file that match the query.

    The hits are written to a temporary file by a ``FilterTask`` running in the
    global ``QThreadPool``. Call ``destruct`` to delete the temporary file.
    """

    filter_model: LogFileModel
    filter_task: Optional[FilterTask] = None

    def __init__(self, source_model: LogFileModel):
        """Build the UI and create the temporary file that receives the hits.

        :param source_model: model of the file that is searched
        """
        super().__init__()
        self.source_model = source_model

        # shared with every FilterTask, so that only one search runs at a time
        self._lock = threading.RLock()

        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        self.btn_cancel_search = QPushButton(self.tr("Cancel"))
        self.btn_cancel_search.setVisible(False)
        self.btn_cancel_search.pressed.connect(self._cancel_search)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        filter_bar.layout = QHBoxLayout(filter_bar)
        filter_bar.layout.setContentsMargins(0, 0, 0, 0)
        filter_bar.layout.addWidget(self.query_field)
        filter_bar.layout.addWidget(self.btn_cancel_search)
        filter_bar.layout.addWidget(self.ignore_case)
        filter_bar.layout.addWidget(self.is_regex)

        # mkstemp returns an open descriptor; only the name is needed here
        (handle, self.tmpfilename) = tempfile.mkstemp()
        os.close(handle)
        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

        self.filter_match_found_listeners: List[Callable[[int, int], None]] = []

    def add_line_click_listener(self, listener: Callable[[int], None]):
        """Forward ``listener`` to the hits view's ``add_line_click_listener``."""
        self.hits_view.add_line_click_listener(listener)

    def add_filter_match_found_listener(self, listener: Callable[[int, int], None]):
        """Register a listener for search hits.

        The listener is called as ``listener(target_offset, source_offset)`` for
        every hit and as ``listener(-1, -1)`` when a new search starts. Only
        searches started after the registration will call it.
        """
        self.filter_match_found_listeners.append(listener)

    def destruct(self):
        """Abort a running search and delete the temporary hits file."""
        # Without the cancel, a task could still have the file open or re-create it after removal.
        self._cancel_search()
        os.remove(self.tmpfilename)

    def _cancel_search(self):
        """Ask the current task (if any) to abort and block until no task holds the lock."""
        if self.filter_task:
            self.filter_task.aborted = True
            # wait until the previous search is aborted
            with self._lock:
                pass

    def reset_filter(self):
        """Remove all hits and all query highlights, then refresh the UI."""
        self.filter_model.truncate()
        self.source_model.clear_query_highlight()
        self.filter_model.clear_query_highlight()
        RavenUI.update_ui()

    def filter_changed(self):
        """Start a new search for the current query, replacing any running one.

        An empty query clears the hits. A query that cannot be compiled clears
        the hits and starts no search.
        """
        query = self.query_field.text()
        ignore_case = self.ignore_case.isChecked()
        is_regex = self.is_regex.isChecked()

        # Cancel the previous search first, also for an empty query: otherwise a task that
        # is still running would keep writing hits into the file we are about to truncate.
        self._cancel_search()

        if not query:
            self.reset_filter()
            return

        try:
            flags = re.IGNORECASE if ignore_case else 0
            pattern = query if is_regex else re.escape(query)
            regex = re.compile(pattern, flags=flags)
        except Exception:
            # Query was not a valid regex -> clear search hits, then abort.
            # Deliberately broad: the query is arbitrary user input and, besides re.error, compiling
            # can fail with e.g. OverflowError or RecursionError; none of them should escape this slot.
            self.filter_model.truncate()
            return

        self.source_model.set_query_highlight(query, ignore_case, is_regex)
        self.filter_model.set_query_highlight(query, ignore_case, is_regex)

        # NOTE(review): the two callbacks below are invoked by FilterTask.run, i.e. on a pool
        # thread, yet they touch a widget - confirm that this is safe or route them via a signal.
        self.filter_task = FilterTask(
            self.source_model,
            self.filter_model,
            regex,
            self._lock,
            self.filter_match_found_listeners,
            lambda: self.btn_cancel_search.setVisible(True),
            lambda: self.btn_cancel_search.setVisible(False)
        )

        QThreadPool.globalInstance().start(self.filter_task)