import os
import re
import tempfile
import threading
import time
from typing import Optional, Callable

from PyQt6.QtCore import QRunnable, QThreadPool
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox, QPushButton

from bigtext import BigText
from logFileModel import LogFileModel


class FilterTask(QRunnable):
    """Runnable that copies every line matching a regex from one file to another.

    The file of ``source_model`` is read line by line; each line that the
    regex matches is written to the file of ``filter_model``. The task can be
    stopped early by setting ``aborted`` to ``True`` on the instance.
    """

    # Class-level default. Assigning ``task.aborted = True`` on an instance
    # shadows this value for that instance only.
    aborted = False

    def __init__(
            self,
            source_model: LogFileModel,
            filter_model: LogFileModel,
            regex: re.Pattern,
            lock: threading.RLock,
            on_before: Callable[[], None],
            on_finish: Callable[[], None]
    ):
        """Store the collaborators of the task.

        :param source_model: model whose ``get_file()`` names the file to read.
        :param filter_model: model whose ``get_file()`` names the file that
            receives the matching lines (it is truncated on every run).
        :param regex: compiled pattern that is searched in every line.
        :param lock: lock held for the whole duration of ``run``.
        :param on_before: called once the lock is held, before any file is opened.
        :param on_finish: called when the run ends, also on error or abort.
        """
        super(FilterTask, self).__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        self.on_before = on_before
        self.on_finish = on_finish
        self.lock = lock

    def run(self):
        """Filter the source file into the target file."""
        # The lock ensures that we only start a new search when the previous
        # search has already ended.
        with self.lock:
            # NOTE(review): on_before/on_finish are invoked from whichever
            # thread executes run(); the callbacks passed by FilterWidget
            # touch a widget. Confirm that this is safe, or marshal the calls
            # to the GUI thread via a signal.
            self.on_before()
            try:
                with open(self.source_model.get_file(), "rb") as source:
                    with open(self.filter_model.get_file(), "w+b") as target:
                        line_count = 0
                        lines_written = 0
                        while raw_line := source.readline():
                            line_count += 1
                            # Undecodable bytes are dropped, so the written
                            # line may differ from the raw input.
                            line = raw_line.decode("utf8", errors="ignore")
                            # search() stops at the first hit; we only need to
                            # know whether the line matches at all.
                            if self.regex.search(line):
                                lines_written += 1
                                target.write(line.encode("utf8"))

                            # Buffering can hide results for a while, so we
                            # force a flush periodically when there is
                            # something new to show.
                            if line_count % 10000 == 0 and lines_written > 0:
                                target.flush()
                                lines_written = 0

                            if self.aborted:
                                break
            finally:
                self.on_finish()


class FilterWidget(QWidget):
    """Widget with a query bar and a view showing the matching lines.

    The matching lines of ``source_model`` are written to a temporary file by
    a ``FilterTask`` and displayed through a ``BigText`` view on that file.
    """

    filter_model: LogFileModel
    filter_task: Optional[FilterTask] = None

    def __init__(self, source_model: LogFileModel):
        """Build the filter bar and the hits view.

        :param source_model: model of the file that is searched.
        """
        super(FilterWidget, self).__init__()
        self.source_model = source_model

        # Shared with every FilterTask so that searches run one after another.
        self._lock = threading.RLock()

        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        # Only visible while a search is running (toggled by the task callbacks).
        self.btn_cancel_search = QPushButton(self.tr("Cancel"))
        self.btn_cancel_search.setVisible(False)
        self.btn_cancel_search.pressed.connect(self._cancel_search)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        filter_bar.layout = QHBoxLayout(filter_bar)
        filter_bar.layout.setContentsMargins(0, 0, 0, 0)
        filter_bar.layout.addWidget(self.query_field)
        filter_bar.layout.addWidget(self.btn_cancel_search)
        filter_bar.layout.addWidget(self.ignore_case)
        filter_bar.layout.addWidget(self.is_regex)

        handle, self.tmpfilename = tempfile.mkstemp()
        # mkstemp() returns an open OS-level descriptor. The file is always
        # reopened by name, so close the descriptor right away; otherwise it
        # leaks and can prevent os.remove() from succeeding on Windows.
        os.close(handle)

        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

    def destruct(self):
        """Delete the temporary file that holds the search hits."""
        os.remove(self.tmpfilename)

    def _cancel_search(self):
        """Ask the current filter task, if any, to stop."""
        if self.filter_task:
            self.filter_task.aborted = True

    def filter_changed(self):
        """Start a new search for the current query and options.

        Does nothing when the query is empty. When the query cannot be
        compiled, the previous search is cancelled and no new one is started.
        """
        query = self.query_field.text()
        if not query:
            return

        # cancel previous search
        self._cancel_search()

        flags = re.IGNORECASE if self.ignore_case.isChecked() else 0
        pattern = query if self.is_regex.isChecked() else re.escape(query)
        try:
            regex = re.compile(pattern, flags=flags)
        except (re.error, OverflowError, RecursionError):
            # query was not a valid regex -> abort
            return

        self.filter_task = FilterTask(
            self.source_model,
            self.filter_model,
            regex,
            self._lock,
            lambda: self.btn_cancel_search.setVisible(True),
            lambda: self.btn_cancel_search.setVisible(False)
        )

        QThreadPool.globalInstance().start(self.filter_task)