Files
krowlog/int2intmaplike.py

81 lines
2.6 KiB
Python

import math
import os
from logging import exception
from typing import Optional
class Int2IntMapLike():
    """
    A file used to map byte ranges of the filter view to byte numbers in the original file.

    Each line contains three integers separated by commas: ``start,length,val``.
    A lookup for ``key`` yields ``val`` of the entry with
    ``start <= key < start + length``.

    Entries must be added with ascending, non-overlapping ranges. This is not
    checked here, but the binary search in ``find`` relies on it.

    The file uses 4kb blocks. That means we add fill bytes (newlines) if a line
    would cross a 4kb block boundary, so every block can be parsed on its own.
    """

    blocksize = 4096

    def __init__(self, file):
        """Create (or truncate) the backing file at path ``file``."""
        self._file = file
        # The block arithmetic assumes one character == one byte and that "\n"
        # is written as a single byte, so pin the encoding and disable newline
        # translation instead of relying on platform defaults.
        self._handle = open(file, "w+t", encoding="ascii", newline="\n")
        # Entries that were added but not yet written to the file.
        self._buffer = ""
        # Number of bytes already written to the file. Tracked separately
        # because find() moves the handle's position around.
        self._written = 0

    def close(self):
        """Close the underlying file handle; safe to call more than once."""
        if not self._handle.closed:
            self._handle.close()

    def reset(self):
        """Discard all entries, buffered ones as well as those already on disk."""
        self._buffer = ""
        # truncate() does not move the file position; without the seek the next
        # write would leave a hole of NUL bytes at the start of the file.
        self._handle.seek(0)
        self._handle.truncate(0)
        self._written = 0

    def add(self, start: int, length: int, val: int):
        """
        Append the entry ``start,length,val``.

        The entry is only buffered; it is written to the file by the next
        call to ``find``.
        """
        line = "%d,%d,%d\n" % (start, length, val)
        used_in_block = (self._written + len(self._buffer)) % self.blocksize
        if used_in_block + len(line) > self.blocksize:
            # end of block: fill block so that the line starts the next one
            self._buffer += "\n" * (self.blocksize - used_in_block)
        self._buffer += line

    def _flush_buffer(self):
        """Write all buffered entries to the end of the file."""
        # find() leaves the handle somewhere in the middle of the file, so
        # always move to the end before appending.
        self._handle.seek(0, os.SEEK_END)
        self._handle.write(self._buffer)
        self._written += len(self._buffer)
        self._buffer = ""
        self._handle.flush()

    def find(self, key: int) -> Optional[int]:
        """
        Return ``val`` of the entry whose range contains ``key``.

        Returns ``None`` if no entry covers ``key``. Performs a binary search
        over the blocks of the file and a linear scan inside a block.
        """
        if self._buffer:
            self._flush_buffer()
        size = os.stat(self._file).st_size
        if size == 0:
            return None

        low = 0
        high = math.ceil(size / self.blocksize) - 1
        while low <= high:
            mid = (low + high) // 2
            self._handle.seek(mid * self.blocksize)
            block = self._handle.read(self.blocksize)

            # None until the first entry of the block has been compared.
            is_before = None
            for line in block.split("\n"):
                if not line:
                    # fill bytes or the trailing newline
                    continue
                start, length, val = (int(token) for token in line.split(","))
                if start <= key < start + length:
                    return val
                before = key < start
                if is_before is not None and before != is_before:
                    # key lies in the gap between two entries of this block
                    return None
                is_before = before

            if is_before is None:
                # Block without entries; nothing to compare against.
                return None
            if is_before:
                high = mid - 1
            else:
                low = mid + 1
        return None