BSFile uses a wrapper for DiskBlock to add BSFile specific stuff
This keeps the DiskBlock class clean, so that it can be used for PersistentMap.
This commit is contained in:
@@ -65,7 +65,7 @@ public class BSFile implements AutoCloseable {
|
||||
* The last disk block of this sequence. This is the block new values will be
|
||||
* appended to.
|
||||
*/
|
||||
private DiskBlock buffer;
|
||||
private BSFileDiskBlock buffer;
|
||||
|
||||
private int offsetInBuffer = 0;
|
||||
|
||||
@@ -75,16 +75,16 @@ public class BSFile implements AutoCloseable {
|
||||
|
||||
private final DiskStorage diskStorage;
|
||||
|
||||
private final DiskBlock rootDiskBlock;
|
||||
private final BSFileDiskBlock rootDiskBlock;
|
||||
|
||||
private long lastEpochMilli;
|
||||
|
||||
BSFile(final long rootBlockOffset, final DiskStorage diskStorage) throws IOException {
|
||||
|
||||
this(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE), diskStorage);
|
||||
this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage);
|
||||
}
|
||||
|
||||
BSFile(final DiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException {
|
||||
BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage) throws IOException {
|
||||
|
||||
this.rootDiskBlock = rootDiskBlock;
|
||||
this.rootBlockOffset = rootDiskBlock.getBlockOffset();
|
||||
@@ -94,14 +94,14 @@ public class BSFile implements AutoCloseable {
|
||||
if (lastBlockNumber == rootBlockOffset || lastBlockNumber == 0) {
|
||||
buffer = rootDiskBlock;
|
||||
} else {
|
||||
buffer = diskStorage.getDiskBlock(lastBlockNumber, BLOCK_SIZE);
|
||||
buffer = new BSFileDiskBlock(diskStorage.getDiskBlock(lastBlockNumber, BLOCK_SIZE));
|
||||
}
|
||||
offsetInBuffer = determineWriteOffsetInExistingBuffer(buffer);
|
||||
lastEpochMilli = determineLastEpochMilli(buffer);
|
||||
LOGGER.trace("create bsFile={} lastBlockNumber={}", rootBlockOffset, lastBlockNumber);
|
||||
}
|
||||
|
||||
private long determineLastEpochMilli(final DiskBlock diskBlock) {
|
||||
private long determineLastEpochMilli(final BSFileDiskBlock diskBlock) {
|
||||
|
||||
// get the time/value delta encoded longs
|
||||
final byte[] buf = diskBlock.getBuffer();
|
||||
@@ -121,7 +121,7 @@ public class BSFile implements AutoCloseable {
|
||||
return result;
|
||||
}
|
||||
|
||||
private int determineWriteOffsetInExistingBuffer(final DiskBlock buffer) {
|
||||
private int determineWriteOffsetInExistingBuffer(final BSFileDiskBlock buffer) {
|
||||
|
||||
final byte[] buf = buffer.getBuffer();
|
||||
|
||||
@@ -194,7 +194,7 @@ public class BSFile implements AutoCloseable {
|
||||
}
|
||||
|
||||
// set the new buffer
|
||||
buffer = diskStorage.getDiskBlock(newBlockOffset, BLOCK_SIZE);
|
||||
buffer = new BSFileDiskBlock(diskStorage.getDiskBlock(newBlockOffset, BLOCK_SIZE));
|
||||
offsetInBuffer = 0;
|
||||
dirty = false;
|
||||
LOGGER.trace("flushFullBufferAndCreateNew bsFile={} newBlock={}", rootBlockOffset, newBlockOffset);
|
||||
@@ -247,17 +247,17 @@ public class BSFile implements AutoCloseable {
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return nextBlockOffset != DiskBlock.NO_NEXT_POINTER;
|
||||
return nextBlockOffset != BSFileDiskBlock.NO_NEXT_POINTER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongList next() {
|
||||
try {
|
||||
if (nextBlockOffset == DiskBlock.NO_NEXT_POINTER) {
|
||||
if (nextBlockOffset == BSFileDiskBlock.NO_NEXT_POINTER) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockOffset, BLOCK_SIZE);
|
||||
final BSFileDiskBlock diskBlock = getDiskBlock(nextBlockOffset);
|
||||
nextBlockOffset = diskBlock.getNextBlockNumber();
|
||||
|
||||
final byte[] buf = diskBlock.getBuffer();
|
||||
@@ -267,6 +267,11 @@ public class BSFile implements AutoCloseable {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private BSFileDiskBlock getDiskBlock(final long blockOffset) throws IOException {
|
||||
final DiskBlock diskBlock = diskStorage.getDiskBlock(blockOffset, BLOCK_SIZE);
|
||||
return new BSFileDiskBlock(diskBlock);
|
||||
}
|
||||
}
|
||||
|
||||
public LongList asLongList() {
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
package org.lucares.pdb.blockstorage;
|
||||
|
||||
import java.nio.MappedByteBuffer;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
|
||||
import org.lucares.pdb.diskstorage.DiskBlock;
|
||||
|
||||
public class BSFileDiskBlock {
|
||||
|
||||
protected static final int NEXT_POINTER_OFFSET = 0;
|
||||
public static final long NO_NEXT_POINTER = 0;
|
||||
private static final int LAST_BLOCK_POINTER_POSITION = 8;
|
||||
public static final long NO_LAST_BLOCK = 0;
|
||||
private static final int INT_SEQUENCE_OFFSET = 8 // next block pointer
|
||||
+ 8; // last block pointer;
|
||||
|
||||
private final DiskBlock diskBlock;
|
||||
private long nextBlockOffset = 0;
|
||||
private long lastBlockOffset = 0;
|
||||
|
||||
private byte[] buffer = null;
|
||||
|
||||
public BSFileDiskBlock(final DiskBlock diskBlock) {
|
||||
this.diskBlock = diskBlock;
|
||||
}
|
||||
|
||||
public byte[] getBuffer() {
|
||||
|
||||
if (buffer == null) {
|
||||
final MappedByteBuffer byteBuffer = diskBlock.getByteBuffer();
|
||||
this.buffer = new byte[byteBuffer.capacity() - INT_SEQUENCE_OFFSET];
|
||||
byteBuffer.position(INT_SEQUENCE_OFFSET);
|
||||
byteBuffer.get(buffer);
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public long getBlockOffset() {
|
||||
return diskBlock.getBlockOffset();
|
||||
}
|
||||
|
||||
public void setNextBlockOffset(final long nextBlockOffset) {
|
||||
this.nextBlockOffset = nextBlockOffset;
|
||||
}
|
||||
|
||||
public long getLastBlockPointer() {
|
||||
|
||||
if (lastBlockOffset <= 0) {
|
||||
lastBlockOffset = diskBlock.getByteBuffer().getLong(LAST_BLOCK_POINTER_POSITION);
|
||||
}
|
||||
|
||||
return lastBlockOffset;
|
||||
}
|
||||
|
||||
public long getNextBlockNumber() {
|
||||
if (nextBlockOffset <= 0) {
|
||||
nextBlockOffset = diskBlock.getByteBuffer().getLong(NEXT_POINTER_OFFSET);
|
||||
}
|
||||
return nextBlockOffset;
|
||||
}
|
||||
|
||||
public void setLastBlockOffset(final long lastBlockOffset) {
|
||||
this.lastBlockOffset = lastBlockOffset;
|
||||
}
|
||||
|
||||
public void writeLastBlockOffset(final long lastBlockOffset) {
|
||||
this.lastBlockOffset = lastBlockOffset;
|
||||
diskBlock.getByteBuffer().putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset);
|
||||
}
|
||||
|
||||
private void writeBufferToByteBuffer() {
|
||||
diskBlock.getByteBuffer().position(INT_SEQUENCE_OFFSET);
|
||||
diskBlock.getByteBuffer().put(buffer);
|
||||
}
|
||||
|
||||
private void writeBlockHeader() {
|
||||
diskBlock.getByteBuffer().putLong(NEXT_POINTER_OFFSET, nextBlockOffset);
|
||||
diskBlock.getByteBuffer().putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset);
|
||||
}
|
||||
|
||||
public void writeAsync() {
|
||||
writeBlockHeader();
|
||||
writeBufferToByteBuffer();
|
||||
}
|
||||
|
||||
public void force() {
|
||||
diskBlock.getByteBuffer().force();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final LongList bufferDecoded = VariableByteEncoder.decode(buffer);
|
||||
return "BSFileDiskBlock[bufferDecoded=" + bufferDecoded + "]";
|
||||
}
|
||||
}
|
||||
@@ -2,26 +2,10 @@ package org.lucares.pdb.diskstorage;
|
||||
|
||||
import java.nio.MappedByteBuffer;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DiskBlock {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DiskBlock.class);
|
||||
|
||||
protected static final int NEXT_POINTER_OFFSET = 0;
|
||||
public static final long NO_NEXT_POINTER = 0;
|
||||
private static final int LAST_BLOCK_POINTER_POSITION = 8;
|
||||
public static final long NO_LAST_BLOCK = 0;
|
||||
private static final int INT_SEQUENCE_OFFSET = 8 // next block pointer
|
||||
+ 8; // last block pointer;
|
||||
|
||||
private byte[] buffer = null;
|
||||
private final long blockOffset;
|
||||
private long nextBlockOffset = 0;
|
||||
private long lastBlockOffset = 0;
|
||||
|
||||
private final MappedByteBuffer byteBuffer;
|
||||
|
||||
@@ -33,74 +17,36 @@ public class DiskBlock {
|
||||
public byte[] getBuffer() {
|
||||
|
||||
if (buffer == null) {
|
||||
this.buffer = new byte[byteBuffer.capacity() - INT_SEQUENCE_OFFSET];
|
||||
byteBuffer.position(INT_SEQUENCE_OFFSET);
|
||||
this.buffer = new byte[byteBuffer.capacity()];
|
||||
byteBuffer.get(buffer);
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public MappedByteBuffer getByteBuffer() {
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
public long getBlockOffset() {
|
||||
return blockOffset;
|
||||
}
|
||||
|
||||
public void setNextBlockOffset(final long nextBlockOffset) {
|
||||
this.nextBlockOffset = nextBlockOffset;
|
||||
}
|
||||
|
||||
private void writeBufferToByteBuffer() {
|
||||
byteBuffer.position(INT_SEQUENCE_OFFSET);
|
||||
byteBuffer.position(0);
|
||||
byteBuffer.put(buffer);
|
||||
}
|
||||
|
||||
private void writeBlockHeader() {
|
||||
byteBuffer.putLong(NEXT_POINTER_OFFSET, nextBlockOffset);
|
||||
byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset);
|
||||
}
|
||||
|
||||
public void writeAsync() {
|
||||
final long start = System.nanoTime();
|
||||
writeBlockHeader();
|
||||
writeBufferToByteBuffer();
|
||||
final long duration = System.nanoTime() - start;
|
||||
LOGGER.trace("write() of block={}: {}ms", blockOffset, duration / 1_000_000.0);
|
||||
}
|
||||
|
||||
public void force() {
|
||||
final long start = System.nanoTime();
|
||||
byteBuffer.force();
|
||||
LOGGER.trace("force of block={}: {}ms", blockOffset, (System.nanoTime() - start) / 1_000_000.0);
|
||||
}
|
||||
|
||||
public long getLastBlockPointer() {
|
||||
|
||||
if (lastBlockOffset <= 0) {
|
||||
lastBlockOffset = byteBuffer.getLong(LAST_BLOCK_POINTER_POSITION);
|
||||
}
|
||||
|
||||
return lastBlockOffset;
|
||||
}
|
||||
|
||||
public long getNextBlockNumber() {
|
||||
if (nextBlockOffset <= 0) {
|
||||
nextBlockOffset = byteBuffer.getLong(NEXT_POINTER_OFFSET);
|
||||
}
|
||||
return nextBlockOffset;
|
||||
}
|
||||
|
||||
public void setLastBlockOffset(final long lastBlockOffset) {
|
||||
this.lastBlockOffset = lastBlockOffset;
|
||||
}
|
||||
|
||||
public void writeLastBlockOffset(final long lastBlockOffset) {
|
||||
this.lastBlockOffset = lastBlockOffset;
|
||||
byteBuffer.putLong(LAST_BLOCK_POINTER_POSITION, lastBlockOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final LongList bufferDecoded = VariableByteEncoder.decode(buffer);
|
||||
return "DiskBlock[" + blockOffset + ", bufferDecoded=" + bufferDecoded + "]";
|
||||
return "DiskBlock[" + blockOffset + "]";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,11 +168,12 @@ class NodeEntry {
|
||||
|
||||
public int compare(final byte[] otherKey) {
|
||||
|
||||
final int i = 0;
|
||||
int i = 0;
|
||||
while (i < key.length && i < otherKey.length) {
|
||||
if (key[i] != otherKey[i]) {
|
||||
return key[i] - otherKey[i];
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return key.length - otherKey.length;
|
||||
|
||||
@@ -61,14 +61,18 @@ public class PersistentMap {
|
||||
public byte[] get(final byte[] key) throws IOException {
|
||||
final NodeEntry entry = findNodeEntry(ROOT_NODE_OFFEST, key);
|
||||
|
||||
return entry.getValue();
|
||||
return entry == null ? null : entry.getValue();
|
||||
}
|
||||
|
||||
private byte[] insert(final long nodeOffest, final byte[] key, final byte[] value) throws IOException {
|
||||
final PersistentMapDiskNode node = getNode(nodeOffest);
|
||||
|
||||
final var entry = node.getNodeEntryTo(key);
|
||||
if (entry.isDataNode()) {
|
||||
if (entry == null) {
|
||||
node.addKeyValue(key, value);
|
||||
writeNode(nodeOffest, node);
|
||||
return null;
|
||||
} else if (entry.isDataNode()) {
|
||||
if (entry.equal(key)) {
|
||||
return entry.getValue();
|
||||
} else {
|
||||
@@ -87,7 +91,9 @@ public class PersistentMap {
|
||||
final PersistentMapDiskNode node = getNode(nodeOffest);
|
||||
|
||||
final var entry = node.getNodeEntryTo(key);
|
||||
if (entry.isDataNode()) {
|
||||
if (entry == null) {
|
||||
return null;
|
||||
} else if (entry.isDataNode()) {
|
||||
if (entry.equal(key)) {
|
||||
return entry;
|
||||
} else {
|
||||
@@ -116,6 +122,7 @@ public class PersistentMap {
|
||||
final byte[] buffer = diskBlock.getBuffer();
|
||||
final byte[] newBuffer = node.serialize();
|
||||
System.arraycopy(newBuffer, 0, buffer, 0, buffer.length);
|
||||
diskBlock.writeAsync();
|
||||
diskBlock.force();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user