import math
import os
from logging import exception
from typing import Optional


class Int2IntMapLike:
    """
    A file-backed map from byte ranges of the filter view to byte numbers
    in the original file.

    Each record is one text line ``start,length,val`` (three integers
    separated by commas). Records must be added with ascending,
    non-overlapping ``start`` ranges; this is what allows ``find`` to do a
    binary search.

    The file is organised in blocks of ``blocksize`` bytes. A record never
    crosses a block boundary: if it would, the remainder of the current block
    is padded with newline characters and the record starts the next block.
    """

    blocksize = 4096

    def __init__(self, file):
        """Create (or truncate) the backing file at ``file`` and open it for read/write."""
        self._file = file
        # ASCII + newline="\n" guarantee one byte per character and no
        # "\n" -> "\r\n" translation, so character counts equal byte offsets
        # and the block alignment holds on every platform.
        self._handle = open(file, "w+t", encoding="ascii", newline="\n")

    def close(self) -> None:
        """Close the backing file handle; calling it again is a no-op."""
        if not self._handle.closed:
            self._handle.close()

    def reset(self) -> None:
        """Remove all records from the map."""
        self._handle.truncate(0)
        # truncate() does not move the file position; rewind so that the next
        # write does not leave a hole of NUL bytes at the start of the file.
        self._handle.seek(0)

    def add(self, start: int, length: int, val: int) -> None:
        """
        Append a record mapping the range ``[start, start + length)`` to ``val``.

        Records must be added in ascending order of ``start``.
        """
        line = "%d,%d,%d\n" % (start, length, val)
        line_len = len(line)
        # Always append: a preceding find() or reset() may have moved the
        # handle away from the end of the file.
        offset = self._handle.seek(0, os.SEEK_END)
        used = offset % self.blocksize
        if used + line_len > self.blocksize:
            # The record would straddle a block boundary: pad the current
            # block with newlines so the record starts on the next block.
            self._handle.write("\n" * (self.blocksize - used))
        self._handle.write(line)

    def find(self, key: int) -> Optional[int]:
        """
        Return the ``val`` of the record whose range contains ``key``.

        Performs a binary search over the blocks of the file and a linear
        scan inside the probed block.

        Returns ``None`` if no record covers ``key``.
        Raises ``ValueError`` if the map has already been closed.
        """
        # Push buffered writes to the file so the size below is accurate.
        self._handle.flush()
        size = os.fstat(self._handle.fileno()).st_size
        if size == 0:
            return None

        low = 0
        high = math.ceil(size / self.blocksize) - 1
        while low <= high:
            mid = (low + high) // 2
            self._handle.seek(mid * self.blocksize)
            block = self._handle.read(self.blocksize)

            # True: key lies before every record seen so far in this block,
            # False: key lies after them, None: no record seen yet.
            is_before = None
            for line in block.split("\n"):
                if not line:
                    # Padding or the empty tail after the final newline.
                    continue
                tokens = line.split(",")
                start = int(tokens[0])
                length = int(tokens[1])
                val = int(tokens[2])
                if start <= key < start + length:
                    return val
                before = key < start
                if is_before is not None and before != is_before:
                    # key falls into a gap between two neighbouring records.
                    return None
                is_before = before

            if is_before:
                high = mid - 1
            else:
                low = mid + 1
        return None