import math
import os
from logging import exception
from typing import Optional


class Int2IntMap:
    """
    A file-backed map from byte numbers of the filter view to byte numbers
    in the original file.

    Each entry is stored as one ASCII line ``"<key>,<value>\\n"``. Keys must be
    added in ascending order; this is not checked, but ``find`` relies on it to
    binary-search the file.

    The file is organised in blocks of ``blocksize`` bytes. An entry never
    crosses a block boundary: if it would, the rest of the current block is
    filled with newline bytes and the entry starts the next block. ``find`` can
    therefore read any block on its own and parse only complete lines.

    The file is opened in binary mode so that offsets are real byte offsets on
    every platform (no newline translation, no opaque text-mode positions).
    """

    blocksize = 4096

    def __init__(self, file) -> None:
        """
        Create (or truncate) the backing file.

        :param file: path of the backing file; any existing content is lost.
        """
        self._file = file
        self._handle = open(file, "w+b")
        # Entries that have been added but not yet written to the file.
        self._buffer = bytearray()
        # Number of bytes already written to the file. Tracked explicitly
        # because the handle position moves whenever find() reads a block.
        self._size = 0

    def __enter__(self) -> "Int2IntMap":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def close(self) -> None:
        """Write any buffered entries and close the file. Safe to call twice."""
        if self._handle.closed:
            return
        try:
            if self._buffer:
                self._flush_buffer()
        finally:
            self._handle.close()

    def reset(self) -> None:
        """Discard all entries, both buffered and already written."""
        self._buffer = bytearray()
        # Rewind before truncating; otherwise the next write would land at the
        # old position and leave a zero-filled gap at the start of the file.
        self._handle.seek(0)
        self._handle.truncate(0)
        self._size = 0

    def add(self, key: int, val: int) -> None:
        """
        Append the mapping ``key -> val``.

        Keys must be added in ascending order. The entry is buffered in memory
        and written out once the buffer exceeds 100 blocks, or on the next
        ``find``, ``total_blocks`` or ``close``.
        """
        line = b"%d,%d\n" % (key, val)
        # Logical end of the map: bytes on disk plus bytes still buffered.
        offset = self._size + len(self._buffer)
        used = offset % self.blocksize
        if used + len(line) > self.blocksize:
            # The entry would cross a block boundary: pad the rest of the block.
            self._buffer += b"\n" * (self.blocksize - used)
        self._buffer += line
        if len(self._buffer) > self.blocksize * 100:
            self._flush_buffer()

    def _flush_buffer(self) -> None:
        """Append the buffered bytes to the end of the written data."""
        # find() may have moved the handle, so position it explicitly.
        self._handle.seek(self._size)
        self._handle.write(self._buffer)
        self._size += len(self._buffer)
        self._buffer = bytearray()
        self._handle.flush()

    def find(self, key: int) -> Optional[int]:
        """
        Look up ``key`` with a binary search over the blocks of the file.

        Buffered entries are written out first so that they are searchable.

        :return: the value stored for ``key``, or ``None`` if it is not present.
        """
        if self._buffer:
            self._flush_buffer()
        if self._size == 0:
            return None

        lo = 0
        hi = self._block_count(self._size) - 1
        while lo <= hi:
            mid = (lo + hi) // 2
            self._handle.seek(mid * self.blocksize)
            block = self._handle.read(self.blocksize)

            seen_smaller = False  # block holds a key smaller than ``key``
            key_before_block = False  # every key seen so far is larger
            for raw in block.split(b"\n"):
                if not raw:
                    # Fill bytes or the tail after the final newline.
                    continue
                k_text, _, v_text = raw.partition(b",")
                k = int(k_text)
                if k == key:
                    return int(v_text)
                if k > key:
                    if seen_smaller:
                        # key lies between two neighbouring entries of this
                        # block, so it cannot be stored anywhere.
                        return None
                    key_before_block = True
                    # Keys ascend, so the rest of the block is larger as well.
                    break
                seen_smaller = True

            if key_before_block:
                hi = mid - 1
            else:
                lo = mid + 1
        return None

    def total_blocks(self) -> int:
        """
        Return the number of blocks in the file (the last one may be partial).

        Buffered entries are written out first so that they are counted.
        """
        if self._buffer:
            self._flush_buffer()
        return self._block_count(self._size)

    def _block_count(self, size: int) -> int:
        """Return how many blocks are needed for ``size`` bytes (exact ceil)."""
        return (size + self.blocksize - 1) // self.blocksize