move files into a package structure
This commit is contained in:
0
raven/ui/bigtext/__init__.py
Normal file
0
raven/ui/bigtext/__init__.py
Normal file
467
raven/ui/bigtext/bigtext.py
Normal file
467
raven/ui/bigtext/bigtext.py
Normal file
@@ -0,0 +1,467 @@
|
||||
import sys
|
||||
|
||||
import math
|
||||
import os
|
||||
import time
|
||||
from typing import Callable, List
|
||||
from PySide6 import QtGui
|
||||
|
||||
from PySide6.QtCore import *
|
||||
from PySide6.QtGui import *
|
||||
from PySide6.QtGui import QMouseEvent
|
||||
from PySide6.QtWidgets import *
|
||||
|
||||
import constants
|
||||
from raven.ui.ScaledScrollBar import ScaledScrollBar
|
||||
from raven.ui.bigtext.highlight_selection import HighlightSelection
|
||||
from raven.ui.bigtext.highlighted_range import HighlightedRange
|
||||
from raven.ui.bigtext.highlightingdialog import HighlightingDialog
|
||||
from raven.ui.bigtext.line import Line
|
||||
from raven.ui.bigtext.logFileModel import LogFileModel
|
||||
from raven.util.conversion import humanbytes
|
||||
from raven.pluginregistry import PluginRegistry
|
||||
|
||||
from raven.settings.settings import Settings
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from raven.i18n import _
|
||||
|
||||
|
||||
class FileObserver(FileSystemEventHandler):
|
||||
|
||||
def __init__(self, big_text):
|
||||
super(FileObserver, self).__init__()
|
||||
self.big_text = big_text
|
||||
|
||||
def on_modified(self, event):
|
||||
# slow down the updates. This is needed, because the file is modified
|
||||
# constantly, which would lead to constant re-rendering, which would
|
||||
# block the UI thread and make the UI unresponsive.
|
||||
# Note: we don't miss events, because they are queued and de-duplicated
|
||||
time.sleep(0.5)
|
||||
self.big_text.update()
|
||||
|
||||
|
||||
class FileWatchdogThread(QRunnable):
|
||||
|
||||
def __init__(self, big_text, file: str):
|
||||
super(FileWatchdogThread, self).__init__()
|
||||
self.file = file
|
||||
self.big_text = big_text
|
||||
self.observer = Observer()
|
||||
|
||||
def run(self) -> None:
|
||||
self.observer.schedule(FileObserver(self.big_text), self.file)
|
||||
self.observer.start()
|
||||
|
||||
def destruct(self):
|
||||
self.observer.stop()
|
||||
# self.observer.join(1)
|
||||
|
||||
|
||||
class BigText(QWidget):
|
||||
|
||||
def __init__(self, model: LogFileModel):
|
||||
super(BigText, self).__init__()
|
||||
|
||||
self.model = model
|
||||
|
||||
self.watchdog = FileWatchdogThread(self, model.get_file())
|
||||
QThreadPool.globalInstance().start(self.watchdog)
|
||||
|
||||
self.grid = QGridLayout()
|
||||
self.grid.setContentsMargins(0, 0, 0, 0)
|
||||
self.grid.setHorizontalSpacing(0)
|
||||
self.grid.setVerticalSpacing(0)
|
||||
self.setLayout(self.grid)
|
||||
|
||||
self.big_text = InnerBigText(self, model)
|
||||
self.big_text.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding))
|
||||
|
||||
self.h_scroll_bar = QScrollBar(Qt.Orientation.Horizontal)
|
||||
self.h_scroll_bar.setMinimum(0)
|
||||
self.h_scroll_bar.setMaximum(1)
|
||||
self.h_scroll_bar.valueChanged.connect(self.big_text.h_scroll_event)
|
||||
|
||||
self.v_scroll_bar = ScaledScrollBar()
|
||||
# self.v_scroll_bar.setPageStep(1)
|
||||
self.v_scroll_bar.scaledValueChanged.connect(self.big_text.v_scroll_event)
|
||||
|
||||
self.grid.addWidget(self.big_text, 0, 0)
|
||||
self.grid.addWidget(self.h_scroll_bar, 1, 0)
|
||||
self.grid.addWidget(self.v_scroll_bar, 0, 1)
|
||||
|
||||
def get_file(self):
|
||||
return self.model.get_file()
|
||||
|
||||
def add_line_click_listener(self, listener: Callable[[int], None]):
|
||||
"""
|
||||
|
||||
:param listener: a callable, the parameter is the byte offset of the clicked line
|
||||
:return:
|
||||
"""
|
||||
self.big_text.line_click_listeners.append(listener)
|
||||
|
||||
def scroll_to_byte(self, byte_offset: int):
|
||||
self.big_text.scroll_to_byte(byte_offset)
|
||||
|
||||
def destruct(self):
|
||||
self.watchdog.destruct()
|
||||
pass
|
||||
|
||||
|
||||
class InnerBigText(QWidget):
|
||||
_byte_offset = 0
|
||||
_left_offset = 0
|
||||
scroll_lines = 0
|
||||
longest_line = 0
|
||||
|
||||
def __init__(self, parent: BigText, model: LogFileModel):
|
||||
super(InnerBigText, self).__init__()
|
||||
self.model = model
|
||||
self.parent = parent
|
||||
self.setFocusPolicy(Qt.FocusPolicy.StrongFocus)
|
||||
self.setFocusPolicy(Qt.FocusPolicy.WheelFocus)
|
||||
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
|
||||
self.customContextMenuRequested.connect(self._open_menu)
|
||||
|
||||
self.update_font_metrics(QPainter(self))
|
||||
self.lines = List[Line]
|
||||
self.selection_highlight = HighlightSelection()
|
||||
self._last_double_click_time = 0
|
||||
self._last_double_click_line_number = -1
|
||||
|
||||
self.line_click_listeners: [Callable[[int], None]] = []
|
||||
|
||||
def keyPressEvent(self, e: QKeyEvent) -> None:
|
||||
|
||||
# print("%s + %s" % (e.keyCombination().keyboardModifiers(), e.key()))
|
||||
if e.modifiers() == Qt.KeyboardModifier.NoModifier:
|
||||
lines_to_scroll = math.floor(self.lines_shown()) - 1
|
||||
if e.key() == Qt.Key.Key_PageUp:
|
||||
self.scroll_by_lines(-lines_to_scroll)
|
||||
if e.key() == Qt.Key.Key_PageDown:
|
||||
self.scroll_by_lines(lines_to_scroll)
|
||||
if e.key() == 16777235: # page up
|
||||
self.scroll_by_lines(-3)
|
||||
if e.key() == 16777237: # page down
|
||||
self.scroll_by_lines(3)
|
||||
elif e.modifiers() == Qt.KeyboardModifier.ControlModifier and e.key() == 67: # ctrl + c
|
||||
self.copy_selection()
|
||||
elif e.modifiers() == Qt.KeyboardModifier.ControlModifier and e.key() == 65: # ctrl + a
|
||||
self._select_all()
|
||||
|
||||
def wheelEvent(self, event: QWheelEvent):
|
||||
direction = 1 if event.angleDelta().y() < 0 else -1
|
||||
if event.modifiers() == Qt.KeyboardModifier.ControlModifier:
|
||||
# self.model.settings.update_font_size(-direction)
|
||||
old_font_size = self.model.settings.getint_session('general', 'font_size')
|
||||
new_font_size = max(4, min(50, old_font_size - direction))
|
||||
self.model.settings.set_session('general', 'font_size', str(new_font_size))
|
||||
PluginRegistry.execute("update_ui")
|
||||
self.update()
|
||||
else:
|
||||
# print("wheel event fired :) %s" % (direction))
|
||||
self.scroll_by_lines(direction * 3)
|
||||
|
||||
def _open_menu(self, position):
|
||||
menu = QMenu(self)
|
||||
|
||||
copy_clipboard = QAction(QIcon.fromTheme("edit-copy"), _("&Copy to Clipboard"), self,
|
||||
triggered=self.copy_selection)
|
||||
copy_clipboard.setShortcut("CTRL+C")
|
||||
copy_clipboard.setDisabled(not self._has_selection())
|
||||
menu.addAction(copy_clipboard)
|
||||
|
||||
copy_to_file = QAction(QIcon.fromTheme("document-save-as"), _("Copy to &File"), self,
|
||||
triggered=self._copy_selection_to_file)
|
||||
copy_to_file.setDisabled(not self._has_selection())
|
||||
menu.addAction(copy_to_file)
|
||||
|
||||
select_all = QAction(QIcon.fromTheme("edit-select-all"), _("Select &All"), self,
|
||||
triggered=self._select_all)
|
||||
select_all.setShortcut("CTRL+A")
|
||||
menu.addAction(select_all)
|
||||
|
||||
manage_highlighting = QAction(
|
||||
_("&Highlighter"),
|
||||
self,
|
||||
triggered=lambda: HighlightingDialog(self.model.settings).exec())
|
||||
manage_highlighting.setShortcut("CTRL+H")
|
||||
menu.addAction(manage_highlighting)
|
||||
|
||||
menu.exec(self.mapToGlobal(position))
|
||||
|
||||
def scroll_by_lines(self, scroll_lines: int):
|
||||
self.scroll_lines = scroll_lines
|
||||
self.update()
|
||||
self.parent.v_scroll_bar.setValue(self._byte_offset)
|
||||
|
||||
def scroll_to_byte(self, byte_offset: int):
|
||||
self._byte_offset = min(byte_offset, self.model.byte_count())
|
||||
self.update()
|
||||
self.parent.v_scroll_bar.setValue(self._byte_offset)
|
||||
|
||||
def mousePressEvent(self, e: QtGui.QMouseEvent) -> None:
|
||||
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.ShiftModifier:
|
||||
offset = self.to_byte_offset(e)
|
||||
self.selection_highlight.set_end_byte(offset)
|
||||
self.update()
|
||||
return
|
||||
|
||||
if e.buttons() == Qt.MouseButton.LeftButton and (time.time() - self._last_double_click_time) < 0.5:
|
||||
# triple click: select line
|
||||
line_number = self.y_pos_to_line(e.pos().y())
|
||||
if line_number == self._last_double_click_line_number and line_number < len(self.lines):
|
||||
line: Line = self.lines[line_number]
|
||||
self.selection_highlight.set_start(line.byte_offset())
|
||||
self.selection_highlight.set_end_byte(line.byte_end())
|
||||
self.update()
|
||||
return
|
||||
|
||||
if e.buttons() == Qt.MouseButton.LeftButton and e.modifiers() == Qt.KeyboardModifier.NoModifier:
|
||||
offset = self.to_byte_offset(e)
|
||||
self.selection_highlight.set_start(offset)
|
||||
self.selection_highlight.set_end_byte(offset)
|
||||
self.update()
|
||||
|
||||
line_number = self.y_pos_to_line(e.pos().y())
|
||||
if line_number < len(self.lines):
|
||||
line: Line = self.lines[line_number]
|
||||
for listener in self.line_click_listeners:
|
||||
listener(line.byte_offset())
|
||||
|
||||
def mouseDoubleClickEvent(self, e: QtGui.QMouseEvent) -> None:
|
||||
if e.buttons() == Qt.MouseButton.LeftButton:
|
||||
self._last_double_click_time = time.time()
|
||||
self._last_double_click_line_number = self.y_pos_to_line(e.pos().y())
|
||||
|
||||
offset = self.to_byte_offset(e)
|
||||
(_word, start_byte, end_byte) = self.model.read_word_at(offset)
|
||||
if start_byte >= 0 and end_byte >= 0:
|
||||
self.selection_highlight.set_start(start_byte)
|
||||
self.selection_highlight.set_end_byte(end_byte)
|
||||
else:
|
||||
self.selection_highlight.set_start(offset)
|
||||
self.selection_highlight.set_end_byte(offset)
|
||||
self.update()
|
||||
|
||||
def mouseMoveEvent(self, e: QMouseEvent):
|
||||
|
||||
if e.buttons() != Qt.MouseButton.LeftButton:
|
||||
return
|
||||
|
||||
current_byte = self.to_byte_offset(e)
|
||||
|
||||
if self.selection_highlight.end_byte != current_byte:
|
||||
self.selection_highlight.set_end_byte(current_byte)
|
||||
self.update()
|
||||
# print("-> %s,%s" %(self._selection_start_byte, self._selection_end_byte))
|
||||
line_number = self.y_pos_to_line(e.pos().y())
|
||||
column_in_line = self.x_pos_to_column(e.pos().x())
|
||||
if line_number < 0:
|
||||
self.scroll_by_lines(-1)
|
||||
if line_number > int(self.lines_shown()):
|
||||
self.scroll_by_lines(1)
|
||||
if column_in_line <= 1:
|
||||
self._left_offset = max(0, self._left_offset - 2)
|
||||
self.update()
|
||||
if column_in_line + 1 >= self.columns_shown():
|
||||
self._left_offset = self._left_offset + 2
|
||||
self.update()
|
||||
|
||||
@Slot()
|
||||
def h_scroll_event(self, left_offset: int):
|
||||
self._left_offset = left_offset
|
||||
# print("left_offset: %d" % left_offset)
|
||||
self.update()
|
||||
|
||||
@Slot()
|
||||
def v_scroll_event(self, byte_offset: str):
|
||||
self._byte_offset = int(byte_offset)
|
||||
self.update()
|
||||
|
||||
def update_longest_line(self, length: int):
|
||||
width_in_chars = self.width() / self.char_width
|
||||
# print("width_in_chars: %d" % width_in_chars)
|
||||
if self.longest_line < length:
|
||||
self.longest_line = length
|
||||
maximum = max(0, length - width_in_chars + 1)
|
||||
self.parent.h_scroll_bar.setMaximum(round(maximum))
|
||||
|
||||
def y_pos_to_line(self, y: int) -> int:
|
||||
return int(y / self.char_height)
|
||||
|
||||
def x_pos_to_column(self, x: int) -> int:
|
||||
return round(x / self.char_width)
|
||||
|
||||
def lines_shown(self) -> float:
|
||||
return self.height() / float(self.char_height)
|
||||
|
||||
def columns_shown(self) -> float:
|
||||
return self.width() / float(self.char_width)
|
||||
|
||||
def to_byte_offset(self, e: QMouseEvent) -> int:
|
||||
line_number = self.y_pos_to_line(e.pos().y())
|
||||
|
||||
if line_number < len(self.lines):
|
||||
line: Line = self.lines[line_number]
|
||||
column_in_line = self.x_pos_to_column(e.pos().x()) + self._left_offset
|
||||
column_in_line = min(column_in_line, line.length_in_columns()) # x was behind the last column of this line
|
||||
char_in_line = line.column_to_char(column_in_line)
|
||||
# print("%s in line %s lcolumn_in_line=%s" % (char_in_line, line_number, column_in_line))
|
||||
byte_in_line = line.char_index_to_byte(char_in_line)
|
||||
current_byte = line.byte_offset() + byte_in_line
|
||||
# print("%s + %s = %s" % (line.byte_offset(), char_in_line, current_byte))
|
||||
else:
|
||||
current_byte = self.model.byte_count()
|
||||
return current_byte
|
||||
|
||||
def _has_selection(self):
|
||||
return self.selection_highlight.start_byte != self.selection_highlight.end_byte
|
||||
|
||||
def copy_selection(self):
|
||||
if self._has_selection():
|
||||
start = min(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
|
||||
end = max(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
|
||||
bytes_human_readable = humanbytes(end - start)
|
||||
if end - start > (1024 ** 2) * 5:
|
||||
you_sure = QMessageBox(
|
||||
QMessageBox.Icon.Warning,
|
||||
_("data selection"),
|
||||
_(
|
||||
"You have selected <b>{0}</b> of data.").format(bytes_human_readable))
|
||||
you_sure.setStandardButtons(QMessageBox.Cancel)
|
||||
copy_btn = you_sure.addButton(_("Copy {0} to Clipboard").format(bytes_human_readable),
|
||||
QMessageBox.ActionRole)
|
||||
write_btn = you_sure.addButton(_("Write to File"), QMessageBox.ActionRole)
|
||||
you_sure.setDefaultButton(QMessageBox.StandardButton.Cancel)
|
||||
you_sure.exec()
|
||||
if you_sure.clickedButton() == write_btn:
|
||||
self._copy_selection_to_file()
|
||||
return
|
||||
|
||||
if you_sure.clickedButton() != copy_btn:
|
||||
# abort
|
||||
print("abort")
|
||||
return
|
||||
|
||||
selected_text = self.model.read_range(start, end)
|
||||
cb = QApplication.clipboard()
|
||||
cb.setText(selected_text)
|
||||
|
||||
def _copy_selection_to_file(self):
|
||||
if self._has_selection():
|
||||
start = min(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
|
||||
end = max(self.selection_highlight.start_byte, self.selection_highlight.end_byte)
|
||||
dialog = QFileDialog(self)
|
||||
(selected_file, _filter) = dialog.getSaveFileName(
|
||||
caption=_("Save File"),
|
||||
dir=os.path.dirname(self.model.get_file())
|
||||
)
|
||||
if selected_file:
|
||||
self.model.write_range(start, end, selected_file)
|
||||
open_tab = self.model.settings.session.getboolean("general", "open_tab_on_save_as_file")
|
||||
if open_tab:
|
||||
PluginRegistry.execute("open_file", selected_file)
|
||||
|
||||
def _select_all(self):
|
||||
self.selection_highlight.start_byte = 0
|
||||
self.selection_highlight.end_byte = self.model.byte_count()
|
||||
self.update()
|
||||
|
||||
def paintEvent(self, event: QPaintEvent) -> None:
|
||||
# print("paintEvent")
|
||||
painter = QPainter(self)
|
||||
# painter.setFont(self.model.settings.font())
|
||||
# Courier New, DejaVu Sans Mono, Monospace, Liberation Mono, Noto Mono, Nimbus Mono L, Tlwg Mono, Ubuntu Mono, FreeMono, Mitra Mono
|
||||
font = "Courier New" if sys.platform == 'win32' or sys.platform == 'cygwin' else "Monospace"
|
||||
painter.setFont(QFont("Courier New", self.model.settings.getint_session('general', "font_size")))
|
||||
painter.setPen(QColor(0, 0, 0))
|
||||
self.update_font_metrics(painter)
|
||||
|
||||
tab_string = " " * constants.tab_width
|
||||
|
||||
lines_to_show = self.lines_shown()
|
||||
# print("%s / %s = %s" %(self.height(), float(self.char_height), lines_to_show))
|
||||
|
||||
self.lines = self.model.data(self._byte_offset, self.scroll_lines, lines_to_show)
|
||||
# print("lines_to_show: %d returned: %d" % (lines_to_show, len(self.lines)))
|
||||
self.scroll_lines = 0
|
||||
self._byte_offset = self.lines[0].byte_offset() if len(self.lines) > 0 else 0
|
||||
# print("new byte offset: ", self._byte_offset)
|
||||
# document length == maximum + pageStep + aFewBytesSoThatTheLastLineIsShown
|
||||
self.parent.v_scroll_bar.setMaximum(self.model.byte_count() - 1)
|
||||
|
||||
for l in self.lines:
|
||||
self.update_longest_line(len(l.line()))
|
||||
|
||||
highlighters = self.model.highlighters()
|
||||
if self.model.get_query_highlight():
|
||||
highlighters = highlighters + [self.model.get_query_highlight()]
|
||||
highlighters = highlighters + [self.selection_highlight] # selection highlight should be last
|
||||
|
||||
# draw hightlights first - some characters may overlap to the next line
|
||||
# by drawing the background hightlights first we prevent that the hightlight
|
||||
# draws over a character
|
||||
start = time.time()
|
||||
y_line_offset = self.char_height;
|
||||
for l in self.lines:
|
||||
highlight_ranges = []
|
||||
for h in highlighters:
|
||||
optional_highlight_range = h.compute_highlight(l)
|
||||
if optional_highlight_range:
|
||||
highlight_ranges = highlight_ranges + optional_highlight_range
|
||||
|
||||
self.draw_highlights(highlight_ranges, painter, y_line_offset)
|
||||
y_line_offset = y_line_offset + self.char_height
|
||||
|
||||
end = time.time()
|
||||
# print("highlight duration: %.3f" %((end-start)*1000))
|
||||
|
||||
left_offset = int(-1 * self._left_offset * self.char_width)
|
||||
y_line_offset = self.char_height;
|
||||
for l in self.lines:
|
||||
text = l.line_tabs_replaced()
|
||||
painter.drawText(left_offset, y_line_offset, text)
|
||||
y_line_offset = y_line_offset + self.char_height
|
||||
|
||||
painter.end()
|
||||
|
||||
def draw_highlights(self, highlights: [HighlightedRange], painter: QPainter, y_line_offset: int):
|
||||
|
||||
for highlight in highlights:
|
||||
if highlight.is_highlight_full_line():
|
||||
left_offset = -1 * self._left_offset * self.char_width
|
||||
y1 = y_line_offset - self.char_height + self.char_height / 7
|
||||
height = self.char_height
|
||||
full_width = Settings.max_line_length() * self.char_width
|
||||
rect = QRect(round(left_offset), round(y1), round(full_width), round(height))
|
||||
self.highlight_background(painter, rect, highlight.get_brush_full_line())
|
||||
|
||||
for highlight in highlights:
|
||||
left_offset = self._left_offset * self.char_width
|
||||
x1 = highlight.get_start() * self.char_width
|
||||
width = highlight.get_width() * self.char_width
|
||||
y1 = y_line_offset - self.char_height + self.char_height / 7
|
||||
height = self.char_height
|
||||
|
||||
rect = QRect(round(x1 - left_offset), round(y1), round(width), round(height))
|
||||
self.highlight_background(painter, rect, highlight.get_brush())
|
||||
|
||||
def highlight_background(self, painter: QPainter, rect: QRect, brush: QBrush):
|
||||
old_brush = painter.brush()
|
||||
old_pen = painter.pen()
|
||||
painter.setBrush(brush)
|
||||
painter.setPen(Qt.PenStyle.NoPen)
|
||||
painter.drawRoundedRect(rect, 3.0, 3.0)
|
||||
painter.setBrush(old_brush)
|
||||
painter.setPen(old_pen)
|
||||
|
||||
def update_font_metrics(self, painter: QPainter):
|
||||
fm: QFontMetrics = painter.fontMetrics()
|
||||
self.char_height = fm.height()
|
||||
self.char_width = fm.averageCharWidth() # all chars have same width for monospace font
|
||||
text = "012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
|
||||
self.char_width = fm.horizontalAdvance(text) / float(len(text))
|
||||
# print("font width=%s height=%s" % (self.char_width, self.char_height))
|
||||
12
raven/ui/bigtext/highlight.py
Normal file
12
raven/ui/bigtext/highlight.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from typing import Optional, List
|
||||
|
||||
from raven.ui.bigtext.line import Line
|
||||
from raven.ui.bigtext.highlighted_range import HighlightedRange
|
||||
|
||||
|
||||
class Highlight:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
|
||||
return None
|
||||
64
raven/ui/bigtext/highlight_regex.py
Normal file
64
raven/ui/bigtext/highlight_regex.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from typing import Optional
|
||||
|
||||
from raven.ui.bigtext.highlight import Highlight
|
||||
from raven.ui.bigtext.highlighted_range import HighlightedRange
|
||||
from raven.ui.bigtext.line import Line
|
||||
from PySide6.QtGui import QBrush, QColor
|
||||
|
||||
from typing import List
|
||||
import re
|
||||
|
||||
|
||||
class HighlightRegex(Highlight):
|
||||
|
||||
def __init__(self, query: str, ignore_case: bool, is_regex: bool, hit_background_color: str = "None",
|
||||
line_background_color: str = "None"):
|
||||
self.query = query
|
||||
self.ignore_case = ignore_case
|
||||
self.is_regex = is_regex
|
||||
self.regex = self._get_regex()
|
||||
self.hit_background_color = hit_background_color
|
||||
self.line_background_color = line_background_color
|
||||
self._brush_hit = self.brush(self.hit_background_color)
|
||||
self._brush_line = self.brush(self.line_background_color)
|
||||
|
||||
def _get_regex(self):
|
||||
flags = re.IGNORECASE if self.ignore_case else 0
|
||||
if self.is_regex:
|
||||
return re.compile(self.query, flags=flags)
|
||||
else:
|
||||
return re.compile(re.escape(self.query), flags=flags)
|
||||
|
||||
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
|
||||
result = []
|
||||
# print("execute regex: %s in %s" % (self.regex, line.line()))
|
||||
match_iter = re.finditer(self.regex, line.line())
|
||||
for match in match_iter:
|
||||
# print("%s" % match)
|
||||
start_char = match.start(0)
|
||||
end_char = match.end(0)
|
||||
|
||||
start_column = line.char_to_column(start_char)
|
||||
end_column = line.char_to_column(end_char)
|
||||
|
||||
result.append(HighlightedRange(
|
||||
start_column,
|
||||
end_column - start_column,
|
||||
highlight_full_line=True,
|
||||
brush=self._brush_hit,
|
||||
brush_full_line=self._brush_line
|
||||
))
|
||||
|
||||
return result
|
||||
|
||||
def hit_background_brush(self):
|
||||
return self.brush(self.hit_background_color)
|
||||
|
||||
@staticmethod
|
||||
def brush(color: str) -> QBrush:
|
||||
if re.match("[0-9a-f]{6}", color, flags=re.IGNORECASE):
|
||||
red = int(color[0:2], 16)
|
||||
green = int(color[2:4], 16)
|
||||
blue = int(color[4:6], 16)
|
||||
return QBrush(QColor(red, green, blue))
|
||||
return QBrush()
|
||||
56
raven/ui/bigtext/highlight_selection.py
Normal file
56
raven/ui/bigtext/highlight_selection.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from typing import Optional, List
|
||||
|
||||
from raven.ui.bigtext.highlight import Highlight
|
||||
from raven.ui.bigtext.highlighted_range import HighlightedRange
|
||||
from raven.ui.bigtext.line import Line
|
||||
from PySide6.QtCore import Qt
|
||||
from PySide6.QtGui import QBrush, QColor
|
||||
|
||||
from raven.settings.settings import Settings
|
||||
|
||||
|
||||
class HighlightSelection(Highlight):
|
||||
start_byte = 0
|
||||
end_byte = 0
|
||||
|
||||
def set_start(self, start_byte):
|
||||
self.start_byte = start_byte
|
||||
|
||||
def set_end_byte(self, end_byte):
|
||||
self.end_byte = end_byte
|
||||
|
||||
def compute_highlight(self, line: Line) -> Optional[List[HighlightedRange]]:
|
||||
begin = min(self.start_byte, self.end_byte)
|
||||
end = max(self.start_byte, self.end_byte)
|
||||
|
||||
if line.intersects(begin, end):
|
||||
if line.includes_byte(begin):
|
||||
start_byte_in_line = begin - line.byte_offset()
|
||||
else:
|
||||
start_byte_in_line = 0
|
||||
|
||||
start_char = line.byte_index_to_char_index(start_byte_in_line)
|
||||
|
||||
if line.includes_byte(end):
|
||||
length_in_bytes = end - line.byte_offset() - start_byte_in_line
|
||||
end_char = line.byte_index_to_char_index(start_byte_in_line + length_in_bytes)
|
||||
else:
|
||||
# renders the highlighting to the end of the line
|
||||
# this is how selections usually behave
|
||||
length_in_bytes = Settings.max_line_length() - start_byte_in_line
|
||||
|
||||
# note: this mixes chars and bytes, but that should not matter, because
|
||||
# it just means that we render the highlight into the invisible range on the right
|
||||
end_char = start_char + length_in_bytes
|
||||
|
||||
start_column = line.char_to_column(start_char)
|
||||
end_column = line.char_to_column(end_char)
|
||||
if end_column >= 0:
|
||||
length_in_columns = end_column - start_column
|
||||
else:
|
||||
length_in_columns = 4096
|
||||
|
||||
return [HighlightedRange(start_column, length_in_columns, brush=QBrush(QColor(156, 215, 255, 192)),
|
||||
pen=Qt.PenStyle.NoPen)]
|
||||
else:
|
||||
return None
|
||||
38
raven/ui/bigtext/highlighted_range.py
Normal file
38
raven/ui/bigtext/highlighted_range.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from PySide6.QtCore import Qt
|
||||
from PySide6.QtGui import QBrush, QPen
|
||||
|
||||
|
||||
class HighlightedRange:
|
||||
def __init__(
|
||||
self,
|
||||
start: int,
|
||||
width: int,
|
||||
highlight_full_line=False,
|
||||
brush: QBrush = QBrush(),
|
||||
pen: QPen = Qt.PenStyle.NoPen,
|
||||
brush_full_line: QBrush = QBrush()
|
||||
):
|
||||
self.start = start
|
||||
self.width = width
|
||||
self.brush = brush
|
||||
self.pen = pen
|
||||
self.highlight_full_line = highlight_full_line
|
||||
self.brush_full_line = brush_full_line
|
||||
|
||||
def is_highlight_full_line(self):
|
||||
return self.highlight_full_line
|
||||
|
||||
def get_start(self):
|
||||
return self.start
|
||||
|
||||
def get_width(self):
|
||||
return self.width
|
||||
|
||||
def get_brush(self):
|
||||
return self.brush
|
||||
|
||||
def get_pen(self):
|
||||
return self.pen
|
||||
|
||||
def get_brush_full_line(self):
|
||||
return self.brush_full_line
|
||||
62
raven/ui/bigtext/highlighting.py
Normal file
62
raven/ui/bigtext/highlighting.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import logging
|
||||
|
||||
from raven.settings.settings import Settings
|
||||
from raven.ui.bigtext.highlight_regex import HighlightRegex
|
||||
|
||||
log = logging.getLogger("highlighting")
|
||||
|
||||
class Highlighting:
|
||||
|
||||
@staticmethod
|
||||
def read_config(settings: Settings) -> [HighlightRegex]:
|
||||
result = []
|
||||
session = settings.session
|
||||
|
||||
for section in session.sections():
|
||||
if not section.startswith("highlighting."):
|
||||
continue
|
||||
|
||||
query = session.get(section, "query", fallback="")
|
||||
if len(query) == 0:
|
||||
continue
|
||||
ignore_case = session.getboolean(section, "ignore-case", fallback=True)
|
||||
is_regex = session.getboolean(section, "is-regex", fallback=False)
|
||||
line_background_color = session.get(section, "line.background.color", fallback="None")
|
||||
hit_background_color = session.get(section, "hit.background.color", fallback="None")
|
||||
|
||||
try:
|
||||
highlight = HighlightRegex(
|
||||
query=query,
|
||||
ignore_case=ignore_case,
|
||||
is_regex=is_regex,
|
||||
hit_background_color=hit_background_color,
|
||||
line_background_color=line_background_color
|
||||
)
|
||||
result.append(highlight)
|
||||
except:
|
||||
log.exception("failed to parse query for highlighter: %s" % section)
|
||||
continue
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def write_config(settings: Settings, highlighters: [HighlightRegex]):
|
||||
Highlighting.remove_highlighting_sections(settings)
|
||||
section_counter = 0
|
||||
for highlighter in highlighters:
|
||||
highlighter: HighlightRegex = highlighter
|
||||
section = "highlighting.%d" % section_counter
|
||||
section_counter = section_counter + 1
|
||||
settings.session.add_section(section)
|
||||
settings.session.set(section, "query", highlighter.query)
|
||||
settings.session.set(section, "ignore-case", str(highlighter.ignore_case))
|
||||
settings.session.set(section, "is-regex", str(highlighter.is_regex))
|
||||
settings.session.set(section, "line.background.color", highlighter.line_background_color)
|
||||
settings.session.set(section, "hit.background.color", highlighter.hit_background_color)
|
||||
|
||||
@staticmethod
|
||||
def remove_highlighting_sections(settings: Settings):
|
||||
for section in settings.session.sections():
|
||||
if not section.startswith("highlighting."):
|
||||
continue
|
||||
settings.session.remove_section(section)
|
||||
201
raven/ui/bigtext/highlightingdialog.py
Normal file
201
raven/ui/bigtext/highlightingdialog.py
Normal file
@@ -0,0 +1,201 @@
|
||||
from PySide6.QtGui import QIcon
|
||||
from PySide6.QtWidgets import QDialog, QLineEdit, QLabel, QGridLayout, QCheckBox, QListWidget, QListWidgetItem, \
|
||||
QPushButton, QDialogButtonBox, QMessageBox, QSizePolicy
|
||||
|
||||
from raven.ui.bigtext.highlight_regex import HighlightRegex
|
||||
from raven.ui.bigtext.highlighting import Highlighting
|
||||
from raven.ui.colorbutton import ColorButton
|
||||
from raven.ui.hbox import HBox
|
||||
from raven.settings.settings import Settings
|
||||
|
||||
from raven.i18n import _
|
||||
|
||||
|
||||
class PayloadItem(QListWidgetItem):
|
||||
def __init__(self, text: str, payload=None):
|
||||
super(PayloadItem, self).__init__(text)
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class HighlightingDialog(QDialog):
|
||||
def __init__(self, settings: Settings):
|
||||
super(HighlightingDialog, self).__init__()
|
||||
self.setWindowTitle(_("Manage Highlighting"))
|
||||
self.setModal(True)
|
||||
self._settings = settings
|
||||
|
||||
form_grid = QGridLayout(self)
|
||||
self.layout = form_grid
|
||||
|
||||
row = 0
|
||||
self.list = QListWidget(self)
|
||||
form_grid.addWidget(self.list, row, 0, 1, 2)
|
||||
|
||||
row = row + 1
|
||||
self.btn_add = QPushButton(QIcon.fromTheme("list-add"), _("Add"))
|
||||
self.btn_add.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
|
||||
self.btn_add.pressed.connect(self._add)
|
||||
|
||||
self.btn_update = QPushButton(QIcon.fromTheme("stock_edit"), _("Update"))
|
||||
self.btn_update.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
|
||||
self.btn_update.pressed.connect(self._update)
|
||||
|
||||
self.btn_delete = QPushButton(QIcon.fromTheme("list-remove"), _("Remove"))
|
||||
self.btn_delete.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
|
||||
self.btn_delete.pressed.connect(self._delete)
|
||||
|
||||
self.btn_move_up = QPushButton(QIcon.fromTheme("go-up"), _("Up"))
|
||||
self.btn_move_up.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
|
||||
self.btn_move_up.pressed.connect(self._move_up)
|
||||
|
||||
self.btn_move_down = QPushButton(QIcon.fromTheme("go-down"), _("Down"))
|
||||
self.btn_move_down.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed))
|
||||
self.btn_move_down.pressed.connect(self._move_down)
|
||||
button_box = HBox(self.btn_update, self.btn_add, self.btn_delete, self.btn_move_up, self.btn_move_down)
|
||||
form_grid.addWidget(button_box, row, 0, 1, 2)
|
||||
|
||||
row = row + 1
|
||||
self.query = QLineEdit(self)
|
||||
form_grid.addWidget(QLabel(_("Query:")), row, 0)
|
||||
form_grid.addWidget(self.query, row, 1)
|
||||
|
||||
row = row + 1
|
||||
self.ignore_case = QCheckBox(_("Ignore Case"))
|
||||
self.ignore_case.setChecked(True)
|
||||
form_grid.addWidget(self.ignore_case, row, 0, 1, 2)
|
||||
|
||||
row = row + 1
|
||||
self.is_regex = QCheckBox(_("Regular Expression"))
|
||||
self.is_regex.setChecked(True)
|
||||
form_grid.addWidget(self.is_regex, row, 0, 1, 2)
|
||||
|
||||
row = row + 1
|
||||
form_grid.addWidget(QLabel(_("Hit Background:")), row, 0)
|
||||
self.hit_background_color = ColorButton("ccb400")
|
||||
form_grid.addWidget(self.hit_background_color, row, 1)
|
||||
|
||||
row = row + 1
|
||||
form_grid.addWidget(QLabel(_("Line Background:")), row, 0)
|
||||
self.line_background_color = ColorButton("fff080")
|
||||
form_grid.addWidget(self.line_background_color, row, 1)
|
||||
|
||||
row = row + 1
|
||||
self.buttons = QDialogButtonBox()
|
||||
self.buttons.setStandardButtons(QDialogButtonBox.StandardButton.Cancel | QDialogButtonBox.StandardButton.Ok)
|
||||
self.buttons.accepted.connect(self._save)
|
||||
self.buttons.rejected.connect(self.close)
|
||||
form_grid.addWidget(self.buttons, row, 0, 1, 2)
|
||||
|
||||
self._load_existing_hightlighters()
|
||||
self.list.setCurrentItem(None)
|
||||
self._selection_changed()
|
||||
self.list.itemSelectionChanged.connect(self._selection_changed)
|
||||
|
||||
def _add(self):
|
||||
highlighter = HighlightRegex(
|
||||
self.query.text(),
|
||||
self.ignore_case.isChecked(),
|
||||
self.is_regex.isChecked(),
|
||||
self.hit_background_color.color,
|
||||
self.line_background_color.color
|
||||
)
|
||||
item = PayloadItem(self.query.text(), highlighter)
|
||||
item.setBackground(highlighter.hit_background_brush())
|
||||
self.list.addItem(item)
|
||||
self.list.setCurrentItem(item)
|
||||
|
||||
def _update(self):
|
||||
item: PayloadItem = self.list.currentItem()
|
||||
highlighter: HighlightRegex = item.payload
|
||||
|
||||
highlighter.query = self.query.text()
|
||||
highlighter.ignore_case = self.ignore_case.isChecked()
|
||||
highlighter.is_regex = self.is_regex.isChecked()
|
||||
highlighter.hit_background_color = self.hit_background_color.color
|
||||
highlighter.line_background_color = self.line_background_color.color
|
||||
|
||||
item.setText(self.query.text())
|
||||
item.setBackground(highlighter.hit_background_brush())
|
||||
|
||||
def _delete(self):
|
||||
index = self.list.selectedIndexes()[0]
|
||||
selected_index = index.row()
|
||||
self.list.takeItem(selected_index)
|
||||
|
||||
def _move_up(self):
|
||||
index = self.list.currentIndex()
|
||||
selected_index = index.row()
|
||||
item = self.list.takeItem(selected_index)
|
||||
self.list.insertItem(selected_index - 1, item)
|
||||
self.list.setCurrentIndex(index.siblingAtRow(selected_index - 1))
|
||||
|
||||
def _move_down(self):
|
||||
index = self.list.selectedIndexes()[0]
|
||||
selected_index = index.row()
|
||||
item = self.list.takeItem(selected_index)
|
||||
self.list.insertItem(selected_index + 1, item)
|
||||
self.list.setCurrentIndex(index.sibling(selected_index + 1, 0))
|
||||
|
||||
def _save(self):
|
||||
|
||||
if self._is_dirty():
|
||||
unsaved = QMessageBox(QMessageBox.Icon.Question, _("unsaved changes"),
|
||||
_("You have unsaved changes."))
|
||||
unsaved.setStandardButtons(QMessageBox.Cancel | QMessageBox.StandardButton.Discard)
|
||||
result = unsaved.exec()
|
||||
if result == QMessageBox.StandardButton.Cancel:
|
||||
return
|
||||
|
||||
highlighters = []
|
||||
for index in range(0, self.list.count()):
|
||||
item: PayloadItem = self.list.item(index)
|
||||
highlighters.append(item.payload)
|
||||
Highlighting.write_config(self._settings, highlighters)
|
||||
self.close()
|
||||
|
||||
def _selection_changed(self):
|
||||
if len(self.list.selectedIndexes()) == 0:
|
||||
self.btn_update.setDisabled(True)
|
||||
self.btn_delete.setDisabled(True)
|
||||
self.btn_move_up.setDisabled(True)
|
||||
self.btn_move_down.setDisabled(True)
|
||||
if len(self.list.selectedIndexes()) == 1:
|
||||
selected_index = self.list.selectedIndexes()[0].row()
|
||||
self.btn_update.setDisabled(False)
|
||||
self.btn_delete.setDisabled(False)
|
||||
self.btn_move_up.setDisabled(selected_index == 0)
|
||||
self.btn_move_down.setDisabled(selected_index + 1 >= self.list.count())
|
||||
|
||||
item: PayloadItem = self.list.item(selected_index)
|
||||
highlighter: HighlightRegex = item.payload
|
||||
self.query.setText(highlighter.query)
|
||||
self.ignore_case.setChecked(highlighter.ignore_case)
|
||||
self.is_regex.setChecked(highlighter.is_regex)
|
||||
self.hit_background_color.set_color(highlighter.hit_background_color)
|
||||
self.line_background_color.set_color(highlighter.line_background_color)
|
||||
|
||||
def _is_dirty(self):
|
||||
if len(self.list.selectedIndexes()) == 0:
|
||||
dirty = False
|
||||
if len(self.list.selectedIndexes()) == 1:
|
||||
item: PayloadItem = self.list.currentItem()
|
||||
highlighter: HighlightRegex = item.payload
|
||||
dirty = self.query.text() != highlighter.query \
|
||||
or self.ignore_case.isChecked() != highlighter.ignore_case \
|
||||
or self.is_regex.isChecked() != highlighter.is_regex \
|
||||
or self.hit_background_color.color != highlighter.hit_background_color \
|
||||
or self.line_background_color.color != highlighter.line_background_color
|
||||
else:
|
||||
dirty = False
|
||||
return dirty
|
||||
|
||||
def _load_existing_hightlighters(self):
|
||||
highlighters: [HighlightRegex] = Highlighting.read_config(self._settings)
|
||||
first_item = None
|
||||
for highlighter in highlighters:
|
||||
item = PayloadItem(str(highlighter.query))
|
||||
item.payload = highlighter
|
||||
item.setBackground(highlighter.hit_background_brush())
|
||||
self.list.addItem(item)
|
||||
if not first_item:
|
||||
first_item = item
|
||||
115
raven/ui/bigtext/line.py
Normal file
115
raven/ui/bigtext/line.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import unicodedata
|
||||
|
||||
import constants
|
||||
|
||||
|
||||
class Line:
|
||||
def __init__(self, byte_offset: int, byte_end: int, line: str):
|
||||
self._byte_offset = byte_offset
|
||||
self._byte_end = byte_end
|
||||
self._line = line
|
||||
|
||||
self._cache_char_to_column()
|
||||
|
||||
def byte_offset(self) -> int:
|
||||
return self._byte_offset
|
||||
|
||||
def byte_end(self) -> int:
|
||||
return self._byte_end
|
||||
|
||||
def line(self) -> str:
|
||||
return self._line
|
||||
|
||||
def length_in_charaters(self) -> int:
|
||||
return len(self._line)
|
||||
|
||||
def length_in_columns(self) -> int:
|
||||
return self.char_to_column(len(self._line) - 1)
|
||||
|
||||
def char_index_to_byte(self, char_in_line: int) -> int:
|
||||
# todo this does not work with multibyte characters
|
||||
# should probably be len(self.prefix(char_in_line-1).encode("utf8"))
|
||||
return len(self.prefix(char_in_line).encode("utf8"))
|
||||
|
||||
def byte_index_to_char_index(self, byte_index: int) -> int:
|
||||
prefix_bytes = self._line.encode("utf8")[:byte_index]
|
||||
prefix_chars = prefix_bytes.decode("utf8", errors="ignore")
|
||||
return len(prefix_chars)
|
||||
|
||||
def line_tabs_replaced(self):
|
||||
line = self._line;
|
||||
i = 0
|
||||
offset = 0
|
||||
result = ""
|
||||
length = len(line)
|
||||
while True:
|
||||
tab_index = line.find("\t", offset)
|
||||
if tab_index < 0:
|
||||
break
|
||||
result = result + line[offset:tab_index]
|
||||
result = result + " " * (constants.tab_width - len(result) % constants.tab_width)
|
||||
offset = tab_index + 1
|
||||
|
||||
result = result + line[offset:]
|
||||
|
||||
return result
|
||||
|
||||
def column_to_char(self, column_in_line: int) -> int:
|
||||
if column_in_line in self._column_to_char_cache:
|
||||
return self._column_to_char_cache[column_in_line]
|
||||
return 0
|
||||
|
||||
def char_to_column(self, char_in_line: int) -> int:
|
||||
if not char_in_line in self._char_to_column_cache:
|
||||
# print("%d in %s" % (char_in_line, self._char_to_column_cache))
|
||||
return -1
|
||||
return self._char_to_column_cache[char_in_line]
|
||||
|
||||
def _cache_char_to_column(self):
|
||||
self._char_to_column_cache = {}
|
||||
self._column_to_char_cache = {}
|
||||
result = 0
|
||||
i = 0
|
||||
self._char_to_column_cache[0] = 0
|
||||
while i < len(self._line):
|
||||
self._char_to_column_cache[i] = result
|
||||
if not result in self._column_to_char_cache:
|
||||
self._column_to_char_cache[result] = i
|
||||
current_char = self._line[i]
|
||||
if current_char == "\t":
|
||||
result = result + constants.tab_width - result % constants.tab_width
|
||||
else:
|
||||
result = result + 1
|
||||
i = i + 1
|
||||
|
||||
# ignore: Nonspacing Mark characters are decorations for the previous character.
|
||||
# They do not take up space.
|
||||
# For example the character Combining Diaeresis (U+0308, %CC%88) that adds two
|
||||
# dots above the previous character. It can be used to create an 'ä' from an 'a'+'◌̈'.
|
||||
# In url encoding this looks like: a%CC%88.
|
||||
# todo there are many other character combinations that should be skipped
|
||||
while i < len(self._line) and unicodedata.category(self._line[i]) == "Mn":
|
||||
self._char_to_column_cache[i] = result - 1
|
||||
if not result in self._column_to_char_cache:
|
||||
self._column_to_char_cache[result - 1] = i
|
||||
i = i + 1
|
||||
|
||||
def includes_byte(self, byte: int) -> bool:
|
||||
return self._byte_offset <= byte <= self._byte_end
|
||||
|
||||
def intersects(self, start_byte: int, end_byte: int):
|
||||
result = start_byte < self._byte_end and end_byte > self._byte_offset
|
||||
# print("%d,%d in %d,%d" % (start_byte, end_byte, self._byte_offset, self._byte_end))
|
||||
return result
|
||||
|
||||
def prefix(self, index: int) -> str:
|
||||
return self._line[0:index]
|
||||
|
||||
def substr(self, offset: int, length: int) -> str:
|
||||
return self._line[offset:offset+length]
|
||||
|
||||
def suffix(self, index: int) -> str:
|
||||
return self._line[index:]
|
||||
|
||||
def __str__(self):
|
||||
return "%s (%d->%d)" % (self._line, self._byte_offset, self._byte_end)
|
||||
22
raven/ui/bigtext/linetolinemap.py
Normal file
22
raven/ui/bigtext/linetolinemap.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
|
||||
from raven.util.int2intmap import Int2IntMap
|
||||
|
||||
|
||||
class LineToLineMap:
|
||||
def __init__(self):
|
||||
(handle, self.tmpfilename) = tempfile.mkstemp()
|
||||
os.close(handle)
|
||||
self._int2intmap = Int2IntMap(self.tmpfilename)
|
||||
|
||||
def close(self):
|
||||
self._int2intmap.close()
|
||||
os.remove(self.tmpfilename)
|
||||
|
||||
def add_line(self, key_byte_start: int, value_byte_start):
|
||||
self._int2intmap.add(key_byte_start, value_byte_start)
|
||||
|
||||
def get_line(self, key_byte_start: int) -> Optional[int]:
|
||||
return self._int2intmap.find(key_byte_start)
|
||||
164
raven/ui/bigtext/logFileModel.py
Normal file
164
raven/ui/bigtext/logFileModel.py
Normal file
@@ -0,0 +1,164 @@
|
||||
import math
|
||||
import re
|
||||
from typing import List, Optional
|
||||
from PySide6.QtCore import Signal
|
||||
from raven.ui.bigtext.highlight_regex import HighlightRegex
|
||||
from raven.ui.bigtext.highlighting import Highlighting
|
||||
from raven.ui.bigtext.line import Line
|
||||
import os
|
||||
from raven.settings.settings import Settings
|
||||
|
||||
|
||||
class LogFileModel:
|
||||
_query_highlight: Optional[HighlightRegex] = None
|
||||
|
||||
file_size_changed = Signal()
|
||||
"""Fires when the file size changed. **Note:** uses strings,
|
||||
because int in Qt signal are limited to 32bit."""
|
||||
|
||||
_file_size = -1
|
||||
|
||||
def __init__(self, file: str, settings: Settings):
|
||||
self.settings = settings
|
||||
self._file = os.path.realpath(file)
|
||||
# self._lock = threading.RLock()
|
||||
|
||||
def highlighters(self):
|
||||
return Highlighting.read_config(self.settings)
|
||||
|
||||
def get_file(self):
|
||||
return self._file
|
||||
|
||||
def __str__(self):
|
||||
return self._file
|
||||
|
||||
def get_query_highlight(self) -> Optional[HighlightRegex]:
|
||||
if not self.settings.session.getboolean("general", "highlight_search_term"):
|
||||
return None
|
||||
return self._query_highlight
|
||||
|
||||
def clear_query_highlight(self):
|
||||
self._query_highlight = None
|
||||
|
||||
def set_query_highlight(self, query: str, ignore_case: bool, is_regex: bool):
|
||||
self._query_highlight = HighlightRegex(
|
||||
query=query,
|
||||
ignore_case=ignore_case,
|
||||
is_regex=is_regex,
|
||||
hit_background_color="ffff00")
|
||||
|
||||
def get_tab_name(self):
|
||||
file_name = os.path.basename(self._file)
|
||||
if len(file_name) > 35:
|
||||
file_name = file_name[:15] + "..." + file_name[-15:]
|
||||
return file_name
|
||||
|
||||
def read_range(self, start_byte: int, end_byte: int):
|
||||
# with self._lock:
|
||||
if True:
|
||||
with open(self._file, 'rb') as f:
|
||||
f.seek(start_byte)
|
||||
bytes = f.read(end_byte - start_byte)
|
||||
return bytes.decode("utf8", errors="ignore")
|
||||
|
||||
def write_range(self, start_byte: int, end_byte: int, file: str):
|
||||
# print("write range: %d - %d -> %s" % (start_byte, end_byte, file))
|
||||
# with self._lock, open(self._file, 'rb') as source, open(file, "w+b") as target:
|
||||
with open(self._file, 'rb') as source, open(file, "w+b") as target:
|
||||
offset = start_byte
|
||||
source.seek(offset)
|
||||
while offset < end_byte:
|
||||
new_offset = min(offset + 1024 * 1024, end_byte)
|
||||
buffer_size = new_offset - offset
|
||||
buffer = source.read(buffer_size)
|
||||
target.write(buffer)
|
||||
offset = new_offset
|
||||
|
||||
def read_word_at(self, byte_offset: int) -> (str, int, int):
|
||||
lines = self.data(byte_offset, 0, 1)
|
||||
if len(lines) == 0:
|
||||
return "", -1, -1
|
||||
line: Line = lines[0]
|
||||
if not line.includes_byte(byte_offset):
|
||||
return "", -1, -1
|
||||
|
||||
offset_in_line = byte_offset - line.byte_offset()
|
||||
char_index = line.byte_index_to_char_index(offset_in_line)
|
||||
current_char = line.line()[char_index]
|
||||
# print("read_word: char_index=%s, current_char=%s, line=%s" %(char_index, current_char, line.line()))
|
||||
if not self._is_word_char(current_char):
|
||||
return current_char, byte_offset, byte_offset + 1
|
||||
start_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
|
||||
while start_in_line - 1 >= 0 and self._is_word_char(line.line()[start_in_line - 1]):
|
||||
start_in_line = start_in_line - 1
|
||||
end_in_line = line.byte_index_to_char_index(byte_offset - line.byte_offset())
|
||||
while end_in_line < len(line.line()) and self._is_word_char(line.line()[end_in_line]):
|
||||
end_in_line = end_in_line + 1
|
||||
start_byte = line.char_index_to_byte(start_in_line) + line.byte_offset()
|
||||
end_byte = line.char_index_to_byte(end_in_line) + line.byte_offset()
|
||||
return line.line()[start_in_line:end_in_line], start_byte, end_byte
|
||||
|
||||
def _is_word_char(self, char: str) -> bool:
|
||||
return re.match(r"\w", char) is not None
|
||||
|
||||
def data(self, byte_offset: int, scroll_lines: int, lines: int) -> List[Line]:
|
||||
# print("data(%s, %s, %s)" % (byte_offset, scroll_lines, lines))
|
||||
lines_before_offset: List[Line] = []
|
||||
lines_after_offset: List[Line] = []
|
||||
lines_to_find = lines + abs(scroll_lines)
|
||||
lines_to_return = math.ceil(lines)
|
||||
|
||||
# with self._lock:
|
||||
if True:
|
||||
# TODO handle lines longer than 4096 bytes
|
||||
# TODO abort file open after a few secons: https://docs.python.org/3/library/signal.html#example
|
||||
with open(self._file, 'rb') as f:
|
||||
offset = min(byte_offset, self.byte_count())
|
||||
# print("offset: %s byte_count: %d" % (offset, self.byte_count()))
|
||||
offset = max(0, offset - self.settings.max_line_length())
|
||||
|
||||
eof_reached = True
|
||||
f.seek(offset)
|
||||
while l := f.readline():
|
||||
new_offset = f.tell()
|
||||
line = Line(offset, new_offset,
|
||||
l.decode("utf8", errors="ignore") # .replace("\r", "").replace("\n", "")
|
||||
)
|
||||
# print("%s %s %s" %(line.byte_offset(), line.line(), line.byte_end()))
|
||||
if line.byte_end() <= byte_offset: # line.byte_end() returns the end byte +1
|
||||
lines_before_offset.append(line)
|
||||
else:
|
||||
lines_after_offset.append(line)
|
||||
offset = f.tell()
|
||||
if len(lines_after_offset) >= lines_to_find:
|
||||
eof_reached = False
|
||||
break
|
||||
|
||||
all_lines = lines_before_offset + lines_after_offset
|
||||
start = max(0, len(lines_before_offset) + scroll_lines)
|
||||
if start + lines_to_return - 1 < len(all_lines):
|
||||
result = all_lines[start:start + lines_to_return]
|
||||
else:
|
||||
result = all_lines[-lines_to_return + 1:]
|
||||
|
||||
# print("returning %s lines" % (len(result)))
|
||||
# if len(result) > 0:
|
||||
# print("returning %s %d -> %d" % (result[0].line(), result[0].byte_offset(), result[0].byte_end()))
|
||||
return result
|
||||
|
||||
def byte_count(self) -> int:
|
||||
size = os.stat(self._file).st_size
|
||||
if self._file_size != size:
|
||||
# self.file_size_changed.emit(str(size))
|
||||
self._file_size = size
|
||||
return size
|
||||
|
||||
def write_line(self, line: str):
|
||||
with open(self._file, 'a+b') as f:
|
||||
f.write(line.encode("utf8"))
|
||||
if not line.endswith("\n"):
|
||||
f.write("\n".encode("utf8"))
|
||||
|
||||
def truncate(self):
|
||||
with open(self._file, 'a') as f:
|
||||
f.truncate(0)
|
||||
116
raven/ui/bigtext/testline.py
Normal file
116
raven/ui/bigtext/testline.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import unittest
|
||||
|
||||
import unicodedata
|
||||
|
||||
from line import Line
|
||||
|
||||
|
||||
class MyTestCase(unittest.TestCase):
|
||||
def test_column_to_char(self):
|
||||
byte_offset = 123
|
||||
text = "\tabc\td\tef\tg" # will be rendered as: ....abc.d...ef..g where . represents a whitespace column
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
|
||||
self.assertEqual(0, line.column_to_char(0)) # the tab
|
||||
self.assertEqual(0, line.column_to_char(1)) # the tab
|
||||
self.assertEqual(0, line.column_to_char(2)) # the tab
|
||||
self.assertEqual(0, line.column_to_char(3)) # last column of the tab
|
||||
self.assertEqual(1, line.column_to_char(4)) # a
|
||||
self.assertEqual(2, line.column_to_char(5)) # b
|
||||
self.assertEqual(3, line.column_to_char(6)) # c
|
||||
self.assertEqual(4, line.column_to_char(7)) # tab
|
||||
self.assertEqual(5, line.column_to_char(8)) # d
|
||||
self.assertEqual(6, line.column_to_char(9)) # tab
|
||||
self.assertEqual(6, line.column_to_char(10)) # tab
|
||||
self.assertEqual(6, line.column_to_char(11)) # tab
|
||||
self.assertEqual(7, line.column_to_char(12)) # e
|
||||
self.assertEqual(8, line.column_to_char(13)) # f
|
||||
self.assertEqual(9, line.column_to_char(14)) # tab
|
||||
self.assertEqual(9, line.column_to_char(15)) # tab
|
||||
self.assertEqual(10, line.column_to_char(16)) # g
|
||||
|
||||
def test_column_to_char_ignore_nonspacing_mark_charaters(self):
|
||||
"""
|
||||
nonspacing mark charaters are those little decorations that are applied to the previous character,
|
||||
e.g. x\u0308 to make ẍ
|
||||
:return:
|
||||
"""
|
||||
byte_offset = 123
|
||||
text = "x\u0308y\u0308z\u0308"
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
|
||||
self.assertEqual(0, line.column_to_char(0)) # ẍ
|
||||
self.assertEqual(2, line.column_to_char(1)) # ÿ
|
||||
self.assertEqual(4, line.column_to_char(2)) # z̈
|
||||
|
||||
def test_char_to_column(self):
|
||||
byte_offset = 123
|
||||
text = "\tabc\td\tef\tg" # will be rendered as: ....abc.d...ef..g where . represents a whitespace column
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
self.assertEqual(0, line.char_to_column(0)) # tab
|
||||
self.assertEqual(4, line.char_to_column(1)) # a
|
||||
self.assertEqual(5, line.char_to_column(2)) # b
|
||||
self.assertEqual(6, line.char_to_column(3)) # c
|
||||
self.assertEqual(7, line.char_to_column(4)) # tab
|
||||
self.assertEqual(8, line.char_to_column(5)) # d
|
||||
self.assertEqual(9, line.char_to_column(6)) # tab
|
||||
self.assertEqual(12, line.char_to_column(7)) # e
|
||||
self.assertEqual(13, line.char_to_column(8)) # f
|
||||
self.assertEqual(14, line.char_to_column(9)) # tab
|
||||
self.assertEqual(16, line.char_to_column(10)) # g
|
||||
|
||||
def test_char_to_column_ignore_nonspacing_mark_charaters(self):
|
||||
"""
|
||||
nonspacing mark charaters are those little decorations that are applied to the previous character,
|
||||
e.g. x\u0308 to make ẍ
|
||||
:return:
|
||||
"""
|
||||
byte_offset = 123
|
||||
text = "x\u0308y\u0308z\u0308"
|
||||
print(text)
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
self.assertEqual(0, line.char_to_column(0)) # ẍ
|
||||
self.assertEqual(0, line.char_to_column(1)) # ẍ
|
||||
self.assertEqual(1, line.char_to_column(2)) # ÿ
|
||||
self.assertEqual(1, line.char_to_column(3)) # ÿ
|
||||
self.assertEqual(2, line.char_to_column(4)) # z̈
|
||||
self.assertEqual(2, line.char_to_column(5)) # z̈
|
||||
|
||||
def test_line_tabs_replaced(self):
|
||||
byte_offset = 123
|
||||
text = "\ta\tb" # will be rendered as: ....abc where . represents a whitespace column
|
||||
expected = " a b"
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
self.assertEqual(expected, line.line_tabs_replaced())
|
||||
|
||||
def test_line_tabs_replaced_performance(self):
|
||||
byte_offset = 123
|
||||
text = "a\t" * 10000
|
||||
expected = "a " * 10000
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
self.assertEqual(expected, line.line_tabs_replaced())
|
||||
|
||||
def test_byte_index_to_char_index(self):
|
||||
byte_offset = 123
|
||||
text = "x\u0308y\u0308z\u0308\t\u0308a"
|
||||
print(text)
|
||||
line = Line(byte_offset=byte_offset, byte_end=byte_offset + len(text.encode("utf8")), line=text)
|
||||
self.assertEqual(0, line.byte_index_to_char_index(0)) # x
|
||||
self.assertEqual(0, line.byte_index_to_char_index(1)) # first byte of diacritical mark belonging to x
|
||||
self.assertEqual(0, line.byte_index_to_char_index(2)) # second byte of diacritical mark belonging to x
|
||||
|
||||
def test_diacritical_marks(self):
|
||||
text = "̈ẍỏôŏ̮👍🏿"
|
||||
text = "\U0001F9D9\u200D\u2642\uFE0F - \U0001F44D\U0001F3FF - a\u02c3 - ẍ - y\u0308 - w\u200D\u00A8"
|
||||
text = unicodedata.normalize("NFD", text)
|
||||
i = 0
|
||||
print("%s" % text)
|
||||
print("length: %s" % len(text))
|
||||
while i < len(text):
|
||||
c = text[i]
|
||||
print("%s %s cat: %s" % (c, unicodedata.name(c), unicodedata.category(c)))
|
||||
i = i + 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
174
raven/ui/bigtext/testlogfilemodel.py
Normal file
174
raven/ui/bigtext/testlogfilemodel.py
Normal file
@@ -0,0 +1,174 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
from configparser import ConfigParser
|
||||
from os.path import join
|
||||
|
||||
from logFileModel import LogFileModel
|
||||
from raven.settings.settings import Settings
|
||||
|
||||
|
||||
class TestLogFileModel(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = tempfile.TemporaryDirectory()
|
||||
self.tmpfile = join(self.test_dir.name, "my.log")
|
||||
self.model = LogFileModel(self.tmpfile, Settings(ConfigParser()))
|
||||
|
||||
def tearDown(self):
|
||||
self.test_dir.cleanup()
|
||||
|
||||
def write_str(self, string: str):
|
||||
with open(self.tmpfile, "w+b") as f:
|
||||
f.write(string.encode("utf8"))
|
||||
|
||||
def test_load_from_beginning(self):
|
||||
self.write_str("1\n2\n3\n4\n5\n6\n7\n")
|
||||
expected_lines = ["1", "2", "3", "4", "5"]
|
||||
|
||||
lines = self.model.data(0, 0, 5)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_load_from_middle_of_first_line(self):
|
||||
self.write_str("abc\ndef\nghi\njkl")
|
||||
expected_lines = ["abc", "def", "ghi"]
|
||||
|
||||
lines = self.model.data(1, 0, 3)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_read_from_newline_character(self):
|
||||
self.write_str("abc\ndef\nghi\njkl")
|
||||
expected_lines = ["abc", "def"]
|
||||
|
||||
lines = self.model.data(3, 0, 2)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
|
||||
def test_negative_byte_offset(self):
|
||||
self.write_str("abc\ndef\nghi\njkl")
|
||||
expected_lines = ["abc","def"]
|
||||
|
||||
lines = self.model.data(-1, 0, 2)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_empty_last_line_is_ignored(self):
|
||||
self.write_str("1\n")
|
||||
expected_lines = ["1"]
|
||||
|
||||
lines = self.model.data(0, 0, 5)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_load_more_lines_than_are_available(self):
|
||||
"""
|
||||
Wants to read 4 lines in a file with only 3 lines.
|
||||
Returns all three lines.
|
||||
"""
|
||||
self.write_str("abc\ndef\nghi")
|
||||
expected_lines = ["abc", "def", "ghi"]
|
||||
|
||||
lines = self.model.data(0, 0, 4)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_read_behind_eof(self):
|
||||
"""
|
||||
Wants to read 4 lines from the middle of the second line.
|
||||
File has only 3 lines.
|
||||
Returns all 3 lines.
|
||||
"""
|
||||
text = "abc\ndef\nghi"
|
||||
self.write_str(text)
|
||||
expected_lines = ["abc", "def", "ghi"]
|
||||
|
||||
lines = self.model.data(text.index("e"), 0, 4)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_read_after_scrolling_2_lines(self):
|
||||
"""
|
||||
|
||||
"""
|
||||
text = "0___\n1___\n2___\n3___\n4___\n5___"
|
||||
self.write_str(text)
|
||||
expected_lines = ["3___", "4___"]
|
||||
|
||||
lines = self.model.data(text.index("1___"), 2, 2)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
|
||||
def test_scroll_with_float(self):
|
||||
"""
|
||||
If lines to lines to return is a float, then the value is rounded up.
|
||||
Floats mean that the text area is such that a line is partially visible.
|
||||
"""
|
||||
text = "0___\n1___\n2___\n3___\n4___\n5___"
|
||||
self.write_str(text)
|
||||
expected_lines = ["3___","4___", "5___"]
|
||||
|
||||
lines = self.model.data(text.index("1___"), 2, 2.1)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
|
||||
def test_scroll_up_at_beginning_of_file(self):
|
||||
"""
|
||||
Scrolling up at beginning of file.
|
||||
Return
|
||||
"""
|
||||
text = "0___\n1___\n2___\n3___\n4___\n5___"
|
||||
self.write_str(text)
|
||||
expected_lines = ["0___", "1___"]
|
||||
|
||||
lines = self.model.data(5, -2, 2)
|
||||
|
||||
line_str = [l.line() for l in lines]
|
||||
self.assertEqual(expected_lines, line_str)
|
||||
|
||||
def test_read_word_at_middle_of_line(self):
|
||||
text = "0___\nlorem ipsum dolor sit amet\n2___"
|
||||
self.write_str(text)
|
||||
expected = ("ipsum", text.index("ipsum"), text.index("ipsum") + len("ipsum"))
|
||||
actual = self.model.read_word_at(text.index("ipsum") + 2)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_read_word_at_start_of_line(self):
|
||||
text = "0___\nlorem ipsum dolor sit amet\n2___"
|
||||
word = "lorem"
|
||||
self.write_str(text)
|
||||
expected = (word, text.index(word), text.index(word) + len(word))
|
||||
actual = self.model.read_word_at(text.index(word))
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_read_word_at_end_of_line(self):
|
||||
text = "0___\nlorem ipsum dolor sit amet\n2___"
|
||||
word = "amet"
|
||||
self.write_str(text)
|
||||
expected = (word, text.index(word), text.index(word) + len(word))
|
||||
actual = self.model.read_word_at(text.index(word))
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_read_word_at_beginning_of_file(self):
|
||||
text = "lorem ipsum dolor sit amet\n1___"
|
||||
word = "lorem"
|
||||
self.write_str(text)
|
||||
expected = (word, text.index(word), text.index(word) + len(word))
|
||||
actual = self.model.read_word_at(text.index(word))
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user