import asyncio
import os
import re
import tempfile
import threading
import time
from typing import Optional, Callable

from PyQt6.QtCore import QRunnable, QThreadPool, pyqtSignal
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox, QPushButton

from bigtext import BigText
from logFileModel import LogFileModel


class FilterTask(QRunnable):
    """Background job that copies all lines matching a regex into another file.

    The task reads the file behind ``source_model`` line by line and writes
    every line on which ``regex`` matches into the file behind
    ``filter_model``. Setting ``aborted`` to ``True`` makes the task stop
    before it processes the next line.
    """

    # Set to True (on the instance) to request that the running task stops.
    aborted = False
    future: asyncio.Future

    def __init__(
            self,
            source_model: LogFileModel,
            filter_model: LogFileModel,
            regex: re.Pattern,
            lock: threading.RLock,
            on_before: Callable[[], None],
            on_finish: Callable[[], None]
    ):
        """Store everything the task needs to run later on a pool thread.

        Args:
            source_model: model whose ``get_file()`` names the file to read.
            filter_model: model whose ``get_file()`` names the file to write.
            regex: compiled pattern; a line is kept when it matches anywhere.
            lock: lock held for the whole duration of ``run``.
            on_before: called right after the lock was acquired.
            on_finish: called when the search ended, also on abort or error.
        """
        super().__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        self.on_before = on_before
        self.on_finish = on_finish
        self.lock = lock

    def run(self):
        """Filter the source file into the target file.

        Note that ``on_before`` and ``on_finish`` are invoked on the thread
        that executes this method, not on the thread that created the task.
        """
        # the lock ensures that we only start a new search when the previous search already ended
        with self.lock:
            self.on_before()
            try:
                with open(self.source_model.get_file(), "rb") as source, \
                        open(self.filter_model.get_file(), "w+b") as target:
                    for raw_line in source:
                        # Check before doing any work so that a task that was
                        # cancelled while still queued does not write anything.
                        if self.aborted:
                            break
                        line = raw_line.decode("utf8", errors="ignore")
                        # search() stops at the first hit; we only need to
                        # know whether the line matches at all.
                        if self.regex.search(line):
                            target.write(line.encode("utf8"))
            finally:
                self.on_finish()


class FilterWidget(QWidget):
    """Query field plus result view that shows the matching lines of a log file.

    Every change of the query text or of one of the check boxes cancels the
    running search (if any) and schedules a new ``FilterTask`` on the global
    thread pool. The hits are written to a temporary file that is wrapped in
    its own ``LogFileModel``; call ``destruct`` to delete that file.
    """

    future = None
    filter_model: LogFileModel
    filter_task: Optional[FilterTask] = None
    # Shared by all instances; handed to every FilterTask so that searches
    # run one after another.
    _lock = threading.RLock()

    # Emitted by the filter task (from a pool thread) with True when a search
    # starts and False when it ends. Going through a signal lets Qt deliver
    # the visibility change to the cancel button on the GUI thread instead of
    # touching the widget directly from the worker thread.
    _search_running = pyqtSignal(bool)

    def __init__(self, source_model: LogFileModel):
        """Build the widget tree and create the temporary result file.

        Args:
            source_model: model of the log file that will be searched.
        """
        super().__init__()
        self.source_model = source_model

        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        self.btn_cancel_search = QPushButton(self.tr("Cancel"))
        self.btn_cancel_search.setVisible(False)
        self.btn_cancel_search.pressed.connect(self._cancel_search)
        self._search_running.connect(self.btn_cancel_search.setVisible)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        filter_bar.layout = QHBoxLayout(filter_bar)
        filter_bar.layout.setContentsMargins(0, 0, 0, 0)
        filter_bar.layout.addWidget(self.query_field)
        filter_bar.layout.addWidget(self.btn_cancel_search)
        filter_bar.layout.addWidget(self.ignore_case)
        filter_bar.layout.addWidget(self.is_regex)

        (handle, self.tmpfilename) = tempfile.mkstemp()
        # mkstemp() returns an already opened descriptor. The file is only
        # ever accessed by name afterwards, so close the descriptor right
        # away instead of leaking it for the lifetime of the process.
        os.close(handle)

        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

    def destruct(self):
        """Delete the temporary file that holds the search results."""
        os.remove(self.tmpfilename)

    def _cancel_search(self):
        """Ask the most recently started filter task (if any) to stop."""
        if self.filter_task:
            self.filter_task.aborted = True

    def filter_changed(self):
        """Start a new search for the current query and options.

        Does nothing when the query is empty. When the query cannot be
        compiled as a regular expression, the previous search is cancelled
        and no new one is started.
        """
        query = self.query_field.text()
        ignore_case = self.ignore_case.isChecked()
        is_regex = self.is_regex.isChecked()

        if not query:
            return

        # cancel previous search
        self._cancel_search()

        flags = re.IGNORECASE if ignore_case else 0
        pattern = query if is_regex else re.escape(query)
        try:
            regex = re.compile(pattern, flags=flags)
        except (re.error, OverflowError, RecursionError):
            # query was not a valid regex -> abort
            return

        self.filter_task = FilterTask(
            self.source_model,
            self.filter_model,
            regex,
            self._lock,
            lambda: self._search_running.emit(True),
            lambda: self._search_running.emit(False)
        )
        QThreadPool.globalInstance().start(self.filter_task)