import os import re import tempfile import time from typing import Optional from PyQt6.QtCore import QRunnable, QThreadPool from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox from bigtext import BigText from logFileModel import LogFileModel import asyncio class FilterTask(QRunnable): aborted = False future: asyncio.Future def __init__( self, source_model: LogFileModel, filter_model: LogFileModel, regex: re.Pattern ): super(FilterTask, self).__init__() self.source_model = source_model self.filter_model = filter_model self.regex = regex self.future = asyncio.Future() def run(self): # print("writing to tmp file", self.filter_model.get_file()) start = time.time() with open(self.source_model.get_file(), "rb") as source: with open(self.filter_model.get_file(), "w+b") as target: while l := source.readline(): line = l.decode("utf8", errors="ignore") if self.regex.findall(line): target.write(line.encode("utf8")) if self.aborted or self.future.cancelled(): print("aborted") break #print("filtering for %s took %s" % (self.regex, time.time() - start)) self.future.done() class FilterWidget(QWidget): future = None filter_model: LogFileModel filter_task: Optional[FilterTask] = None def __init__(self, source_model: LogFileModel): super(FilterWidget, self).__init__() self.source_model = source_model self.layout = QVBoxLayout(self) self.layout.setContentsMargins(0, 0, 0, 0) self.query_field = QLineEdit() self.query_field.textChanged.connect(self.filter_changed) self.ignore_case = QCheckBox(self.tr("ignore case")) self.ignore_case.setChecked(True) self.ignore_case.stateChanged.connect(self.filter_changed) self.is_regex = QCheckBox(self.tr("regex")) self.is_regex.setChecked(True) self.is_regex.stateChanged.connect(self.filter_changed) filter_bar = QWidget() filter_bar.layout = QHBoxLayout(filter_bar) filter_bar.layout.setContentsMargins(0, 0, 0, 0) filter_bar.layout.addWidget(self.query_field) filter_bar.layout.addWidget(self.ignore_case) filter_bar.layout.addWidget(self.is_regex) (_handle, self.tmpfilename) = tempfile.mkstemp() self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings) self.hits_view = BigText(self.filter_model) self.layout.addWidget(filter_bar) self.layout.addWidget(self.hits_view) def destruct(self): print("cleanup: ", self.tmpfilename) os.remove(self.tmpfilename) def filter_changed(self): query = self.query_field.text() ignore_case = self.ignore_case.isChecked() is_regex = self.is_regex.isChecked() if len(query) == 0: return if self.filter_task: self.filter_task.aborted = True try: flags = re.IGNORECASE if ignore_case else 0 if is_regex: regex = re.compile(query, flags=flags) else: regex = re.compile(re.escape(query), flags=flags) except: # query was not a valid regex -> abort return self.filter_task = FilterTask(self.source_model, self.filter_model, regex) QThreadPool.globalInstance().start(self.filter_task)