import asyncio
import multiprocessing
import os
import re
import tempfile
import time
from re import Pattern
from typing import Optional

from PyQt6.QtCore import QRunnable, QThread, QThreadPool
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox

from bigtext import BigText
from logFileModel import LogFileModel


class FilterTask(QRunnable):
    """Runnable that copies all lines matching a regex from one file to another.

    The source file (``source_model.get_file()``) is read line by line; every
    line in which ``regex`` finds a match is written to the target file
    (``filter_model.get_file()``), which is truncated when the task starts.

    A running task can be stopped early by setting ``aborted`` to ``True`` or
    by cancelling ``future``.
    """

    # Cooperative stop flag; checked once per input line in run().
    aborted = False
    # NOTE(review): asyncio.Future() needs a current event loop; Python 3.14+
    # raises RuntimeError if none is set -- confirm how the application sets
    # up asyncio before relying on this attribute.
    future: asyncio.Future

    def __init__(
        self,
        source_model: LogFileModel,
        filter_model: LogFileModel,
        regex: re.Pattern,
    ):
        """Create a task filtering ``source_model`` into ``filter_model``.

        Args:
            source_model: Model whose ``get_file()`` names the file to read.
            filter_model: Model whose ``get_file()`` names the file to write.
            regex: Compiled pattern; lines with at least one match are kept.
        """
        super().__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        self.future = asyncio.Future()

    def run(self):
        """Filter the source file into the target file.

        Lines are decoded as UTF-8 (undecodable bytes are dropped) before
        matching, and matching lines are written back UTF-8 encoded.
        """
        start = time.time()
        with open(self.source_model.get_file(), "rb") as source, open(
            self.filter_model.get_file(), "w+b"
        ) as target:
            for raw_line in source:
                # Check before doing any work so that an aborted task does not
                # write further output.
                if self.aborted or self.future.cancelled():
                    print("aborted")
                    break
                line = raw_line.decode("utf8", errors="ignore")
                # search() stops at the first hit; only "is there a match?"
                # matters here, not the list of all matches.
                if self.regex.search(line):
                    target.write(line.encode("utf8"))
        print(f"filtering for {self.regex} took {time.time() - start}")
        self._mark_finished()

    def _mark_finished(self) -> None:
        """Complete ``future`` from the worker thread.

        ``Future.done()`` only reports state, so the result has to be set
        explicitly. asyncio futures are not thread-safe, therefore the result
        is set via ``call_soon_threadsafe`` on the future's own loop. If that
        loop is not running nobody can be awaiting the future, so nothing is
        scheduled.
        """
        loop = self.future.get_loop()
        if loop.is_closed() or not loop.is_running():
            return
        try:
            loop.call_soon_threadsafe(self._resolve_future)
        except RuntimeError:
            # The loop was closed between the check and the call; there is no
            # one left to notify.
            pass

    def _resolve_future(self) -> None:
        """Set the result of ``future`` unless it is already done or cancelled."""
        if not self.future.done():
            self.future.set_result(None)


class FilterWidget(QWidget):
    """Widget with a query bar and a view showing the lines matching the query.

    The matching lines are written by a ``FilterTask`` into a temporary file
    that backs ``filter_model``; ``hits_view`` displays that model. Call
    ``destruct()`` to delete the temporary file.
    """

    future = None
    filter_model: LogFileModel
    # Most recently started task, if any; flagged as aborted when superseded.
    filter_task: Optional[FilterTask] = None

    def __init__(self, source_model: LogFileModel):
        """Build the filter bar and the hits view for ``source_model``.

        Args:
            source_model: Model of the log file that is filtered.
        """
        super().__init__()
        self.source_model = source_model

        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        filter_bar.layout = QHBoxLayout(filter_bar)
        filter_bar.layout.setContentsMargins(0, 0, 0, 0)
        filter_bar.layout.addWidget(self.query_field)
        filter_bar.layout.addWidget(self.ignore_case)
        filter_bar.layout.addWidget(self.is_regex)

        handle, self.tmpfilename = tempfile.mkstemp()
        # mkstemp() returns an open OS-level descriptor. The file is only ever
        # accessed by name afterwards, so close it right away instead of
        # leaking it.
        os.close(handle)

        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

    def destruct(self):
        """Stop a running filter task and delete the temporary file."""
        if self.filter_task:
            self.filter_task.aborted = True
        print("cleanup: ", self.tmpfilename)
        os.remove(self.tmpfilename)

    def filter_changed(self):
        """Restart filtering with the current query and check box settings.

        Does nothing for an empty query. Otherwise the previous task (if any)
        is flagged as aborted and, provided the query compiles, a new
        ``FilterTask`` is started on the global thread pool.
        """
        query = self.query_field.text()
        if not query:
            return

        if self.filter_task:
            self.filter_task.aborted = True

        flags = re.IGNORECASE if self.ignore_case.isChecked() else 0
        # A plain-text query is escaped so it matches literally.
        pattern = query if self.is_regex.isChecked() else re.escape(query)
        try:
            regex = re.compile(pattern, flags=flags)
        except (re.error, OverflowError, RecursionError):
            # The query is not (yet) a valid regex, e.g. while the user is
            # still typing -> keep the current results.
            return

        # NOTE(review): the new task truncates and writes the same temp file
        # the previous task may still have open; the previous task is only
        # flagged, not awaited -- confirm whether the two can interleave.
        self.filter_task = FilterTask(self.source_model, self.filter_model, regex)
        QThreadPool.globalInstance().start(self.filter_task)