125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
import multiprocessing
|
|
import os
|
|
import re
|
|
from re import Pattern
|
|
import tempfile
|
|
import time
|
|
from typing import Optional
|
|
|
|
from PyQt6.QtCore import QRunnable, QThread, QThreadPool
|
|
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox
|
|
|
|
from bigtext import BigText
|
|
from logFileModel import LogFileModel
|
|
import asyncio
|
|
|
|
class FilterTask(QRunnable):
    """Background job that copies all lines matching a regex into another file.

    The task reads the file behind ``source_model`` line by line and writes
    every line on which ``regex`` matches to the file behind ``filter_model``.
    ``run`` is the ``QRunnable`` entry point.

    Attributes:
        aborted: Set to ``True`` by the owner to ask a running (or still
            queued) task to stop as soon as possible.
        future: Resolved with ``None`` once ``run`` has finished; cancelling
            it is treated like setting ``aborted``.
    """

    aborted = False
    future: asyncio.Future

    def __init__(
        self,
        source_model: LogFileModel,
        filter_model: LogFileModel,
        regex: re.Pattern,
    ):
        """Store the models and the pre-compiled pattern used for filtering.

        Args:
            source_model: Model whose ``get_file()`` names the file to read.
            filter_model: Model whose ``get_file()`` names the file that
                receives the matching lines (it is overwritten).
            regex: Compiled pattern; a line is kept if the pattern matches
                anywhere in it.
        """
        super().__init__()
        self.source_model = source_model
        self.filter_model = filter_model
        self.regex = regex
        # NOTE(review): asyncio.Future() binds to the current event loop of
        # the constructing thread; recent Python versions warn or fail when
        # there is none -- confirm how the application sets up its loop.
        self.future = asyncio.Future()

    def _stop_requested(self) -> bool:
        """Return True when the task was aborted or its future was cancelled."""
        return self.aborted or self.future.cancelled()

    def _mark_finished(self) -> None:
        """Resolve ``self.future`` with ``None`` unless it is already done."""

        def resolve():
            try:
                if not self.future.done():
                    self.future.set_result(None)
            except asyncio.InvalidStateError:
                # The future was cancelled between the check and the set.
                pass

        loop = self.future.get_loop()
        if loop.is_running():
            # asyncio futures are not thread-safe: when their loop is running
            # the result must be set from the loop's own thread.
            loop.call_soon_threadsafe(resolve)
        else:
            resolve()

    def run(self):
        """Filter the source file into the target file, honouring abort requests."""
        start = time.time()

        # A task that was aborted while still queued must not open the target
        # file: opening with "w+b" truncates it and would wipe the output of
        # a newer task that writes to the same file.
        if self._stop_requested():
            print("aborted")
            self._mark_finished()
            return

        with open(self.source_model.get_file(), "rb") as source, \
                open(self.filter_model.get_file(), "w+b") as target:
            for raw_line in source:
                # Check before writing so an aborted task adds nothing more
                # to the shared target file.
                if self._stop_requested():
                    print("aborted")
                    break

                # Undecodable bytes are dropped, so the written line may
                # differ from the raw input bytes.
                line = raw_line.decode("utf8", errors="ignore")

                # search() stops at the first hit; a full findall() list is
                # not needed just to know whether the line matches.
                if self.regex.search(line):
                    target.write(line.encode("utf8"))

        print("filtering for %s took %s" % (self.regex, time.time() - start))
        self._mark_finished()
|
|
|
|
class FilterWidget(QWidget):
    """Widget with a query bar on top and a view of the filtered lines below.

    Every change of the query text or of one of the two check boxes starts a
    new ``FilterTask`` on the global ``QThreadPool``. The task writes the
    matching lines of ``source_model``'s file into a temporary file that
    backs ``filter_model``.
    """

    # Not used by the code visible in this class; kept for compatibility.
    future = None
    filter_model: LogFileModel
    # Most recently started task, or None before the first filter run.
    filter_task: Optional[FilterTask] = None

    def __init__(self, source_model: LogFileModel):
        """Build the query bar, the temp-file backed model and the hits view.

        Args:
            source_model: Model of the log file that is searched.
        """
        super().__init__()
        self.source_model = source_model

        # NOTE(review): this attribute shadows QWidget.layout(); the name is
        # kept because code outside this class may rely on it.
        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(0, 0, 0, 0)

        self.query_field = QLineEdit()
        self.query_field.textChanged.connect(self.filter_changed)

        self.ignore_case = QCheckBox(self.tr("ignore case"))
        self.ignore_case.setChecked(True)
        self.ignore_case.stateChanged.connect(self.filter_changed)

        self.is_regex = QCheckBox(self.tr("regex"))
        self.is_regex.setChecked(True)
        self.is_regex.stateChanged.connect(self.filter_changed)

        filter_bar = QWidget()
        # Kept in a local variable so the bar's own layout() method is not
        # overwritten.
        bar_layout = QHBoxLayout(filter_bar)
        bar_layout.setContentsMargins(0, 0, 0, 0)
        bar_layout.addWidget(self.query_field)
        bar_layout.addWidget(self.ignore_case)
        bar_layout.addWidget(self.is_regex)

        # mkstemp() returns an already opened OS-level descriptor. Only the
        # path is needed here, so close the descriptor instead of leaking it.
        handle, self.tmpfilename = tempfile.mkstemp()
        os.close(handle)
        # NOTE(review): the temporary file is never removed by this class.

        self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
        self.hits_view = BigText(self.filter_model)

        self.layout.addWidget(filter_bar)
        self.layout.addWidget(self.hits_view)

    def filter_changed(self):
        """Restart filtering with the current query and check-box settings.

        Does nothing for an empty query. Otherwise the previous task (if any)
        is flagged as aborted and, provided the query compiles, a new
        ``FilterTask`` is started on the global thread pool.
        """
        query = self.query_field.text()
        if not query:
            return

        # The previous task is flagged even if the new query turns out to be
        # invalid below.
        if self.filter_task:
            self.filter_task.aborted = True

        flags = re.IGNORECASE if self.ignore_case.isChecked() else 0
        # Plain-text queries are escaped so they are matched literally.
        pattern = query if self.is_regex.isChecked() else re.escape(query)

        try:
            regex = re.compile(pattern, flags=flags)
        except (re.error, ValueError, OverflowError, RecursionError):
            # The query is not (yet) a valid regex, e.g. while the user is
            # still typing -> do not start a new task.
            return

        self.filter_task = FilterTask(self.source_model, self.filter_model, regex)
        QThreadPool.globalInstance().start(self.filter_task)
|
|
|
|
|
|
|
|
|
|
|
|
|