rename ravenlog to krowlog

There is a database named RavenDB.
KrowLog starts with a K, which is a) distinctive and b) has an association to KDE.
This commit is contained in:
2022-02-12 10:22:47 +01:00
parent 38e14d6042
commit a640b35c87
62 changed files with 380 additions and 362 deletions

0
src/__init__.py Normal file
View File

27
src/i18n.py Normal file
View File

@@ -0,0 +1,27 @@
import gettext
import os
from pathlib import Path
from src.pluginregistry import PluginRegistry
from src.settings.settingsstore import SettingsStore
settings = SettingsStore.load()
locale = os.environ['LANG'] if 'LANG' in os.environ and os.environ['LANG'] else "en"
locale = settings.session.get('general', 'lang', fallback=locale)
_ = False
src_dir = Path(__file__).resolve().parent.parent
try:
translation = gettext.translation('messages', localedir=src_dir / 'locales', languages=[locale])
if translation:
translation.install()
_ = translation.gettext
ngettext = translation.ngettext
PluginRegistry.execute("set_locale", locale)
except FileNotFoundError:
pass
if not _:
_ = gettext.gettext
ngettext = gettext.ngettext
PluginRegistry.execute("set_locale", '')
print('No translation found')

151
src/mainwindow.py Normal file
View File

@@ -0,0 +1,151 @@
import logging
from typing import List
from PySide6.QtWidgets import *
from PySide6.QtGui import *
from PySide6.QtCore import Qt
from src.util import urlutils
from src.settings.cutesettings import CuteSettings
from src.pluginregistry import PluginRegistry
from src.plugins.domain.menucontribution import MenuContribution, sort_menu_contributions
from src.plugins.domain.raction import RAction
from src.plugins.domain.rmenu import RMenu
from src.settings.settingsstore import SettingsStore
from src.ui.bigtext.highlightingdialog import HighlightingDialog
from src.ui.tabs import Tabs
from src.util.urlutils import url_is_file
from functools import reduce
from src.i18n import _
MAX_LINE_LENGTH = 4096
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("main")
def flat_map(array: List[List]) -> List:
return reduce(list.__add__, array)
class MainWindow(QMainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
self.settings = SettingsStore.load()
PluginRegistry.execute("set_settings", self.settings)
self.setWindowTitle(_("KrowLog"))
self._restore_window()
self.setDockNestingEnabled(True)
self.setAcceptDrops(True)
self.tabs = Tabs(self.settings)
self._menu_recent_files = QMenu(_("Open &Recent"), self)
self.setCentralWidget(self.tabs)
self.status_bar = QStatusBar(self)
self.setStatusBar(self.status_bar)
self.setMenuBar(self.create_dynamic_menu_bar())
self.setTabPosition(Qt.DockWidgetArea.AllDockWidgetAreas, QTabWidget.TabPosition.North)
def create_dynamic_menu_bar(self) -> QMenuBar:
menu_bar = QMenuBar()
menu_contributions: [MenuContribution] = flat_map(PluginRegistry.execute("get_menu_contributions"))
menu_contributions.append(MenuContribution("settings", action=self._action_highlighter()))
menu_contributions.append(MenuContribution("settings", action=self._action_highlight_search_terms()))
menu_contributions.append(MenuContribution("settings", action=self._action_new_tab()))
menu_contributions = sort_menu_contributions(menu_contributions)
known_menus = [
("file", _("&File")),
("settings", _("&Settings")),
("window", _("&Window")),
("help", _("&Help"))
]
for (menu_id, menu_label) in known_menus:
menu = QMenu(menu_label, self)
mcs: [MenuContribution] = [mc for mc in menu_contributions if mc.menu_id == menu_id]
if len(mcs) == 0:
continue
for menu_contribution in mcs:
if menu_contribution.action:
action = menu_contribution.action.to_qaction(menu)
menu.addAction(action)
if menu_contribution.menu:
submenu = QMenu(menu_contribution.menu.label, menu_bar)
submenu.setIcon(QIcon.fromTheme(menu_contribution.menu.icon_from_theme))
menu_contribution.menu.add_change_listener(
lambda qmenu=submenu, rmenu=menu_contribution.menu: self._rmenu_update(qmenu, rmenu))
self._rmenu_update(submenu, menu_contribution.menu)
menu.addMenu(submenu)
menu_bar.addMenu(menu)
return menu_bar
def _rmenu_update(self, qmenu: QMenu, rmenu: RMenu):
qmenu.clear()
for raction in rmenu.actions:
action = raction.to_qaction(qmenu)
qmenu.addAction(action)
def _action_highlighter(self):
manage = RAction(
_("&Highlighter"),
action=lambda: HighlightingDialog(self.settings).exec(),
shortcut='Ctrl+H'
)
return manage
def _action_highlight_search_terms(self):
highlight_search_terms = RAction(_("Highlight &Searches"))
highlight_search_terms.set_checkable(True)
highlight_search_terms.set_checked(self.settings.session.getboolean("general", "highlight_search_term"))
highlight_search_terms.set_action(lambda: self.settings.set_session("general", "highlight_search_term",
str(highlight_search_terms.checked)) or self.update())
return highlight_search_terms
def _action_new_tab(self):
new_tab = RAction(_("Open Tab on Save As File"))
new_tab.set_checkable(True)
new_tab.set_checked(self.settings.session.getboolean("general", "open_tab_on_save_as_file"))
new_tab.set_action(
lambda: self.settings.set_session("general", "open_tab_on_save_as_file", str(new_tab.checked)))
return new_tab
def dragEnterEvent(self, e: QDragEnterEvent):
if url_is_file(e.mimeData().text()):
e.accept()
else:
e.ignore()
def dropEvent(self, e):
files = urlutils.urls_to_path(e.mimeData().text())
for file in files:
PluginRegistry.execute_single("open_file", file)
def _restore_window(self):
qsettings = CuteSettings()
geometry_restored = False
geometry = qsettings.value("mainWindow/geometry")
if geometry:
geometry_restored = self.restoreGeometry(geometry)
if not geometry_restored:
self.setGeometry(0, 0, 800, 600)
def closeEvent(self, event):
self.destruct()
def destruct(self):
PluginRegistry.execute("before_shutdown")
self.tabs.destruct()
self.close()
SettingsStore.save(self.settings)
CuteSettings().set_value("mainWindow/geometry", self.saveGeometry())

6
src/pluginbase.py Normal file
View File

@@ -0,0 +1,6 @@
from PySide6.QtCore import QObject
class PluginBase():
def __init__(self):
pass

78
src/pluginregistry.py Normal file
View File

@@ -0,0 +1,78 @@
from types import ModuleType
from typing import Dict, Optional
from inspect import isclass
from importlib import import_module
from inspect import signature
from src.pluginbase import PluginBase
class PluginRegistry():
plugins: Dict[str, PluginBase] = {}
modules: [ModuleType] = []
@staticmethod
def _register_plugin(name: str, plugin: PluginBase):
PluginRegistry.plugins[name] = plugin
@staticmethod
def load_plugin(plugin_name: str) -> PluginBase:
module_name = f"src.plugins.{plugin_name.lower()}"
module = import_module(module_name)
if plugin_name in dir(module):
plugin_class = getattr(module, plugin_name)
if isclass(plugin_class) and issubclass(plugin_class, PluginBase):
PluginRegistry._register_plugin(plugin_name, plugin_class())
return plugin_class
raise RuntimeError("plugin %s not found" % plugin_name)
# @staticmethod
# def get_modules() -> [ModuleType]:
# return PluginRegistry.modules.copy()
@staticmethod
def get_plugins() -> [PluginBase]:
return PluginRegistry.modules.copy()
@staticmethod
def execute_single(function_name: str, *args) -> Optional[any]:
return PluginRegistry._execute(function_name, True, *args)
@staticmethod
def execute(function_name: str, *args) -> [any]:
return PluginRegistry._execute(function_name, False, *args)
@staticmethod
def _execute(function_name: str, return_first: bool, *args) -> []:
result = []
for plugin in PluginRegistry.plugins.values():
fun = getattr(plugin, function_name, None)
if callable(fun):
sig = signature(fun)
if len(sig.parameters) != len(args):
raise RuntimeError("method %s.%s has wrong number of arguments. expected %s but was %s " % (
plugin, function_name, len(args), len(sig.parameters)))
# print("calling %s with args %s" % (fun, args))
if len(args) == 0:
return_value = fun()
elif len(args) == 1:
return_value = fun(args[0])
elif len(args) == 2:
return_value = fun(args[0], args[1])
elif len(args) == 3:
return_value = fun(args[0], args[1], args[2])
elif len(args) == 4:
return_value = fun(args[0], args[1], args[2], args[3])
elif len(args) == 5:
return_value = fun(args[0], args[1], args[2], args[3], args[4])
else:
raise Exception("too many arguments")
if return_first:
return return_value
result.append(return_value)
if return_first:
return None
return result

25
src/plugins/__init__.py Normal file
View File

@@ -0,0 +1,25 @@
from inspect import isclass
from pkgutil import iter_modules
from pathlib import Path
from importlib import import_module
from src.pluginbase import PluginBase
# iterate through the modules in the current package
from src.pluginregistry import PluginRegistry
if False:
package_dir = Path(__file__).resolve().parent
for (_, module_name, _) in iter_modules([str(package_dir)]):
# import the module and iterate through its attributes
module = import_module(f"{__name__}.{module_name}")
print("module: %s" % module)
for attribute_name in dir(module):
if attribute_name == "PluginBase":
continue
attribute = getattr(module, attribute_name)
if isclass(attribute) and issubclass(attribute, PluginBase):
globals()[attribute_name] = attribute
PluginRegistry.register_plugin(attribute_name, attribute)
print("%s -> %s :: %s in %s" % (attribute_name, attribute, module_name, module))

View File

View File

@@ -0,0 +1,51 @@
from src.plugins.domain.raction import RAction
from src.plugins.domain.rmenu import RMenu
id_counter = 0
def next_id() -> str:
global id_counter
id_counter = id_counter + 1
return "action_%d" % id_counter
class MenuContribution():
def __init__(self,
menu_id: str,
action: RAction = None,
menu: RMenu = None,
action_id=None,
after=None):
super(MenuContribution, self).__init__()
self.menu_id = menu_id
self.action = action
self.menu = menu
self.action_id = action_id if action_id else next_id()
self.after = after
def _sort_by_action_id(menu_contributions: [MenuContribution]) -> [MenuContribution]:
return sorted(menu_contributions, key=lambda mc: mc.action_id)
def sort_menu_contributions(menu_contributions: [MenuContribution]) -> [MenuContribution]:
result = []
items = _sort_by_action_id(menu_contributions[:])
_recursive_half_order_adder(result, items, None)
# add remaining items to the end (ordered by their action_id)
# This resolves cycles.
for item in items:
result.append(item)
return result
def _recursive_half_order_adder(result: [MenuContribution], items: [MenuContribution], parent):
for item in items:
mc: MenuContribution = item
if not mc.after:
result.append(mc)
items.remove(mc)
_recursive_half_order_adder(result, items, mc.action_id)

View File

@@ -0,0 +1,88 @@
from typing import Callable
from PySide6.QtGui import QAction, QIcon
from PySide6.QtWidgets import QMenu
class RAction():
def __init__(self,
label: str,
action: Callable[[], None] = None,
shortcut: str = None,
icon_from_theme: str = None,
icon_file: str = None,
checkable: bool = False,
checked: bool = False
):
"""
:param label: the label
:param action: the callback to be executed when clicked. Note: use the setter when creating a checkable menu item
:param shortcut: the shortcut, e.g. 'Ctrl+X'
:param icon_from_theme: environment specific name of an icon. On Linux: /usr/share/icons
:param icon_file: path to an icon
:param checkable: if this menu item behaves like a checkbox
:param checked: if it is checked
"""
super(RAction, self).__init__()
self.label = label
self.action = action
self.shortcut = shortcut
self.icon_from_theme = icon_from_theme
self.icon_file = icon_file
self.checkable = checkable
self.checked = checked
self._action: QAction = None
def set_action(self, action):
self.action = lambda *args: self.decorated_action(action)
def decorated_action(self, action):
if self.checkable:
self.checked = not self.checked
self._update_check_state()
action()
def set_icon_from_theme(self, icon_from_theme: str):
self.icon_from_theme = icon_from_theme
def set_icon_file(self, icon_file: str):
self.icon_file = icon_file
def set_shortcut(self, shortcut: str):
self.shortcut = shortcut
def set_checkable(self, checkable: bool):
self.checkable = checkable
def set_checked(self, checked: bool):
self.checked = checked
self._update_check_state()
def _update_check_state(self):
if self._action:
if self.checked:
self._action.setIcon(QIcon("icons/ionicons/checkbox-outline.svg"))
else:
self._action.setIcon(QIcon("icons/ionicons/square-outline.svg"))
def set_label(self, label: str):
if self._action:
self._action.setText(label)
def to_qaction(self, qmenu: QMenu) -> QAction:
action = QAction(self.label, qmenu)
self._action = action
if self.icon_from_theme:
action.setIcon(QIcon.fromTheme(self.icon_from_theme))
if self.icon_file:
action.setIcon(QIcon(self.icon_file))
if self.shortcut:
action.setShortcut(self.shortcut)
if self.action:
action.triggered.connect(self.action)
if self.checkable:
self._update_check_state()
return action

View File

@@ -0,0 +1,27 @@
from typing import Callable
from src.plugins.domain.raction import RAction
class RMenu():
def __init__(self, label: str, icon_from_theme: str = ""):
super(RMenu, self).__init__()
self.label = label
self.actions = []
self.listeners = []
self.icon_from_theme = icon_from_theme;
def add_action(self, action: RAction):
self.actions.append(action)
self._notify()
def clear(self):
self.actions.clear()
self._notify()
def _notify(self):
for listener in self.listeners:
listener()
def add_change_listener(self, listener: Callable[[], None]):
self.listeners.append(listener)

View File

@@ -0,0 +1,45 @@
import unittest
from random import shuffle
from src.plugins.domain.menucontribution import MenuContribution, sort_menu_contributions
class MyTestCase(unittest.TestCase):
def test_sort(self):
items = [
MenuContribution("menuId", action_id="a", after=None),
MenuContribution("menuId", action_id="b", after="a"),
MenuContribution("menuId", action_id="c", after="a"),
MenuContribution("menuId", action_id="d", after="b"),
MenuContribution("menuId", action_id="e", after="d"),
]
shuffle(items)
actual = sort_menu_contributions(items)
ordered_ids = ""
for a in actual:
ordered_ids = ordered_ids + a.action_id
self.assertEqual("abcde", ordered_ids)
def test_sort_with_cycle(self):
"""
There is a cycle between a and b. This is resolved, because neither is set in the recursive
part of the method. After the recursive part the remaining items are added in order of
their action_id.
:return:
"""
items = [
MenuContribution("menuId", action_id="a", after="b"),
MenuContribution("menuId", action_id="b", after="a"),
]
shuffle(items)
actual = sort_menu_contributions(items)
ordered_ids = ""
for a in actual:
ordered_ids = ordered_ids + a.action_id
self.assertEqual("ab", ordered_ids)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,23 @@
from abc import abstractmethod
from PySide6.QtWidgets import QWidget
class Tab(QWidget):
def __init__(self, unique_id: str, title: str):
super(Tab, self).__init__()
self.unique_id = unique_id
self.title = title
@abstractmethod
def get_status_text(self) -> str:
pass
@abstractmethod
def get_file(self) -> str:
pass
@abstractmethod
def destruct(self):
pass

View File

View File

@@ -0,0 +1,86 @@
import textwrap
import PySide6
from PySide6.QtCore import Qt
from PySide6.QtGui import QFont, QPixmap
from PySide6.QtWidgets import *
import constants
from src.ui.label import Label
from src.ui.vbox import VBox
from src.i18n import _
class AboutDialog(QDialog):
"""Dialog for showing info about KrowLog"""
def __init__(self, parent=None):
super(AboutDialog, self).__init__(parent)
self.setWindowTitle(_("About KrowLog"))
self.setModal(True)
self.layout = QVBoxLayout(self)
heading_app_name = QLabel(_("KrowLog"))
heading_app_name.setAlignment(Qt.AlignmentFlag.AlignLeft)
heading_app_name.setFont(QFont("default", 25))
heading_app_name.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
version = QLabel(_("Version: {0}").format(self._version()))
version.setAlignment(Qt.AlignmentFlag.AlignLeft)
app_icon = QLabel()
app_icon.setPixmap(QPixmap(constants.krow_icon))
heading = QWidget(self)
hbox = QHBoxLayout(heading)
hbox.addWidget(app_icon)
hbox.addWidget(VBox(heading_app_name, version))
hbox.addSpacerItem(QSpacerItem(1, 1, hData=QSizePolicy.Policy.Expanding))
heading.layout = hbox
self.layout.addWidget(heading)
tabs = QTabWidget()
tabs.addTab(self._about(), _("About"))
tabs.addTab(self._license(), _("License"))
self.layout.addWidget(tabs)
buttons = QDialogButtonBox(self)
buttons.setStandardButtons(QDialogButtonBox.StandardButton.Close)
buttons.rejected.connect(self.close)
self.layout.addWidget(buttons)
def _about(self) -> QWidget:
result = QWidget()
result.layout = QVBoxLayout(result)
label = Label("{0}<br>{1}<br>{2}".format(
_("Log file viewer"),
_("(c) 2022 Andreas Huber"),
_("License: LGPL v3")
))
result.layout.addWidget(label)
return result
def _license(self) -> QWidget:
dependencies = """
<ul>
<li>Ionicons (MIT) - <a href="https://github.com/ionic-team/ionicons">https://github.com/ionic-team/ionicons</a></li>
<li>PySide6 {pyside} (LGPL v3) - <a href="https://doc.qt.io/qtforpython-6/">https://doc.qt.io/qtforpython-6/</a></li>
<li>Qt6 {qt} (LGPL v3) - <a href="https://code.qt.io/cgit/qt/qtbase.git/">https://code.qt.io/cgit/qt/qtbase.git/</a></li>
<li>urllib3 (MIT) - <a href="https://github.com/urllib3/urllib3">https://github.com/urllib3/urllib3</a></li>
<li>watchdog 2.16 (Apache 2.0) - <a href="https://github.com/gorakhargosh/watchdog">https://github.com/gorakhargosh/watchdog</a></li>
</ul>""".format(pyside=PySide6.__version__, qt=PySide6.QtCore.__version__)
label = textwrap.dedent(dependencies)
result = QWidget()
result.layout = QVBoxLayout(result)
result.layout.addWidget(Label(label))
return result
def _version(self):
with open('VERSION.info', "rt") as f:
line = f.readline()
version = line.strip()
return version

View File

@@ -0,0 +1,115 @@
import sys
from typing import Optional
from PySide6.QtCore import Qt
from PySide6.QtWidgets import QDockWidget, QMessageBox
import constants
from src.plugins.krowlog.aboutdialog import AboutDialog
from src.mainwindow import MainWindow
from src.pluginbase import PluginBase
from src.plugins.domain.menucontribution import MenuContribution
from src.plugins.domain.raction import RAction
from src.plugins.domain.rmenu import RMenu
from src.plugins.krowlog.Tab import Tab
from src.i18n import _, locale
from src.settings.settings import Settings
class KrowLogPlugin(PluginBase):
def __init__(self):
super(KrowLogPlugin, self).__init__()
self.main_window = None
self._locale = locale
self._locale_actions = {}
self.settings = None
def set_settings(self, settings: Settings):
self.settings = settings
def create_main_window(self):
if not self.main_window:
self.main_window = MainWindow()
return self.main_window
def get_menu_contributions(self) -> [MenuContribution]:
return [
MenuContribution("file", action=self._action_close(), action_id="close application", after="<last>"),
MenuContribution("help", action=self._action_about(), action_id="open about dialog", after="<last>"),
MenuContribution("settings", menu=self._sub_menu_languages(), action_id="recent files menu"),
]
def _sub_menu_languages(self) -> RMenu:
menu = RMenu(_("&Languages"))
self._locale_actions[''] = RAction(_("&Default"), lambda: self._change_locale(''), checkable=True)
self._locale_actions['en'] = RAction(_("&English"), lambda: self._change_locale('en'), checkable=True)
self._locale_actions['de'] = RAction(_("&German"), lambda: self._change_locale('de'), checkable=True)
for (key, action) in self._locale_actions.items():
action.checked = self._locale == key
menu.add_action(action)
if not self._locale in self._locale_actions.keys():
self._locale_actions[''].checked = True
return menu
def _change_locale(self, locale: str):
if self._locale != locale:
if self._locale in self._locale_actions:
self._locale_actions[self._locale].set_checked(False)
self._locale_actions[locale].set_checked(True)
self._locale = locale
if locale == '':
self.settings.session.remove_option('general', 'lang')
else:
self.settings.session.set('general', 'lang', locale)
info = QMessageBox(
QMessageBox.Icon.Information,
_("Language Changed"),
_("The language for this application has been changed. The change will take effect the next time the application is started."))
info.setStandardButtons(QMessageBox.Ok)
info.exec()
def current_file(self) -> Optional[str]:
return self.main_window.tabs.current_file()
def get_open_files(self) -> [str]:
return self.main_window.tabs.open_files();
def update_window_title(self, title: str):
if len(title) > 0:
self.main_window.setWindowTitle(_("{0} - KrowLog").format(title))
else:
self.main_window.setWindowTitle(_("KrowLog"))
def update_status_bar(self, text: str):
if not self.main_window:
return
self.main_window.status_bar.showMessage(text)
def update_ui(self):
self.main_window.update()
def add_tab(self, tab: Tab):
self.main_window.tabs.add_tab(tab)
def add_dock(self, area: Qt.DockWidgetArea, widget: Tab):
dock_widget = QDockWidget(widget.title, self.main_window)
dock_widget.setWidget(widget)
self.main_window.addDockWidget(area, dock_widget)
def _action_about(self) -> RAction:
about_action = RAction(
_("&About"),
action=lambda: AboutDialog().exec(),
icon_file=constants.krow_icon
)
return about_action
def _action_close(self) -> RAction:
icon = "close" if sys.platform == 'win32' or sys.platform == 'cygwin' else "exit"
close_action = RAction(_("E&xit"), action=lambda: self.main_window.destruct(), shortcut='Ctrl+X',
icon_from_theme=icon)
return close_action

View File

View File

@@ -0,0 +1,21 @@
from src.ui.bigtext.bigtext import BigText
class FilterViewSyncer:
def __init__(self, sync_view: BigText):
self._matches = {}
self._sync_view = sync_view
def click_listener(self, byte_offset: int):
source_byte_offset = self._matches[byte_offset] if byte_offset in self._matches else None
# print("click %d -> %d (total hits %d)" % (byte_offset, source_byte_offset, len(self._matches)))
if source_byte_offset is not None:
self._sync_view.scroll_to_byte(source_byte_offset)
def match_found(self, match_byte_offset: int, source_byte_offset: int):
# print("match %d" % match_byte_offset)
if match_byte_offset >= 0:
self._matches[match_byte_offset] = source_byte_offset
else:
self._matches = {}

View File

@@ -0,0 +1,246 @@
import logging
import os
import re
import tempfile
import threading
import time
from typing import Optional, Callable
from PySide6.QtCore import QRunnable, QThreadPool, Signal, QThread, QObject
from PySide6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLineEdit, QCheckBox, QPushButton, QLabel, QProgressBar
from src.ui.bigtext.bigtext import BigText
from src.ui.bigtext.logFileModel import LogFileModel
from src.i18n import _
from src.pluginregistry import PluginRegistry
log = logging.getLogger("filterwidget")
class FilterTask(QThread):
aborted = False
on_before = Signal()
on_finish = Signal()
filter_progress = Signal(float)
def __init__(
self,
source_model: LogFileModel,
filter_model: LogFileModel,
regex: re.Pattern,
lock: threading.RLock,
filter_match_found_listeners: Callable[[int], None]
):
super(FilterTask, self).__init__()
self.source_model = source_model
self.filter_model = filter_model
self.regex = regex
self.lock = lock
self.filter_match_found_listeners = filter_match_found_listeners
def run(self):
# print("writing to tmp file", self.filter_model.get_file())
# the lock ensures that we only start a new search when the previous search already ended
with self.lock:
print("starting thread ", threading.current_thread())
self.on_before.emit()
if self.aborted:
self.on_finish.emit()
for listener in self.filter_match_found_listeners:
listener(-1, -1) # notify listeners that a new search started
self.filter_progress.emit(0.0)
try:
source_file = self.source_model.get_file()
file_size = os.stat(source_file).st_size
start = time.time()
with open(source_file, "rb") as source:
with open(self.filter_model.get_file(), "w+b") as target:
line_count = 0
lines_written = 0
last_bytes_read = 0
bytes_read = 0
while l := source.readline():
bytes_read = bytes_read + len(l)
line_count = line_count + 1
line = l.decode("utf8", errors="ignore")
if self.regex.findall(line):
# time.sleep(0.5)
lines_written = lines_written + 1
source_line_offset = source.tell() - len(l)
target_line_offset = target.tell()
for listener in self.filter_match_found_listeners:
listener(target_line_offset, source_line_offset)
target.write(l)
# sometime buffering can hide results for a while
# We force a flush periodically.
if line_count % 10000 == 0:
now = time.time()
time_diff = (now - start)
if time_diff > 0.5:
read_speed = ((bytes_read - last_bytes_read) / time_diff) / (1024 * 1024)
# todo progress disabled because of its detrimental effect on UI responsibility
# print("emit %f" % (bytes_read / file_size))
self.filter_progress.emit(bytes_read / file_size)
# self._progress_updater(bytes_read / file_size, read_speed)
last_bytes_read = bytes_read
start = time.time()
if lines_written > 0:
target.flush()
lines_written = 0
if self.aborted:
print("aborted ", time.time())
break
finally:
self.on_finish.emit()
print("dome thread ", threading.current_thread())
class FilterWidget(QWidget):
filter_model: LogFileModel
filter_task: Optional[FilterTask] = None
def __init__(self, source_model: LogFileModel):
super(FilterWidget, self).__init__()
self.source_model = source_model
self._lock = threading.RLock()
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.query_field = QLineEdit()
self.query_field.textChanged.connect(self.filter_changed)
self._lbl_search_progress = QLabel("0%")
self._lbl_search_progress.setVisible(False)
self._progress_bar = QProgressBar();
self._progress_bar.setVisible(False)
self._progress_bar.setMinimum(0)
self._progress_bar.setMaximum(100)
self._progress_bar.setMaximumWidth(50)
self.btn_cancel_search = QPushButton(_("Cancel"))
self.btn_cancel_search.setVisible(False)
self.btn_cancel_search.pressed.connect(self._cancel_search)
self.ignore_case = QCheckBox(_("ignore case"))
self.ignore_case.setChecked(True)
self.ignore_case.stateChanged.connect(self.filter_changed)
self.is_regex = QCheckBox(_("regex"))
self.is_regex.setChecked(True)
self.is_regex.stateChanged.connect(self.filter_changed)
filter_bar = QWidget()
filter_bar.layout = QHBoxLayout(filter_bar)
filter_bar.layout.setContentsMargins(0, 0, 0, 0)
filter_bar.layout.addWidget(self.query_field)
filter_bar.layout.addWidget(self._lbl_search_progress)
filter_bar.layout.addWidget(self._progress_bar)
filter_bar.layout.addWidget(self.btn_cancel_search)
filter_bar.layout.addWidget(self.ignore_case)
filter_bar.layout.addWidget(self.is_regex)
(handle, self.tmpfilename) = tempfile.mkstemp()
os.close(handle)
self.filter_model = LogFileModel(self.tmpfilename, self.source_model.settings)
self.hits_view = BigText(self.filter_model)
self.layout.addWidget(filter_bar)
self.layout.addWidget(self.hits_view)
self.filter_match_found_listeners: [Callable[[int], None]] = []
def add_line_click_listener(self, listener: Callable[[int], None]):
self.hits_view.add_line_click_listener(listener)
def add_filter_match_found_listener(self, listener: Callable[[int], None]):
self.filter_match_found_listeners.append(listener)
def destruct(self):
# print("cleanup: ", self.tmpfilename)
self._cancel_search()
os.remove(self.tmpfilename)
def _cancel_search(self):
if self.filter_task:
# print("cancel started ", time.time())
self.filter_task.aborted = True
# wait until the previous search is aborted
with self._lock:
pass
def reset_filter(self):
self.filter_model.truncate()
self.source_model.clear_query_highlight()
self.filter_model.clear_query_highlight()
PluginRegistry.execute("update_ui")
def filter_changed(self):
query = self.query_field.text()
ignore_case = self.ignore_case.isChecked()
is_regex = self.is_regex.isChecked()
# cancel previous search
self._cancel_search()
if len(query) == 0:
self.reset_filter()
return
try:
flags = re.IGNORECASE if ignore_case else 0
if is_regex:
regex = re.compile(query, flags=flags)
else:
regex = re.compile(re.escape(query), flags=flags)
except:
# query was not a valid regex -> clear search hits, then abort
self.filter_model.truncate()
return
self.source_model.set_query_highlight(query, ignore_case, is_regex)
self.filter_model.set_query_highlight(query, ignore_case, is_regex)
self.filter_task = FilterTask(
self.source_model,
self.filter_model,
regex,
self._lock,
self.filter_match_found_listeners,
)
self.filter_task.on_before.connect(self._on_before)
self.filter_task.on_finish.connect(self._on_finish)
self.filter_task.filter_progress.connect(self._update_progress)
# self.filter_task.finished.connect(self.filter_task.deleteLater)
# super().connect(self.filter_task, FilterTask.filter_progress, self, self._update_progress)
# super().connect(self.filter_task, FilterTask.finished, self.filter_task, QObject.deleteLater)
self.filter_task.start()
# QThreadPool.globalInstance().start(self.filter_task)
def _on_before(self):
print("on_before")
self.btn_cancel_search.setVisible(True)
self._progress_bar.setVisible(True)
def _on_finish(self):
print("on_finish")
self.btn_cancel_search.setVisible(False)
self._progress_bar.setVisible(False)
self.filter_task.deleteLater()
def _update_progress(self, progress: float):
print("progress %f" % (progress))
self._progress_bar.setValue(progress * 100)

View File

@@ -0,0 +1,47 @@
from PySide6.QtWidgets import *
from PySide6.QtCore import *
from src.ui.bigtext.bigtext import BigText
from src.plugins.logfile.filterviewsyncer import FilterViewSyncer
from src.plugins.logfile.filterwidget import FilterWidget
from src.ui.bigtext.logFileModel import LogFileModel
from src.plugins.krowlog.Tab import Tab
from src.util.conversion import humanbytes
class FullTabWidget(Tab):
def __init__(self, model: LogFileModel, unique_id: str, title: str):
super(FullTabWidget, self).__init__(unique_id, title)
self._model = model
self.file_view = BigText(model)
self.filter_hit_view = FilterWidget(self._model)
self.filter_view_syncer = FilterViewSyncer(self.file_view)
self.filter_hit_view.add_line_click_listener(self.filter_view_syncer.click_listener)
self.filter_hit_view.add_filter_match_found_listener(self.filter_view_syncer.match_found)
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
splitter = QSplitter()
splitter.setOrientation(Qt.Orientation.Vertical)
splitter.setHandleWidth(5)
# splitter.setStyleSheet("QSplitter::handle{background: #cccccc;}")
splitter.addWidget(self.file_view)
splitter.addWidget(self.filter_hit_view)
self.layout.addWidget(splitter)
def get_file(self) -> str:
return self.file_view.get_file()
# overriding abstract method
def destruct(self):
self.file_view.destruct()
self.filter_hit_view.destruct()
# overriding abstract method
def get_status_text(self) -> str:
file = self._model.get_file()
file_size = humanbytes(self._model.byte_count())
return "%s - %s" % (file_size, file)

View File

@@ -0,0 +1,36 @@
import os.path
from typing import Optional
from PySide6.QtWidgets import QMessageBox
from src.plugins.logfile.fulltabwidget import FullTabWidget
from src.ui.bigtext.logFileModel import LogFileModel
from src.pluginbase import PluginBase
from src.plugins.krowlog.Tab import Tab
from src.settings.settings import Settings
from src.i18n import _
class LogFilePlugin(PluginBase):
def __init__(self):
super(LogFilePlugin, self).__init__()
self.settings = None
def set_settings(self, settings: Settings):
self.settings = settings
def create_tab(self, file: str) -> Optional[Tab]:
if not os.path.isfile(file):
message = QMessageBox(QMessageBox.Icon.Warning, _("File not found"),
_("'{0}' is not a file or cannot be opened").format(file))
message.exec()
return None
realpath = os.path.realpath(file)
filename = os.path.basename(realpath)
model = LogFileModel(file, self.settings)
tab = FullTabWidget(model, unique_id=realpath, title=filename)
return tab

View File

View File

@@ -0,0 +1,14 @@
from src.plugins.krowlog.Tab import Tab
from PySide6.QtWidgets import QTextEdit, QVBoxLayout
class NotesWidget(Tab):
def __init__(self, unique_id: str, title: str):
super(NotesWidget, self).__init__(unique_id, title)
self.text_area = QTextEdit(self)
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.addWidget(self.text_area)

View File

@@ -0,0 +1,35 @@
from typing import Callable
from PySide6.QtCore import Qt
from src.pluginbase import PluginBase
from src.pluginregistry import PluginRegistry
from src.plugins.domain.menucontribution import MenuContribution
from src.plugins.domain.raction import RAction
from src.plugins.notes.noteswidget import NotesWidget
from src.i18n import _
class NotesPlugin(PluginBase):
def __init__(self):
super(NotesPlugin, self).__init__()
self.settings = None
self.tab_counter = 0
def get_menu_contributions(self) -> [MenuContribution]:
return [
MenuContribution("window", action=self._add_notes_tab_action(), action_id="add notes tab", after="<last>"),
]
def _add_notes_tab_action(self) -> RAction:
open_file = RAction(_("Add &Notes"), self._add_notes_tab, shortcut='Ctrl+Shift+N',
icon_from_theme="filenew")
return open_file
def _add_notes_tab(self):
self.tab_counter = self.tab_counter + 1
notes = NotesWidget(
"notes_tab_%d" % self.tab_counter,
_("Notes {0}").format(self.tab_counter))
PluginRegistry.execute_single("add_dock", Qt.DockWidgetArea.RightDockWidgetArea, notes)

View File

@@ -0,0 +1,99 @@
import os
from pathlib import Path
from PySide6.QtWidgets import QFileDialog
from src.pluginbase import PluginBase
from src.pluginregistry import PluginRegistry
from src.plugins.domain.menucontribution import MenuContribution
from src.plugins.domain.raction import RAction
from src.plugins.domain.rmenu import RMenu
from src.settings.settings import Settings
from src.i18n import _
class OpenFilePlugin(PluginBase):
def __init__(self):
super(OpenFilePlugin, self).__init__()
self.settings = None
def set_settings(self, settings: Settings):
self.settings = settings
def _action_open_file(self) -> RAction:
open_file = RAction(_("&Open..."), self._open_file_dialog, shortcut='Ctrl+O',
icon_from_theme="document-open")
return open_file
def _sub_menu_recent_files(self) -> RMenu:
self._menu_recent_files = RMenu(_("Open &Recent"), icon_from_theme="document-open-recent")
self._update_recent_files_menu()
return self._menu_recent_files
def get_menu_contributions(self) -> [MenuContribution]:
return [
MenuContribution("file", action=self._action_open_file(), action_id="open file"),
MenuContribution("file", menu=self._sub_menu_recent_files(), action_id="recent files menu"),
]
def _open_file_dialog(self) -> None:
current_file = PluginRegistry.execute_single("current_file")
directory = os.path.dirname(current_file) if current_file else os.path.join(Path.home())
dialog = QFileDialog()
(selected_file, _filter) = dialog.getOpenFileName(
caption=_("Open File"),
dir=directory
)
# directory=directory
if selected_file:
self.open_file(selected_file)
def open_file(self, selected_file: str):
tab = PluginRegistry.execute_single("create_tab", selected_file)
if tab:
PluginRegistry.execute_single("add_tab", tab)
PluginRegistry.execute("after_open_file", selected_file)
def _get_recent_files(self) -> [str]:
recent_files = self.settings.session.get('general', 'recent_files', fallback='')
# print(recent_files)
files = recent_files.split(os.pathsep)
if "" in files:
files.remove("")
return files
def _update_recent_files_menu(self):
self._menu_recent_files.clear()
files = self._get_recent_files()
for file in files:
action = RAction(os.path.basename(file))
action.set_action(lambda _="", f=file: self.open_file(f))
self._menu_recent_files.add_action(action)
def _remember_recent_file(self, file: str):
files = self._get_recent_files()
if file in files:
files.remove(file)
files.insert(0, file)
recent_files = os.pathsep.join(files[:10])
self.settings.set_session('general', 'recent_files', recent_files)
self._update_recent_files_menu()
def after_open_file(self, file: str):
self._remember_recent_file(file)
def after_start(self):
open_files_as_string = self.settings.get_session('general', 'open_files', fallback='')
files = open_files_as_string.split(os.pathsep)
if "" in files:
files.remove("")
for file in files:
self.open_file(file)
def before_shutdown(self):
open_files = PluginRegistry.execute_single("get_open_files")
if open_files:
open_files_as_string = os.pathsep.join(open_files)
self.settings.set_session('general', 'open_files', open_files_as_string)

0
src/settings/__init__.py Normal file
View File

View File

@@ -0,0 +1,13 @@
from PySide6.QtCore import QSettings
class CuteSettings:
def __init__(self):
self._settings = QSettings("krowlog", "settings")
def set_value(self, key: str, value: any):
self._settings.setValue(key, value)
def value(self, key: str, default=None):
return self._settings.value(key, default)

23
src/settings/settings.py Normal file
View File

@@ -0,0 +1,23 @@
from configparser import ConfigParser
class Settings():
def __init__(self, session: ConfigParser):
self.session = session
def set_session(self, section: str, option: str, value: str):
return self.session.set(section, option, value)
def get_session(self, section: str, option: str, fallback: str = object()) -> str:
return self.session.get(section, option, fallback=fallback)
def getint_session(self, section: str, option: str) -> int:
return self.session.getint(section, option)
def getboolean_session(self, section: str, option: str) -> bool:
return self.session.getboolean(section, option)
@staticmethod
def max_line_length():
return 4096

View File

@@ -0,0 +1,45 @@
import os
import sys
from pathlib import Path
from os.path import join
from configparser import ConfigParser
from src.settings.settings import Settings
class SettingsStore():
def __init__(self):
pass
@staticmethod
def _session_file() -> str:
if sys.platform == 'win32' or sys.platform == 'cygwin':
return join(Path.home(), "AppData", "Local", "krowlog", "session.ini")
else:
return join(Path.home(), ".local", "share", "krowlog", "session.ini")
@staticmethod
def load() -> Settings:
session = SettingsStore._load_session()
return Settings(session)
@staticmethod
def _load_session() -> ConfigParser:
session_file = SettingsStore._session_file()
session = ConfigParser()
# apply default settings
session.add_section('general')
session.set('general', 'font_size', '12')
session.set('general', 'highlight_search_term', 'True')
session.set('general', 'open_tab_on_save_as_file', 'True')
session.read(session_file)
return session
@staticmethod
def save(settings: Settings):
session_file = SettingsStore._session_file()
dir = os.path.dirname(session_file)
os.makedirs(dir, exist_ok=True)
with open(session_file, 'w+') as fp:
settings.session.write(fp)

54
src/ui/ScaledScrollBar.py Normal file
View File

@@ -0,0 +1,54 @@
import math
from PySide6.QtWidgets import QScrollBar
from PySide6.QtCore import Signal
import logging
log = logging.getLogger("scaledScrollBar")
class ScaledScrollBar(QScrollBar):
is_huge = False
scaledValueChanged = Signal(str)
"""Signal emitted when the scroll bar value changes.
**Note**: The value is a string and must be parsed into an int.
QT's signal api only supports 32bit integers. Ints larger
than 2**32-1 will overflow. Probably because there is some C/C++
code involved. We work around this by converting the python int
into a string."""
def __init__(self):
super(ScaledScrollBar, self).__init__()
self.real_maximum = self.maximum()
super().valueChanged.connect(self._valueChanged)
def setValue(self, value: int) -> None:
if self.is_huge:
real_position = value / self.real_maximum
super().setValue(round(self.maximum() * real_position))
else:
super().setValue(value)
def setMaximum(self, maximum: int) -> None:
if maximum > 2 ** 31:
new_maximum = 1000 * math.log2(maximum)
super().setMaximum(math.ceil(new_maximum))
self.real_maximum = maximum
if not self.is_huge:
old_position = self.value() / self.maximum()
self.setValue(round(new_maximum * old_position))
self.is_huge = True
else:
self.is_huge = False
super().setMaximum(maximum)
def _valueChanged(self, value: int):
if self.is_huge:
real_value = (value / self.maximum()) * self.real_maximum
# print("scaled value changed: %d -> %d (%f)" % (value, real_value, value / self.maximum()))
self.scaledValueChanged.emit(str(int(real_value)))
else:
self.scaledValueChanged.emit(str(int(value)))

0
src/ui/__init__.py Normal file
View File

View File

467
src/ui/bigtext/bigtext.py Normal file
View File

@@ -0,0 +1,467 @@
import sys
import math
import os
import time
from typing import Callable, List
from PySide6 import QtGui
from PySide6.QtCore import *
from PySide6.QtGui import *
from PySide6.QtGui import QMouseEvent
from PySide6.QtWidgets import *
import constants
from src.ui.ScaledScrollBar import ScaledScrollBar
from src.ui.bigtext.highlight_selection import HighlightSelection
from src.ui.bigtext.highlighted_range import HighlightedRange
from src.ui.bigtext.highlightingdialog import HighlightingDialog
from src.ui.bigtext.line import Line
from src.ui.bigtext.logFileModel import LogFileModel
from src.util.conversion import humanbytes
from src.pluginregistry import PluginRegistry
from src.settings.settings import Settings
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from src.i18n import _
class FileObserver(FileSystemEventHandler):
def __init__(self, big_text):
super(FileObserver, self).__init__()
self.big_text = big_text
def on_modified(self, event):
# slow down the updates. This is needed, because the file is modified
# constantly, which would lead to constant re-rendering, which would
# block the UI thread and make the UI unresponsive.
# Note: we don't miss events, because they are queued and de-duplicated
time.sleep(0.5)
self.big_text.update()
class FileWatchdogThread(QRunnable):
def __init__(self, big_text, file: str):
super(FileWatchdogThread, self).__init__()
self.file = file
self.big_text = big_text
self.observer = Observer()
def run(self) -> None:
self.observer.schedule(FileObserver(self.big_text), self.file)
self.observer.start()
def destruct(self):
self.observer.stop()
# self.observer.join(1)
class BigText(QWidget):
def __init__(self, model: LogFileModel):
super(BigText, self).__init__()
self.model = model
self.watchdog = FileWatchdogThread(self, model.get_file())
QThreadPool.globalInstance().start(self.watchdog)
self.grid = QGridLayout()
self.grid.setContentsMargins(0, 0, 0, 0)
self.grid.setHorizontalSpacing(0)
self.grid.setVerticalSpacing(0)
self.setLayout(self.grid)
self.big_text = InnerBigText(self, model)
self.big_text.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding))
self.h_scroll_bar = QScrollBar(Qt.Orientation.Horizontal)
self.h_scroll_bar.setMinimum(0)
self.h_scroll_bar.setMaximum(1)
self.h_scroll_bar.valueChanged.connect(self.big_text.h_scroll_event)
self.v_scroll_bar = ScaledScrollBar()
# self.v_scroll_bar.setPageStep(1)
self.v_scroll_bar.scaledValueChanged.connect(self.big_text.v_scroll_event)
self.grid.addWidget(self.big_text, 0, 0)
self.grid.addWidget(self.h_scroll_bar, 1, 0)
self.grid.addWidget(self.v_scroll_bar, 0, 1)
def get_file(self):
return self.model.get_file()
def add_line_click_listener(self, listener: Callable[[int], None]):
"""
:param listener: a callable, the parameter is the byte offset of the clicked line
:return:
"""
self.big_text.line_click_listeners.append(listener)
def scroll_to_byte(self, byte_offset: int):
self.big_text.scroll_to_byte(byte_offset)
def destruct(self):
self.watchdog.destruct()
pass
class InnerBigText(QWidget):
_byte_offset = 0
_left_offset = 0
scroll_lines = 0
longest_line = 0
def __init__(self, parent: BigText, model: LogFileModel):
super(InnerBigText, self).__init__()
self.model = model
self.parent = parent
self.setFocusPolicy(Qt.FocusPolicy.StrongFocus)
self.setFocusPolicy(Qt.FocusPolicy.WheelFocus)
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self._open_menu)
self.update_font_metrics(QPainter(self))
self.lines = List[Line]
self.selection_highlight = HighlightSelection()
self._last_double_click_time = 0
self._last_double_click_line_number = -1
self.line_click_listeners: [Callable[[int], None]] = []
def keyPressEvent(self, e: QKeyEvent) -> None:
# print("%s + %s" % (e.keyCombination().keyboardModifiers(), e.key()))
if e.modifiers() == Qt.KeyboardModifier.NoModifier:
lines_to_scroll = math.floor(self.lines_shown()) - 1
if e.key() == Qt.Key.Key_PageUp:
self.scroll_by_lines(-lines_to_scroll)
if e.key() == Qt.Key.Key_PageDown:
self.scroll_by_lines(lines_to_scroll)
if e.key() == 16777235: # page up
self.scroll_by_lines(-3)
if e.key() == 16777237: # page down
self.scroll_by_lines(3)
elif e.modifiers() == Qt.KeyboardModifier.ControlModifier and e.key() == 67: # ctrl + c
self.copy_selection()
elif e.modifiers() == Qt.KeyboardModifier.ControlModifier and e.key() == 65: # ctrl + a
self._select_all()
def wheelEvent(self, event: QWheelEvent):
direction = 1 if event.angleDelta().y() < 0 else -1
if event.modifiers() == Qt.KeyboardModifier.ControlModifier:
# self.model.settings.update_font_size(-direction)
old_font_size = self.model.settings.getint_session('general', 'font_size')
new_font_size = max(4, min(50, old_font_size - direction))
self.model.settings.set_session('general', 'font_size', str(new_font_size))
PluginRegistry.execute("update_ui")
self.update()
else:
# print("wheel event fired :) %s" % (direction))
self.scroll_by_lines(direction * 3)
def _open_menu(self, position):
menu = QMenu(self)
copy_clipboard = QAction(QIcon.fromTheme("edit-copy"), _("&Copy to Clipboard"), self,
triggered=self.copy_selection)
copy_clipboard.setShortcut("CTRL+C")
copy_clipboard.setDisabled(not self._has_selection())
menu.addAction(copy_clipboard)
copy_to_file = QAction(QIcon.fromTheme("document-save-as"), _("Copy to &File"), self,
triggered=self._copy_selection_to_file)
copy_to_file.setDisabled(not self._has_selection())
menu.addAction(copy_to_file)
select_all = QAction(QIcon.fromTheme("edit-select-all"), _("Select &All"), self,
triggered=self._select_all)
select_all.setShortcut("CTRL+A")
menu.addAction(select_all)
manage_highlighting = QAction(
_("&Highlighter"),
self,
triggered=lambda: HighlightingDialog(self.model.settings).exec())
manage_highlighting.setShortcut("CTRL+H")
menu.addAction(manage_highlighting)
menu.exec(self.mapToGlobal(position))
def scroll_by_lines(self, scroll_lines: int):
self.scroll_lines = scroll_lines
self.update()
self.parent.v_scroll_bar.setValue(self._byte_offset)
def scroll_to_byte(self, byte_offset: int):
self._byte_offset = min(byte_offset, self.model.byte_count())
self.update()
self.parent.v_scroll_bar.setValue(self._byte_offset)
def mousePressEvent(self, e: QtGui.QMouseEvent) -> None:
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.ShiftModifier:
offset = self.to_byte_offset(e)
self.selection_highlight.set_end_byte(offset)
self.update()
return
if e.buttons() == Qt.MouseButton.LeftButton and (time.time() - self._last_double_click_time) < 0.5:
# triple click: select line
line_number = self.y_pos_to_line(e.pos().y())
if line_number == self._last_double_click_line_number and line_number < len(self.lines):
line: Line = self.lines[line_number]
self.selection_highlight.set_start(line.byte_offset())
self.selection_highlight.set_end_byte(line.byte_end())
self.update()
return
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier:
offset = self.to_byte_offset(e)
self.selection_highlight.set_start(offset)
self.selection_highlight.set_end_byte(offset)
self.update()
line_number = self.y_pos_to_line(e.pos().y())
if line_number < len(self.lines):
line: Line = self.lines[line_number]
for listener in self.line_click_listeners:
listener(line.byte_offset())
def mouseDoubleClickEvent(self, e: QtGui.QMouseEvent) -> None:
if e.buttons() == Qt.MouseButton.LeftButton:
self._last_double_click_time = time.time()
self._last_double_click_line_number = self.y_pos_to_line(e.pos().y())
offset = self.to_byte_offset(e)
(_word, start_byte, end_byte) = self.model.read_word_at(offset)
if start_byte >= 0 and end_byte >= 0:
self.selection_highlight.set_start(start_byte)
self.selection_highlight.set_end_byte(end_byte)
else:
self.selection_highlight.set_start(offset)
self.selection_highlight.set_end_byte(offset)
self.update()
def mouseMoveEvent(self, e: QMouseEvent):
if e.buttons() != Qt.MouseButton.LeftButton:
return
current_byte = self.to_byte_offset(e)
if self.selection_highlight.end_byte != current_byte:
self.selection_highlight.set_end_byte(current_byte)
self.update()
# print("-> %s,%s" %(self._selection_start_byte, self._selection_end_byte))
line_number = self.y_pos_to_line(e.pos().y())
column_in_line = self.x_pos_to_column(e.pos().x())
if line_number < 0:
self.scroll_by_lines(-1)
if line_number > int(self.lines_shown()):
self.scroll_by_lines(1)
if column_in_line <= 1:
self._left_offset = max(0, self._left_offset - 2)
self.update()
if column_in_line + 1 >= self.columns_shown():
self._left_offset = self._left_offset + 2
self.update()
@Slot()
def h_scroll_event(self, left_offset: int):
self._left_offset = left_offset
# print("left_offset: %d" % left_offset)
self.update()
@Slot()
def v_scroll_event(self, byte_offset: str):
self._byte_offset = int(byte_offset)
self.update()
def update_longest_line(self, length: int):
width_in_chars = self.width() / self.char_width
# print("width_in_chars: %d" % width_in_chars)
if self.longest_line < length:
self.longest_line = length
maximum = max(0, length - width_in_chars + 1)
self.parent.h_scroll_bar.setMaximum(round(maximum))
def y_pos_to_line(self, y: int) -> int:
return int(y / self.char_height)
def x_pos_to_column(self, x: int) -> int:
return round(x / self.char_width)
def lines_shown(self) -> float:
return self.height() / float(self.char_height)
def columns_shown(self) -> float:
return self.width() / float(self.char_width)
def to_byte_offset(self, e: QMouseEvent) -> int:
line_number = self.y_pos_to_line(e.pos().y())
if line_number < len(self.lines):
line: Line = self.lines[line_number]
column_in_line = self.x_pos_to_column(e.pos().x()) + self._left_offset
column_in_line = min(column_in_line, line.length_in_columns()) # x was behind the last column of this line
char_in_line = line.column_to_char(column_in_line)
# print("%s in line %s lcolumn_in_line=%s" % (char_in_line, line_number, column_in_line))
byte_in_line = line.char_index_to_byte(char_in_line)
current_byte = line.byte_offset() + byte_in_line
# print("%s + %s = %s" % (line.byte_offset(), char_in_line, current_byte))
else:
current_byte = self.model.byte_count()
return current_byte
def _has_selection(self):
return self.selection_highlight.start_byte != self.selection_highlight.end_byte
def copy_selection(self):
if self._has_selection():
start = min(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
end = max(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
bytes_human_readable = humanbytes(end - start)
if end - start > (1024 ** 2) * 5:
you_sure = QMessageBox(
QMessageBox.Icon.Warning,
_("data selection"),
_(
"You have selected <b>{0}</b> of data.").format(bytes_human_readable))
you_sure.setStandardButtons(QMessageBox.Cancel)
copy_btn = you_sure.addButton(_("Copy {0} to Clipboard").format(bytes_human_readable),
QMessageBox.ActionRole)
write_btn = you_sure.addButton(_("Write to File"), QMessageBox.ActionRole)
you_sure.setDefaultButton(QMessageBox.StandardButton.Cancel)
you_sure.exec()
if you_sure.clickedButton() == write_btn:
self._copy_selection_to_file()
return
if you_sure.clickedButton() != copy_btn:
# abort
print("abort")
return
selected_text = self.model.read_range(start, end)
cb = QApplication.clipboard()
cb.setText(selected_text)
def _copy_selection_to_file(self):
if self._has_selection():
start = min(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
end = max(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
dialog = QFileDialog(self)
(selected_file, _filter) = dialog.getSaveFileName(
caption=_("Save File"),
dir=os.path.dirname(self.model.get_file())
)
if selected_file:
self.model.write_range(start, end, selected_file)
open_tab = self.model.settings.session.getboolean("general", "open_tab_on_save_as_file")
if open_tab:
PluginRegistry.execute("open_file", selected_file)
def _select_all(self):
self.selection_highlight.start_byte = 0
self.selection_highlight.end_byte = self.model.byte_count()
self.update()
def paintEvent(self, event: QPaintEvent) -> None:
# print("paintEvent")
painter = QPainter(self)
# painter.setFont(self.model.settings.font())
# Courier New, DejaVu Sans Mono, Monospace, Liberation Mono, Noto Mono, Nimbus Mono L, Tlwg Mono, Ubuntu Mono, FreeMono, Mitra Mono
font = "Courier New" if sys.platform == 'win32' or sys.platform == 'cygwin' else "Monospace"
painter.setFont(QFont("Courier New", self.model.settings.getint_session('general', "font_size")))
painter.setPen(QColor(0, 0, 0))
self.update_font_metrics(painter)
tab_string = " " * constants.tab_width
lines_to_show = self.lines_shown()
# print("%s / %s = %s" %(self.height(), float(self.char_height), lines_to_show))
self.lines = self.model.data(self._byte_offset, self.scroll_lines, lines_to_show)
# print("lines_to_show: %d returned: %d" % (lines_to_show, len(self.lines)))
self.scroll_lines = 0
self._byte_offset = self.lines[0].byte_offset() if len(self.lines) > 0 else 0
# print("new byte offset: ", self._byte_offset)
# document length == maximum + pageStep + aFewBytesSoThatTheLastLineIsShown
self.parent.v_scroll_bar.setMaximum(self.model.byte_count() - 1)
for l in self.lines:
self.update_longest_line(len(l.line()))
highlighters = self.model.highlighters()
if self.model.get_query_highlight():
highlighters = highlighters + [self.model.get_query_highlight()]
highlighters = highlighters + [self.selection_highlight] # selection highlight should be last
# draw hightlights first - some characters may overlap to the next line
# by drawing the background hightlights first we prevent that the hightlight
# draws over a character
start = time.time()
y_line_offset = self.char_height;
for l in self.lines:
highlight_ranges = []
for h in highlighters:
optional_highlight_range = h.compute_highlight(l)
if optional_highlight_range:
highlight_ranges = highlight_ranges + optional_highlight_range
self.draw_highlights(highlight_ranges, painter, y_line_offset)
y_line_offset = y_line_offset + self.char_height
end = time.time()
# print("highlight duration: %.3f" %((end-start)*1000))
left_offset = int(-1 * self._left_offset * self.char_width)
y_line_offset = self.char_height;
for l in self.lines:
text = l.line_tabs_replaced()
painter.drawText(left_offset, y_line_offset, text)
y_line_offset = y_line_offset + self.char_height
painter.end()
def draw_highlights(self, highlights: [HighlightedRange], painter: QPainter, y_line_offset: int):
for highlight in highlights:
if highlight.is_highlight_full_line():
left_offset = -1 * self._left_offset * self.char_width
y1 = y_line_offset - self.char_height + self.char_height / 7
height = self.char_height
full_width = Settings.max_line_length() * self.char_width
rect = QRect(round(left_offset), round(y1), round(full_width), round(height))
self.highlight_background(painter, rect, highlight.get_brush_full_line())
for highlight in highlights:
left_offset = self._left_offset * self.char_width
x1 = highlight.get_start() * self.char_width
width = highlight.get_width() * self.char_width
y1 = y_line_offset - self.char_height + self.char_height / 7
height = self.char_height
rect = QRect(round(x1 - left_offset), round(y1), round(width), round(height))
self.highlight_background(painter, rect, highlight.get_brush())
def highlight_background(self, painter: QPainter, rect: QRect, brush: QBrush):
old_brush = painter.brush()
old_pen = painter.pen()
painter.setBrush(brush)
painter.setPen(Qt.PenStyle.NoPen)
painter.drawRoundedRect(rect, 3.0, 3.0)
painter.setBrush(old_brush)
painter.setPen(old_pen)
def update_font_metrics(self, painter: QPainter):
fm: QFontMetrics = painter.fontMetrics()
self.char_height = fm.height()
self.char_width = fm.averageCharWidth() # all chars have same width for monospace font
text = "012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
self.char_width = fm.horizontalAdvance(text) / float(len(text))
# print("font width=%s height=%s" % (self.char_width, self.char_height))

View File

@@ -0,0 +1,12 @@
from typing import Optional, List
from src.ui.bigtext.line import Line
from src.ui.bigtext.highlighted_range import HighlightedRange
class Highlight:
def __init__(self):
pass
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
return None

View File

@@ -0,0 +1,64 @@
from typing import Optional
from src.ui.bigtext.highlight import Highlight
from src.ui.bigtext.highlighted_range import HighlightedRange
from src.ui.bigtext.line import Line
from PySide6.QtGui import QBrush, QColor
from typing import List
import re
class HighlightRegex(Highlight):
def __init__(self, query: str, ignore_case: bool, is_regex: bool, hit_background_color: str = "None",
line_background_color: str = "None"):
self.query = query
self.ignore_case = ignore_case
self.is_regex = is_regex
self.regex = self._get_regex()
self.hit_background_color = hit_background_color
self.line_background_color = line_background_color
self._brush_hit = self.brush(self.hit_background_color)
self._brush_line = self.brush(self.line_background_color)
def _get_regex(self):
flags = re.IGNORECASE if self.ignore_case else 0
if self.is_regex:
return re.compile(self.query, flags=flags)
else:
return re.compile(re.escape(self.query), flags=flags)
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
result = []
# print("execute regex: %s in %s" % (self.regex, line.line()))
match_iter = re.finditer(self.regex, line.line())
for match in match_iter:
# print("%s" % match)
start_char = match.start(0)
end_char = match.end(0)
start_column = line.char_to_column(start_char)
end_column = line.char_to_column(end_char)
result.append(HighlightedRange(
start_column,
end_column - start_column,
highlight_full_line=True,
brush=self._brush_hit,
brush_full_line=self._brush_line
))
return result
def hit_background_brush(self):
return self.brush(self.hit_background_color)
@staticmethod
def brush(color: str) -> QBrush:
if re.match("[0-9a-f]{6}", color, flags=re.IGNORECASE):
red = int(color[0:2], 16)
green = int(color[2:4], 16)
blue = int(color[4:6], 16)
return QBrush(QColor(red, green, blue))
return QBrush()

View File

@@ -0,0 +1,56 @@
from typing import Optional, List
from src.ui.bigtext.highlight import Highlight
from src.ui.bigtext.highlighted_range import HighlightedRange
from src.ui.bigtext.line import Line
from PySide6.QtCore import Qt
from PySide6.QtGui import QBrush, QColor
from src.settings.settings import Settings
class HighlightSelection(Highlight):
start_byte = 0
end_byte = 0
def set_start(self, start_byte):
self.start_byte = start_byte
def set_end_byte(self, end_byte):
self.end_byte = end_byte
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
begin = min(self.start_byte, self.end_byte)
end = max(self.start_byte, self.end_byte)
if line.intersects(begin, end):
if line.includes_byte(begin):
start_byte_in_line = begin - line.byte_offset()
else:
start_byte_in_line = 0
start_char = line.byte_index_to_char_index(start_byte_in_line)
if line.includes_byte(end):
length_in_bytes = end - line.byte_offset() - start_byte_in_line
end_char = line.byte_index_to_char_index(start_byte_in_line + length_in_bytes)
else:
# renders the highlighting to the end of the line
# this is how selections usually behave
length_in_bytes = Settings.max_line_length() - start_byte_in_line
# note: this mixes chars and bytes, but that should not matter, because
# it just means that we render the highlight into the invisible range on the right
end_char = start_char + length_in_bytes
start_column = line.char_to_column(start_char)
end_column = line.char_to_column(end_char)
if end_column >= 0:
length_in_columns = end_column - start_column
else:
length_in_columns = 4096
return [HighlightedRange(start_column, length_in_columns, brush=QBrush(QColor(156, 215, 255, 192)),
pen=Qt.PenStyle.NoPen)]
else:
return None

View File

@@ -0,0 +1,38 @@
from PySide6.QtCore import Qt
from PySide6.QtGui import QBrush, QPen
class HighlightedRange:
def __init__(
self,
start: int,
width: int,
highlight_full_line=False,
brush: QBrush = QBrush(),
pen: QPen = Qt.PenStyle.NoPen,
brush_full_line: QBrush = QBrush()
):
self.start = start
self.width = width
self.brush = brush
self.pen = pen
self.highlight_full_line = highlight_full_line
self.brush_full_line = brush_full_line
def is_highlight_full_line(self):
return self.highlight_full_line
def get_start(self):
return self.start
def get_width(self):
return self.width
def get_brush(self):
return self.brush
def get_pen(self):
return self.pen
def get_brush_full_line(self):
return self.brush_full_line

View File

@@ -0,0 +1,63 @@
import logging
from src.settings.settings import Settings
from src.ui.bigtext.highlight_regex import HighlightRegex
log = logging.getLogger("highlighting")
class Highlighting:
@staticmethod
def read_config(settings: Settings) -> [HighlightRegex]:
result = []
session = settings.session
for section in session.sections():
if not section.startswith("highlighting."):
continue
query = session.get(section, "query", fallback="")
if len(query) == 0:
continue
ignore_case = session.getboolean(section, "ignore-case", fallback=True)
is_regex = session.getboolean(section, "is-regex", fallback=False)
line_background_color = session.get(section, "line.background.color", fallback="None")
hit_background_color = session.get(section, "hit.background.color", fallback="None")
try:
highlight = HighlightRegex(
query=query,
ignore_case=ignore_case,
is_regex=is_regex,
hit_background_color=hit_background_color,
line_background_color=line_background_color
)
result.append(highlight)
except:
log.exception("failed to parse query for highlighter: %s" % section)
continue
return result
@staticmethod
def write_config(settings: Settings, highlighters: [HighlightRegex]):
Highlighting.remove_highlighting_sections(settings)
section_counter = 0
for highlighter in highlighters:
highlighter: HighlightRegex = highlighter
section = "highlighting.%d" % section_counter
section_counter = section_counter + 1
settings.session.add_section(section)
settings.session.set(section, "query", highlighter.query)
settings.session.set(section, "ignore-case", str(highlighter.ignore_case))
settings.session.set(section, "is-regex", str(highlighter.is_regex))
settings.session.set(section, "line.background.color", highlighter.line_background_color)
settings.session.set(section, "hit.background.color", highlighter.hit_background_color)
@staticmethod
def remove_highlighting_sections(settings: Settings):
for section in settings.session.sections():
if not section.startswith("highlighting."):
continue
settings.session.remove_section(section)

View File

@@ -0,0 +1,201 @@
from PySide6.QtGui import QIcon
from PySide6.QtWidgets import QDialog, QLineEdit, QLabel, QGridLayout, QCheckBox, QListWidget, QListWidgetItem, \
QPushButton, QDialogButtonBox, QMessageBox, QSizePolicy
from src.ui.bigtext.highlight_regex import HighlightRegex
from src.ui.bigtext.highlighting import Highlighting
from src.ui.colorbutton import ColorButton
from src.ui.hbox import HBox
from src.settings.settings import Settings
from src.i18n import _
class PayloadItem(QListWidgetItem):
def __init__(self, text: str, payload=None):
super(PayloadItem, self).__init__(text)
self.payload = payload
class HighlightingDialog(QDialog):
def __init__(self, settings: Settings):
super(HighlightingDialog, self).__init__()
self.setWindowTitle(_("Manage Highlighting"))
self.setModal(True)
self._settings = settings
form_grid = QGridLayout(self)
self.layout = form_grid
row = 0
self.list = QListWidget(self)
form_grid.addWidget(self.list, row, 0, 1, 2)
row = row + 1
self.btn_add = QPushButton(QIcon.fromTheme("list-add"), _("Add"))
self.btn_add.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.btn_add.pressed.connect(self._add)
self.btn_update = QPushButton(QIcon.fromTheme("stock_edit"), _("Update"))
self.btn_update.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.btn_update.pressed.connect(self._update)
self.btn_delete = QPushButton(QIcon.fromTheme("list-remove"), _("Remove"))
self.btn_delete.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.btn_delete.pressed.connect(self._delete)
self.btn_move_up = QPushButton(QIcon.fromTheme("go-up"), _("Up"))
self.btn_move_up.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.btn_move_up.pressed.connect(self._move_up)
self.btn_move_down = QPushButton(QIcon.fromTheme("go-down"), _("Down"))
self.btn_move_down.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.btn_move_down.pressed.connect(self._move_down)
button_box = HBox(self.btn_update, self.btn_add, self.btn_delete, self.btn_move_up, self.btn_move_down)
form_grid.addWidget(button_box, row, 0, 1, 2)
row = row + 1
self.query = QLineEdit(self)
form_grid.addWidget(QLabel(_("Query:")), row, 0)
form_grid.addWidget(self.query, row, 1)
row = row + 1
self.ignore_case = QCheckBox(_("Ignore Case"))
self.ignore_case.setChecked(True)
form_grid.addWidget(self.ignore_case, row, 0, 1, 2)
row = row + 1
self.is_regex = QCheckBox(_("Regular Expression"))
self.is_regex.setChecked(True)
form_grid.addWidget(self.is_regex, row, 0, 1, 2)
row = row + 1
form_grid.addWidget(QLabel(_("Hit Background:")), row, 0)
self.hit_background_color = ColorButton("ccb400")
form_grid.addWidget(self.hit_background_color, row, 1)
row = row + 1
form_grid.addWidget(QLabel(_("Line Background:")), row, 0)
self.line_background_color = ColorButton("fff080")
form_grid.addWidget(self.line_background_color, row, 1)
row = row + 1
self.buttons = QDialogButtonBox()
self.buttons.setStandardButtons(QDialogButtonBox.StandardButton.Cancel | QDialogButtonBox.StandardButton.Ok)
self.buttons.accepted.connect(self._save)
self.buttons.rejected.connect(self.close)
form_grid.addWidget(self.buttons, row, 0, 1, 2)
self._load_existing_hightlighters()
self.list.setCurrentItem(None)
self._selection_changed()
self.list.itemSelectionChanged.connect(self._selection_changed)
def _add(self):
highlighter = HighlightRegex(
self.query.text(),
self.ignore_case.isChecked(),
self.is_regex.isChecked(),
self.hit_background_color.color,
self.line_background_color.color
)
item = PayloadItem(self.query.text(), highlighter)
item.setBackground(highlighter.hit_background_brush())
self.list.addItem(item)
self.list.setCurrentItem(item)
def _update(self):
item: PayloadItem = self.list.currentItem()
highlighter: HighlightRegex = item.payload
highlighter.query = self.query.text()
highlighter.ignore_case = self.ignore_case.isChecked()
highlighter.is_regex = self.is_regex.isChecked()
highlighter.hit_background_color = self.hit_background_color.color
highlighter.line_background_color = self.line_background_color.color
item.setText(self.query.text())
item.setBackground(highlighter.hit_background_brush())
def _delete(self):
index = self.list.selectedIndexes()[0]
selected_index = index.row()
self.list.takeItem(selected_index)
def _move_up(self):
index = self.list.currentIndex()
selected_index = index.row()
item = self.list.takeItem(selected_index)
self.list.insertItem(selected_index - 1, item)
self.list.setCurrentIndex(index.siblingAtRow(selected_index - 1))
def _move_down(self):
index = self.list.selectedIndexes()[0]
selected_index = index.row()
item = self.list.takeItem(selected_index)
self.list.insertItem(selected_index + 1, item)
self.list.setCurrentIndex(index.sibling(selected_index + 1, 0))
def _save(self):
if self._is_dirty():
unsaved = QMessageBox(QMessageBox.Icon.Question, _("unsaved changes"),
_("You have unsaved changes."))
unsaved.setStandardButtons(QMessageBox.Cancel | QMessageBox.StandardButton.Discard)
result = unsaved.exec()
if result == QMessageBox.StandardButton.Cancel:
return
highlighters = []
for index in range(0, self.list.count()):
item: PayloadItem = self.list.item(index)
highlighters.append(item.payload)
Highlighting.write_config(self._settings, highlighters)
self.close()
def _selection_changed(self):
if len(self.list.selectedIndexes()) == 0:
self.btn_update.setDisabled(True)
self.btn_delete.setDisabled(True)
self.btn_move_up.setDisabled(True)
self.btn_move_down.setDisabled(True)
if len(self.list.selectedIndexes()) == 1:
selected_index = self.list.selectedIndexes()[0].row()
self.btn_update.setDisabled(False)
self.btn_delete.setDisabled(False)
self.btn_move_up.setDisabled(selected_index == 0)
self.btn_move_down.setDisabled(selected_index + 1 >= self.list.count())
item: PayloadItem = self.list.item(selected_index)
highlighter: HighlightRegex = item.payload
self.query.setText(highlighter.query)
self.ignore_case.setChecked(highlighter.ignore_case)
self.is_regex.setChecked(highlighter.is_regex)
self.hit_background_color.set_color(highlighter.hit_background_color)
self.line_background_color.set_color(highlighter.line_background_color)
def _is_dirty(self):
if len(self.list.selectedIndexes()) == 0:
dirty = False
if len(self.list.selectedIndexes()) == 1:
item: PayloadItem = self.list.currentItem()
highlighter: HighlightRegex = item.payload
dirty = self.query.text() != highlighter.query \
or self.ignore_case.isChecked() != highlighter.ignore_case \
or self.is_regex.isChecked() != highlighter.is_regex \
or self.hit_background_color.color != highlighter.hit_background_color \
or self.line_background_color.color != highlighter.line_background_color
else:
dirty = False
return dirty
def _load_existing_hightlighters(self):
highlighters: [HighlightRegex] = Highlighting.read_config(self._settings)
first_item = None
for highlighter in highlighters:
item = PayloadItem(str(highlighter.query))
item.payload = highlighter
item.setBackground(highlighter.hit_background_brush())
self.list.addItem(item)
if not first_item:
first_item = item

115
src/ui/bigtext/line.py Normal file
View File

@@ -0,0 +1,115 @@
import unicodedata
import constants
class Line:
def __init__(self, byte_offset: int, byte_end: int, line: str):
self._byte_offset = byte_offset
self._byte_end = byte_end
self._line = line
self._cache_char_to_column()
def byte_offset(self) -> int:
return self._byte_offset
def byte_end(self) -> int:
return self._byte_end
def line(self) -> str:
return self._line
def length_in_charaters(self) -> int:
return len(self._line)
def length_in_columns(self) -> int:
return self.char_to_column(len(self._line) - 1)
def char_index_to_byte(self, char_in_line: int) -> int:
# todo this does not work with multibyte characters
# should probably be len(self.prefix(char_in_line-1).encode("utf8"))
return len(self.prefix(char_in_line).encode("utf8"))
def byte_index_to_char_index(self, byte_index: int) -> int:
prefix_bytes = self._line.encode("utf8")[:byte_index]
prefix_chars = prefix_bytes.decode("utf8", errors="ignore")
return len(prefix_chars)
def line_tabs_replaced(self):
line = self._line;
i = 0
offset = 0
result = ""
length = len(line)
while True:
tab_index = line.find("\t", offset)
if tab_index < 0:
break
result = result + line[offset:tab_index]
result = result + " " * (constants.tab_width - len(result) % constants.tab_width)
offset = tab_index + 1
result = result + line[offset:]
return result
def column_to_char(self, column_in_line: int) -> int:
if column_in_line in self._column_to_char_cache:
return self._column_to_char_cache[column_in_line]
return 0
def char_to_column(self, char_in_line: int) -> int:
if not char_in_line in self._char_to_column_cache:
# print("%d in %s" % (char_in_line, self._char_to_column_cache))
return -1
return self._char_to_column_cache[char_in_line]
def _cache_char_to_column(self):
self._char_to_column_cache = {}
self._column_to_char_cache = {}
result = 0
i = 0
self._char_to_column_cache[0] = 0
while i < len(self._line):
self._char_to_column_cache[i] = result
if not result in self._column_to_char_cache:
self._column_to_char_cache[result] = i
current_char = self._line[i]
if current_char == "\t":
result = result + constants.tab_width - result % constants.tab_width
else:
result = result + 1
i = i + 1
# ignore: Nonspacing Mark characters are decorations for the previous character.
# They do not take up space.
# For example the character Combining Diaeresis (U+0308, %CC%88) that adds two
# dots above the previous character. It can be used to create an 'ä' from an 'a'+'◌̈'.
# In url encoding this looks like: a%CC%88.
# todo there are many other character combinations that should be skipped
while i < len(self._line) and unicodedata.category(self._line[i]) == "Mn":
self._char_to_column_cache[i] = result - 1
if not result in self._column_to_char_cache:
self._column_to_char_cache[result - 1] = i
i = i + 1
def includes_byte(self, byte: int) -> bool:
return self._byte_offset <= byte <= self._byte_end
def intersects(self, start_byte: int, end_byte: int):
result = start_byte < self._byte_end and end_byte > self._byte_offset
# print("%d,%d in %d,%d" % (start_byte, end_byte, self._byte_offset, self._byte_end))
return result
def prefix(self, index: int) -> str:
return self._line[0:index]
def substr(self, offset: int, length: int) -> str:
return self._line[offset:offset+length]
def suffix(self, index: int) -> str:
return self._line[index:]
def __str__(self):
return "%s (%d->%d)" % (self._line, self._byte_offset, self._byte_end)

View File

@@ -0,0 +1,22 @@
import os
import tempfile
from typing import Optional
from src.util.int2intmap import Int2IntMap
class LineToLineMap:
def __init__(self):
(handle, self.tmpfilename) = tempfile.mkstemp()
os.close(handle)
self._int2intmap = Int2IntMap(self.tmpfilename)
def close(self):
self._int2intmap.close()
os.remove(self.tmpfilename)
def add_line(self, key_byte_start: int, value_byte_start):
self._int2intmap.add(key_byte_start, value_byte_start)
def get_line(self, key_byte_start: int) -> Optional[int]:
return self._int2intmap.find(key_byte_start)

View File

@@ -0,0 +1,164 @@
import math
import re
from typing import List, Optional
from PySide6.QtCore import Signal
from src.ui.bigtext.highlight_regex import HighlightRegex
from src.ui.bigtext.highlighting import Highlighting
from src.ui.bigtext.line import Line
import os
from src.settings.settings import Settings
class LogFileModel:
_query_highlight: Optional[HighlightRegex] = None
file_size_changed = Signal()
"""Fires when the file size changed. **Note:** uses strings,
because int in Qt signal are limited to 32bit."""
_file_size = -1
def __init__(self, file: str, settings: Settings):
self.settings = settings
self._file = os.path.realpath(file)
# self._lock = threading.RLock()
def highlighters(self):
return Highlighting.read_config(self.settings)
def get_file(self):
return self._file
def __str__(self):
return self._file
def get_query_highlight(self) -> Optional[HighlightRegex]:
if not self.settings.session.getboolean("general", "highlight_search_term"):
return None
return self._query_highlight
def clear_query_highlight(self):
self._query_highlight = None
def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
self._query_highlight = HighlightRegex(
query=query,
ignore_case=ignore_case,
is_regex=is_regex,
hit_background_color="ffff00")
def get_tab_name(self):
file_name = os.path.basename(self._file)
if len(file_name) > 35:
file_name = file_name[:15] + "..." + file_name[-15:]
return file_name
def read_range(self, start_byte: int, end_byte: int):
# with self._lock:
if True:
with open(self._file, 'rb') as f:
f.seek(start_byte)
bytes = f.read(end_byte - start_byte)
return bytes.decode("utf8", errors="ignore")
def write_range(self, start_byte: int, end_byte: int, file: str):
# print("write range: %d - %d -> %s" % (start_byte, end_byte, file))
# with self._lock, open(self._file, 'rb') as source, open(file, "w+b") as target:
with open(self._file, 'rb') as source, open(file, "w+b") as target:
offset = start_byte
source.seek(offset)
while offset < end_byte:
new_offset = min(offset + 1024 * 1024, end_byte)
buffer_size = new_offset - offset
buffer = source.read(buffer_size)
target.write(buffer)
offset = new_offset
def read_word_at(self, byte_offset: int) -> (str, int, int):
lines = self.data(byte_offset, 0, 1)
if len(lines) == 0:
return "", -1, -1
line: Line = lines[0]
if not line.includes_byte(byte_offset):
return "", -1, -1
offset_in_line = byte_offset - line.byte_offset()
char_index = line.byte_index_to_char_index(offset_in_line)
current_char = line.line()[char_index]
# print("read_word: char_index=%s, current_char=%s, line=%s" %(char_index, current_char, line.line()))
if not self._is_word_char(current_char):
return current_char, byte_offset, byte_offset + 1
start_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
while start_in_line - 1 >= 0 and self._is_word_char(line.line()[start_in_line - 1]):
start_in_line = start_in_line - 1
end_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
while end_in_line < len(line.line()) and self._is_word_char(line.line()[end_in_line]):
end_in_line = end_in_line + 1
start_byte = line.char_index_to_byte(start_in_line) + line.byte_offset()
end_byte = line.char_index_to_byte(end_in_line) + line.byte_offset()
return line.line()[start_in_line:end_in_line], start_byte, end_byte
def _is_word_char(self, char: str) -> bool:
return re.match(r"\w", char) is not None
def data(self, byte_offset: int, scroll_lines: int, lines: int) -> List[Line]:
# print("data(%s, %s, %s)" % (byte_offset, scroll_lines, lines))
lines_before_offset: List[Line] = []
lines_after_offset: List[Line] = []
lines_to_find = lines + abs(scroll_lines)
lines_to_return = math.ceil(lines)
# with self._lock:
if True:
# TODO handle lines longer than 4096 bytes
# TODO abort file open after a few secons: https://docs.python.org/3/library/signal.html#example
with open(self._file, 'rb') as f:
offset = min(byte_offset, self.byte_count())
# print("offset: %s byte_count: %d" % (offset, self.byte_count()))
offset = max(0, offset - self.settings.max_line_length())
eof_reached = True
f.seek(offset)
while l := f.readline():
new_offset = f.tell()
line = Line(offset, new_offset,
l.decode("utf8", errors="ignore") # .replace("\r", "").replace("\n", "")
)
# print("%s %s %s" %(line.byte_offset(), line.line(), line.byte_end()))
if line.byte_end() <= byte_offset: # line.byte_end() returns the end byte +1
lines_before_offset.append(line)
else:
lines_after_offset.append(line)
offset = f.tell()
if len(lines_after_offset) >= lines_to_find:
eof_reached = False
break
all_lines = lines_before_offset + lines_after_offset
start = max(0, len(lines_before_offset) + scroll_lines)
if start + lines_to_return - 1 < len(all_lines):
result = all_lines[start:start + lines_to_return]
else:
result = all_lines[-lines_to_return + 1:]
# print("returning %s lines" % (len(result)))
# if len(result) > 0:
# print("returning %s %d -> %d" % (result[0].line(), result[0].byte_offset(), result[0].byte_end()))
return result
def byte_count(self) -> int:
size = os.stat(self._file).st_size
if self._file_size != size:
# self.file_size_changed.emit(str(size))
self._file_size = size
return size
def write_line(self, line: str):
with open(self._file, 'a+b') as f:
f.write(line.encode("utf8"))
if not line.endswith("\n"):
f.write("\n".encode("utf8"))
def truncate(self):
with open(self._file, 'a') as f:
f.truncate(0)

116
src/ui/bigtext/testline.py Normal file
View File

@@ -0,0 +1,116 @@
import unittest
import unicodedata
from line import Line
class MyTestCase(unittest.TestCase):
def test_column_to_char(self):
byte_offset = 123
text = "\tabc\td\tef\tg" # will be rendered as: ....abc.d...ef..g where . represents a whitespace column
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(0, line.column_to_char(0)) # the tab
self.assertEqual(0, line.column_to_char(1)) # the tab
self.assertEqual(0, line.column_to_char(2)) # the tab
self.assertEqual(0, line.column_to_char(3)) # last column of the tab
self.assertEqual(1, line.column_to_char(4)) # a
self.assertEqual(2, line.column_to_char(5)) # b
self.assertEqual(3, line.column_to_char(6)) # c
self.assertEqual(4, line.column_to_char(7)) # tab
self.assertEqual(5, line.column_to_char(8)) # d
self.assertEqual(6, line.column_to_char(9)) # tab
self.assertEqual(6, line.column_to_char(10)) # tab
self.assertEqual(6, line.column_to_char(11)) # tab
self.assertEqual(7, line.column_to_char(12)) # e
self.assertEqual(8, line.column_to_char(13)) # f
self.assertEqual(9, line.column_to_char(14)) # tab
self.assertEqual(9, line.column_to_char(15)) # tab
self.assertEqual(10, line.column_to_char(16)) # g
def test_column_to_char_ignore_nonspacing_mark_charaters(self):
"""
nonspacing mark charaters are those little decorations that are applied to the previous character,
e.g. x\u0308 to make ẍ
:return:
"""
byte_offset = 123
text = "x\u0308y\u0308z\u0308"
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(0, line.column_to_char(0)) # ẍ
self.assertEqual(2, line.column_to_char(1)) # ÿ
self.assertEqual(4, line.column_to_char(2)) # z̈
def test_char_to_column(self):
byte_offset = 123
text = "\tabc\td\tef\tg" # will be rendered as: ....abc.d...ef..g where . represents a whitespace column
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(0, line.char_to_column(0)) # tab
self.assertEqual(4, line.char_to_column(1)) # a
self.assertEqual(5, line.char_to_column(2)) # b
self.assertEqual(6, line.char_to_column(3)) # c
self.assertEqual(7, line.char_to_column(4)) # tab
self.assertEqual(8, line.char_to_column(5)) # d
self.assertEqual(9, line.char_to_column(6)) # tab
self.assertEqual(12, line.char_to_column(7)) # e
self.assertEqual(13, line.char_to_column(8)) # f
self.assertEqual(14, line.char_to_column(9)) # tab
self.assertEqual(16, line.char_to_column(10)) # g
def test_char_to_column_ignore_nonspacing_mark_charaters(self):
"""
nonspacing mark charaters are those little decorations that are applied to the previous character,
e.g. x\u0308 to make ẍ
:return:
"""
byte_offset = 123
text = "x\u0308y\u0308z\u0308"
print(text)
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(0, line.char_to_column(0)) # ẍ
self.assertEqual(0, line.char_to_column(1)) # ẍ
self.assertEqual(1, line.char_to_column(2)) # ÿ
self.assertEqual(1, line.char_to_column(3)) # ÿ
self.assertEqual(2, line.char_to_column(4)) # z̈
self.assertEqual(2, line.char_to_column(5)) # z̈
def test_line_tabs_replaced(self):
byte_offset = 123
text = "\ta\tb" # will be rendered as: ....abc where . represents a whitespace column
expected = " a b"
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(expected, line.line_tabs_replaced())
def test_line_tabs_replaced_performance(self):
byte_offset = 123
text = "a\t" * 10000
expected = "a " * 10000
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(expected, line.line_tabs_replaced())
def test_byte_index_to_char_index(self):
byte_offset = 123
text = "x\u0308y\u0308z\u0308\t\u0308a"
print(text)
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
self.assertEqual(0, line.byte_index_to_char_index(0)) # x
self.assertEqual(0, line.byte_index_to_char_index(1)) # first byte of diacritical mark belonging to x
self.assertEqual(0, line.byte_index_to_char_index(2)) # second byte of diacritical mark belonging to x
def test_diacritical_marks(self):
text = "̈ẍỏôŏ̮👍🏿"
text = "\U0001F9D9\u200D\u2642\uFE0F - \U0001F44D\U0001F3FF - a\u02c3 - ẍ - y\u0308 - w\u200D\u00A8"
text = unicodedata.normalize("NFD", text)
i = 0
print("%s" % text)
print("length: %s" % len(text))
while i < len(text):
c = text[i]
print("%s %s cat: %s" % (c, unicodedata.name(c), unicodedata.category(c)))
i = i + 1
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,174 @@
import unittest
import tempfile
from configparser import ConfigParser
from os.path import join
from logFileModel import LogFileModel
from src.settings.settings import Settings
class TestLogFileModel(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
self.tmpfile = join(self.test_dir.name, "my.log")
self.model = LogFileModel(self.tmpfile, Settings(ConfigParser()))
def tearDown(self):
self.test_dir.cleanup()
def write_str(self, string: str):
with open(self.tmpfile, "w+b") as f:
f.write(string.encode("utf8"))
def test_load_from_beginning(self):
self.write_str("1\n2\n3\n4\n5\n6\n7\n")
expected_lines = ["1", "2", "3", "4", "5"]
lines = self.model.data(0, 0, 5)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_load_from_middle_of_first_line(self):
self.write_str("abc\ndef\nghi\njkl")
expected_lines = ["abc", "def", "ghi"]
lines = self.model.data(1, 0, 3)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_read_from_newline_character(self):
self.write_str("abc\ndef\nghi\njkl")
expected_lines = ["abc", "def"]
lines = self.model.data(3, 0, 2)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_negative_byte_offset(self):
self.write_str("abc\ndef\nghi\njkl")
expected_lines = ["abc","def"]
lines = self.model.data(-1, 0, 2)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_empty_last_line_is_ignored(self):
self.write_str("1\n")
expected_lines = ["1"]
lines = self.model.data(0, 0, 5)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_load_more_lines_than_are_available(self):
"""
Wants to read 4 lines in a file with only 3 lines.
Returns all three lines.
"""
self.write_str("abc\ndef\nghi")
expected_lines = ["abc", "def", "ghi"]
lines = self.model.data(0, 0, 4)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_read_behind_eof(self):
"""
Wants to read 4 lines from the middle of the second line.
File has only 3 lines.
Returns all 3 lines.
"""
text = "abc\ndef\nghi"
self.write_str(text)
expected_lines = ["abc", "def", "ghi"]
lines = self.model.data(text.index("e"), 0, 4)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_read_after_scrolling_2_lines(self):
"""
"""
text = "0___\n1___\n2___\n3___\n4___\n5___"
self.write_str(text)
expected_lines = ["3___", "4___"]
lines = self.model.data(text.index("1___"), 2, 2)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_scroll_with_float(self):
"""
If lines to lines to return is a float, then the value is rounded up.
Floats mean that the text area is such that a line is partially visible.
"""
text = "0___\n1___\n2___\n3___\n4___\n5___"
self.write_str(text)
expected_lines = ["3___","4___", "5___"]
lines = self.model.data(text.index("1___"), 2, 2.1)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_scroll_up_at_beginning_of_file(self):
"""
Scrolling up at beginning of file.
Return
"""
text = "0___\n1___\n2___\n3___\n4___\n5___"
self.write_str(text)
expected_lines = ["0___", "1___"]
lines = self.model.data(5, -2, 2)
line_str = [l.line() for l in lines]
self.assertEqual(expected_lines, line_str)
def test_read_word_at_middle_of_line(self):
text = "0___\nlorem ipsum dolor sit amet\n2___"
self.write_str(text)
expected = ("ipsum", text.index("ipsum"), text.index("ipsum") + len("ipsum"))
actual = self.model.read_word_at(text.index("ipsum") + 2)
self.assertEqual(expected, actual)
def test_read_word_at_start_of_line(self):
text = "0___\nlorem ipsum dolor sit amet\n2___"
word = "lorem"
self.write_str(text)
expected = (word, text.index(word), text.index(word) + len(word))
actual = self.model.read_word_at(text.index(word))
self.assertEqual(expected, actual)
def test_read_word_at_end_of_line(self):
text = "0___\nlorem ipsum dolor sit amet\n2___"
word = "amet"
self.write_str(text)
expected = (word, text.index(word), text.index(word) + len(word))
actual = self.model.read_word_at(text.index(word))
self.assertEqual(expected, actual)
def test_read_word_at_beginning_of_file(self):
text = "lorem ipsum dolor sit amet\n1___"
word = "lorem"
self.write_str(text)
expected = (word, text.index(word), text.index(word) + len(word))
actual = self.model.read_word_at(text.index(word))
self.assertEqual(expected, actual)
if __name__ == '__main__':
unittest.main()

101
src/ui/colorbutton.py Normal file
View File

@@ -0,0 +1,101 @@
import re
from PySide6.QtGui import QColor, QPixmap, QIcon
from PySide6.QtWidgets import QWidget, QHBoxLayout, QPushButton, QColorDialog, QSizePolicy, QComboBox
from src.i18n import _
class ColorButton(QWidget):
def __init__(self, color: str, parent=None):
super(ColorButton, self).__init__(parent)
self.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
self.layout = QHBoxLayout(self)
self.color = color
colors = {
# red
_('Strawberry Cream'): 'ff8080',
_('Pale Crimson'): 'e61919',
# orange
_('Broken Buttercup'): 'ffd080',
_('Passion Fruit Sugar'): 'ffa200',
# yellow
_('Sunrise Yellow'): 'fff080',
_('Magical Mustard'): 'ccb400',
# green
_('Trendy Green'): 'aaff80',
_('Garden Of Sweden'): '44cc00',
# blue
_('Light Sky Blue'): '80c6ff',
_('True Blue'): '0073d1',
# purple
_('Fairy Topia'): 'ff80f4',
_('Magenta Bachiego'): 'cc00bb',
# grey
_('Breeze of Mist'): 'eaeaea',
_('Light Grey'): 'cccccc',
_('Grey'): '999999',
}
self.color_drop_down = QComboBox()
self.layout.addWidget(self.color_drop_down)
self.color_drop_down.currentIndexChanged.connect(self._color_selected)
self.color_drop_down.addItem(QIcon(self._color_pixmap("ffffffff")), _("transparent"), "None")
for color_name in colors.keys():
color_value = colors[color_name]
self.color_drop_down.addItem(QIcon(self._color_pixmap(color_value)), color_name, color_value)
if color == color_name or color == color_value:
self.color_drop_down.setCurrentIndex(self.color_drop_down.count() - 1)
self.btn_color_picker = QPushButton(QIcon.fromTheme("color-picker"), _("custom"))
self.layout.addWidget(self.btn_color_picker)
self.btn_color_picker.pressed.connect(self._update_color)
self.btn_color_picker.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
def _color_selected(self, index: int):
self.color = self.color_drop_down.currentData()
def set_color(self, color: str):
self.color = color
for index in range(0, self.color_drop_down.count()):
item_value = self.color_drop_down.itemData(index)
if item_value == color:
self.color_drop_down.setCurrentIndex(index)
return
# color not yet present -> add it
self.color_drop_down.addItem(QIcon(self._color_pixmap(color)), color, color)
self.color_drop_down.setCurrentIndex(self.color_drop_down.count() - 1)
def _update_color(self):
new_color = QColorDialog.getColor(self._to_qcolor(self.color))
if new_color.isValid():
color = self._to_hex(new_color)
self.set_color(color)
@staticmethod
def _is_hex_color(color: str):
return re.match("[0-9a-f]{6}", color, flags=re.IGNORECASE)
def _color_pixmap(self, color: str) -> QPixmap:
pixmap = QPixmap(40, 40)
qcolor = self._to_qcolor(color)
pixmap.fill((qcolor))
return pixmap
def _to_qcolor(self, color: str):
if self._is_hex_color(color):
red = int(color[0:2], 16)
green = int(color[2:4], 16)
blue = int(color[4:6], 16)
return QColor(red, green, blue)
elif color in QColor().colorNames():
return QColor(color)
return QColor(255, 255, 255)
def _to_hex(self, color: QColor) -> str:
red = "{0:0{1}x}".format(color.red(), 2)
green = "{0:0{1}x}".format(color.green(), 2)
blue = "{0:0{1}x}".format(color.blue(), 2)
return red + green + blue

9
src/ui/hbox.py Normal file
View File

@@ -0,0 +1,9 @@
from PySide6.QtWidgets import QWidget, QHBoxLayout
class HBox(QWidget):
def __init__(self, *widgets: QWidget):
super(HBox, self).__init__()
self.layout = QHBoxLayout(self)
for widget in widgets:
self.layout.addWidget(widget)

11
src/ui/label.py Normal file
View File

@@ -0,0 +1,11 @@
from PySide6.QtWidgets import QLabel
from PySide6.QtCore import Qt
class Label(QLabel):
def __init__(self, text: str):
super(Label, self).__init__(text)
self.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction)
self.setTextInteractionFlags(Qt.TextInteractionFlag.LinksAccessibleByMouse)
self.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
self.setOpenExternalLinks(True)

71
src/ui/tabs.py Normal file
View File

@@ -0,0 +1,71 @@
from typing import Optional
from PySide6.QtWidgets import QWidget, QTabWidget, QVBoxLayout
from src.pluginregistry import PluginRegistry
from src.plugins.krowlog.Tab import Tab
from src.settings.settings import Settings
class Tabs(QWidget):
def __init__(self, settings: Settings):
super(Tabs, self).__init__()
self.settings = settings
self.tabs = QTabWidget()
self.tabs.setTabsClosable(True)
self.tabs.setMovable(True)
self.tabs.tabCloseRequested.connect(self._close_tab)
self.tabs.currentChanged.connect(self._current_tab_changed)
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.addWidget(self.tabs)
def add_tab(self, tab: Tab):
# if tab already exists then open it
for tab_index in range(0, self.tabs.count()):
widget: Tab = self.tabs.widget(tab_index)
if widget.unique_id == tab.unique_id:
self.tabs.setCurrentIndex(tab_index)
return
tab_index = self.tabs.addTab(tab, tab.title)
self.tabs.setCurrentIndex(tab_index)
def _current_tab_changed(self, tab_index: int):
tab: Tab = self.tabs.widget(tab_index)
if tab:
PluginRegistry.execute("update_window_title", tab.title)
PluginRegistry.execute("update_status_bar", tab.get_status_text())
else:
PluginRegistry.execute("update_window_title", "")
PluginRegistry.execute("update_status_bar", "")
def _close_tab(self, tab_index: int):
full_tab: Tab = self.tabs.widget(tab_index)
full_tab.destruct()
self.tabs.removeTab(tab_index)
def destruct(self):
while self.tabs.count() > 0:
self._close_tab(0)
def _current_tab(self) -> int:
return self.tabs.currentIndex()
def current_file(self) -> Optional[str]:
if self.tabs.currentIndex() < 0:
return None
tab: Tab = self.tabs.widget(self.tabs.currentIndex())
return tab.get_file()
def open_files(self) -> [str]:
result = []
for i in range(self.tabs.count()):
tab: Tab = self.tabs.widget(i)
result.append(tab.get_file())
return result

9
src/ui/vbox.py Normal file
View File

@@ -0,0 +1,9 @@
from PySide6.QtWidgets import QWidget, QVBoxLayout
class VBox(QWidget):
def __init__(self, *widgets: QWidget):
super(VBox, self).__init__()
self.layout = QVBoxLayout(self)
for widget in widgets:
self.layout.addWidget(widget)

0
src/util/__init__.py Normal file
View File

29
src/util/conversion.py Normal file
View File

@@ -0,0 +1,29 @@
import unittest
def humanbytes(bytes: int) -> str:
"""non-localized conversion of bytes to human readable strings"""
powers = {0: 'bytes', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB', 5: 'PB', 6: 'EB'}
power = 1
result = "%d bytes" % bytes
while bytes >= 1024 ** power and power in powers:
result = "%.3f" % (bytes / (1024 ** power))
result = result.rstrip("0")
result = result.rstrip(".")
result = result + " " + powers[power]
power = power + 1
return result
class TestLogFileModel(unittest.TestCase):
def test_humanbytes(self):
inputs = {
0: "0 bytes",
1023: "1023 bytes",
1024: "1 KB",
1048575: "1023.999 KB",
1048576: "1 MB",
}
for input in inputs.keys():
actual = humanbytes(input)
self.assertEqual(inputs[input], actual)

85
src/util/int2intmap.py Normal file
View File

@@ -0,0 +1,85 @@
import math
import os
from logging import exception
from typing import Optional
class Int2IntMap():
"""
A file used to map byte numbers of the filter view to byte numbers in the original file.
Each line contains the two integers separated by a comma.
The first column is sorted ascending. This allows us to do binary searches.
The file uses 4kb blocks. That means we add fill bytes (newlines) if a line would cross a 4kb block boundary.
"""
blocksize = 4096
def __init__(self, file):
self._file = file
self._handle = open(file, "w+t")
self._buffer = ""
def close(self):
if not self._handle.closed:
self._handle.close()
def reset(self):
self._handle.truncate(0)
def add(self, key: int, val: int):
line = "%d,%d\n" % (key, val)
length = len(line)
offset = self._handle.tell() + len(self._buffer)
if offset % self.blocksize + length > self.blocksize:
# end of block: fill block
fill_bytes = self.blocksize - offset % self.blocksize
self._buffer = self._buffer + ("\n" * fill_bytes)
self._buffer = self._buffer + line
if len(self._buffer) > self.blocksize * 100:
self._flush_buffer()
def _flush_buffer(self):
self._handle.write(self._buffer)
self._buffer = ""
self._handle.flush()
def find(self, key: int) -> Optional[int]:
if (len(self._buffer)) > 0:
self._flush_buffer()
size = os.stat(self._file).st_size
if size == 0:
return None
total_blocks = math.ceil(size / self.blocksize)
l = 0
r = total_blocks - 1
while r >= l:
mid = l + math.floor((r - l) / 2)
offset = mid * self.blocksize
self._handle.seek(offset)
block = self._handle.read(self.blocksize)
lines = block.split("\n")
is_before = None
for line in lines:
if len(line) == 0:
continue
token = line.split(",")
k = int(token[0])
val = int(token[1])
if key == k:
return val
tmp = key < k
if is_before is not None and tmp != is_before:
return None
else:
is_before = tmp
if is_before:
r = mid - 1
else:
l = mid + 1
return None
def total_blocks(self) -> int:
size = os.stat(self._file).st_size
return math.ceil(size / self.blocksize)

View File

@@ -0,0 +1,76 @@
import tempfile
import unittest
from os.path import join
from src.util.int2intmap import Int2IntMap
class Int2IntMapLike(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
self.tmpfile = join(self.test_dir.name, "my.log")
self.map = Int2IntMap(self.tmpfile)
def tearDown(self):
self.map.close()
self.test_dir.cleanup()
def test_empty_map(self):
map = self.map
self.assertEqual(None, map.find(0))
def test_one_line_one_byte(self):
map = self.map
map.add(10, 1) # add the key 10
self.assertEqual(None, map.find(9)) # directly before
self.assertEqual(1, map.find(10))
self.assertEqual(None, map.find(11)) # directly after
def test_one_line_two_bytes(self):
map = self.map
map.add(10, 1) # added key 10
map.add(11, 2) # added key 11
self.assertEqual(None, map.find(9)) # directly before
self.assertEqual(1, map.find(10))
self.assertEqual(2, map.find(11))
self.assertEqual(None, map.find(12)) # directly after
def test_two_lines(self):
map = self.map
map.add(10, 1) # added key 10
map.add(12, 2) # added key 12
self.assertEqual(None, map.find(9)) # directly before
self.assertEqual(1, map.find(10))
self.assertEqual(None, map.find(11)) # between
self.assertEqual(2, map.find(12))
self.assertEqual(None, map.find(13)) # directly after
def test_fill_map(self):
map = self.map
map.blocksize = 64
# fill map with
# 10,5,1
# 20,5,2
# 30,5,3
# ...
#
# range(1,50) results in 6 blocks a 64 byte
for i in range(1, 50):
# print("add %d"%(i*10))
map.add(i * 10, i)
# print("%d -> blocks: %d" %(i, map.total_blocks()))
for j in range(1, i * 10):
if j % 10 == 0:
# values that are in the map
# print("check %d" % (j * 10))
self.assertEqual(j / 10, map.find(j))
else:
# values that are not in the map
self.assertEqual(None, map.find(j))
if __name__ == '__main__':
unittest.main()

28
src/util/urlutils.py Normal file
View File

@@ -0,0 +1,28 @@
import os
from urllib.parse import urlparse
import sys
def urls_to_path(urls: str) -> [str]:
result = []
url_list = urls.splitlines(keepends=False)
for url in url_list:
path = url_to_path(url)
result.append(path)
return result
def url_to_path(url: str) -> str:
p = urlparse(url)
if sys.platform == 'win32' or sys.platform == 'cygwin':
return os.path.abspath(p.path[1:])
return os.path.abspath(os.path.join(p.netloc, p.path))
def url_is_file(string: str) -> bool:
url_candidates = string.splitlines(keepends=False)
for url in url_candidates:
if url.startswith("file://"):
path = url_to_path(url)
if not os.path.isfile(path):
return False
return True