From 95f2f269661cc576614033940138136f98af5869 Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Sun, 17 Mar 2019 11:13:46 +0100
Subject: [PATCH] handle IOExceptions earlier

---
 .../org/lucares/pdb/blockstorage/BSFile.java  |  41 +++----
 .../pdb/blockstorage/TimeSeriesFile.java      |  10 +-
 .../lucares/pdb/diskstorage/DiskStorage.java  |  87 ++++++------
 .../pdb/diskstorage/DiskStorageException.java |  19 ++++
 .../org/lucares/pdb/map/PersistentMap.java    |  43 ++++---
 .../org/lucares/pdb/datastore/PdbFile.java    |  12 +-
 .../lucares/pdb/datastore/ReadException.java  |   4 +-
 .../internal/ClusteredDiskStore.java          |  38 +++----
 .../internal/ClusteredPersistentMap.java      | 107 ++++++------
 .../pdb/datastore/internal/DataStore.java     |   6 +-
 .../pdb/datastore/internal/PdbWriter.java     |   8 +-
 11 files changed, 174 insertions(+), 201 deletions(-)
 create mode 100644 block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorageException.java
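
Note on the pattern used throughout this patch: IOException is now caught where it originates, in DiskStorage, and rethrown as the new unchecked DiskStorageException, so every layer above can drop "throws IOException" from its signatures. A minimal sketch of that wrapping, not taken from the patch (sizeOrFail is an invented name; FileChannel.size() is the standard NIO call):

    // sketch only: mirrors what DiskStorage.size() does after this patch
    long sizeOrFail(final FileChannel channel) {
        try {
            return channel.size();              // checked IOException starts here
        } catch (final IOException e) {
            throw new DiskStorageException(e);  // unchecked from here upwards
        }
    }
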
diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java
index 4f996dd..fc73026 100644
--- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java
+++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/BSFile.java
@@ -1,6 +1,5 @@
 package org.lucares.pdb.blockstorage;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -60,14 +59,12 @@ public class BSFile implements AutoCloseable {
 
     private final BSFileCustomizer customizer;
 
-    BSFile(final long rootBlockOffset, final DiskStorage diskStorage, final BSFileCustomizer customizer)
-            throws IOException {
+    BSFile(final long rootBlockOffset, final DiskStorage diskStorage, final BSFileCustomizer customizer) {
         this(new BSFileDiskBlock(diskStorage.getDiskBlock(rootBlockOffset, BLOCK_SIZE)), diskStorage, customizer);
     }
 
-    BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage, final BSFileCustomizer customizer)
-            throws IOException {
+    BSFile(final BSFileDiskBlock rootDiskBlock, final DiskStorage diskStorage, final BSFileCustomizer customizer) {
 
         this.rootDiskBlock = rootDiskBlock;
         this.customizer = customizer;
 
@@ -98,17 +95,17 @@ public class BSFile implements AutoCloseable {
     }
 
     public static BSFile existingFile(final long blockNumber, final DiskStorage diskStorage,
-            final BSFileCustomizer customizer) throws IOException {
+            final BSFileCustomizer customizer) {
         return new BSFile(blockNumber, diskStorage, customizer);
     }
 
-    public static BSFile newFile(final DiskStorage diskStorage, final BSFileCustomizer customizer) throws IOException {
+    public static BSFile newFile(final DiskStorage diskStorage, final BSFileCustomizer customizer) {
        final long rootBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE);
        LOGGER.trace("create new bsFile={}", rootBlockOffset);
        return new BSFile(rootBlockOffset, diskStorage, customizer);
     }
 
-    public void append(final long value1, final long value2) throws IOException {
+    public void append(final long value1, final long value2) {
 
         final long val1 = customizer.preProcessWriteValue1(value1);
         final long val2 = customizer.preProcessWriteValue2(value2);
@@ -124,7 +121,7 @@ public class BSFile implements AutoCloseable {
         dirty = true;
     }
 
-    public void append(final long value) throws IOException {
+    public void append(final long value) {
 
         int bytesWritten = VariableByteEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
         if (bytesWritten == 0) {
@@ -136,7 +133,7 @@ public class BSFile implements AutoCloseable {
         dirty = true;
     }
 
-    private void flushFullBufferAndCreateNew() throws IOException {
+    private void flushFullBufferAndCreateNew() {
 
         final long newBlockOffset = diskStorage.allocateBlock(BLOCK_SIZE);
 
@@ -214,23 +211,19 @@ public class BSFile implements AutoCloseable {
 
         @Override
         public LongList next() {
-            try {
-                if (nextBlockOffset == BSFileDiskBlock.NO_NEXT_POINTER) {
-                    throw new NoSuchElementException();
-                }
-
-                final BSFileDiskBlock diskBlock = getDiskBlock(nextBlockOffset);
-                nextBlockOffset = diskBlock.getNextBlockNumber();
-
-                final byte[] buf = diskBlock.getBuffer();
-                next = VariableByteEncoder.decode(buf);
-                return next;
-            } catch (final IOException e) {
-                throw new RuntimeException(e);
+            if (nextBlockOffset == BSFileDiskBlock.NO_NEXT_POINTER) {
+                throw new NoSuchElementException();
             }
+
+            final BSFileDiskBlock diskBlock = getDiskBlock(nextBlockOffset);
+            nextBlockOffset = diskBlock.getNextBlockNumber();
+
+            final byte[] buf = diskBlock.getBuffer();
+            next = VariableByteEncoder.decode(buf);
+            return next;
         }
 
-        private BSFileDiskBlock getDiskBlock(final long blockOffset) throws IOException {
+        private BSFileDiskBlock getDiskBlock(final long blockOffset) {
             final DiskBlock diskBlock = diskStorage.getDiskBlock(blockOffset, BLOCK_SIZE);
             return new BSFileDiskBlock(diskBlock);
         }
diff --git a/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java
index 05afefb..b24b682 100644
--- a/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java
+++ b/block-storage/src/main/java/org/lucares/pdb/blockstorage/TimeSeriesFile.java
@@ -1,6 +1,5 @@
 package org.lucares.pdb.blockstorage;
 
-import java.io.IOException;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -11,22 +10,21 @@ public class TimeSeriesFile implements AutoCloseable {
 
     private final BSFile bsFile;
 
-    private TimeSeriesFile(final BSFile bsFile) throws IOException {
+    private TimeSeriesFile(final BSFile bsFile) {
         this.bsFile = bsFile;
     }
 
-    public static TimeSeriesFile existingFile(final long blockNumber, final DiskStorage diskStorage)
-            throws IOException {
+    public static TimeSeriesFile existingFile(final long blockNumber, final DiskStorage diskStorage) {
         final BSFile bsFile = BSFile.existingFile(blockNumber, diskStorage, new TimeSeriesCustomizer());
         return new TimeSeriesFile(bsFile);
     }
 
-    public static TimeSeriesFile newFile(final DiskStorage diskStorage) throws IOException {
+    public static TimeSeriesFile newFile(final DiskStorage diskStorage) {
         final BSFile bsFile = BSFile.newFile(diskStorage, new TimeSeriesCustomizer());
         return new TimeSeriesFile(bsFile);
     }
 
-    public void appendTimeValue(final long epochMilli, final long value) throws IOException {
+    public void appendTimeValue(final long epochMilli, final long value) {
         bsFile.append(epochMilli, value);
     }
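
With BSFile and TimeSeriesFile free of checked exceptions, appending to a time series needs no throws clause any more; storage failures surface as DiskStorageException. A small usage sketch under that assumption (path and values are invented, imports and resource cleanup omitted for brevity):

    // sketch only: uses the DiskStorage and TimeSeriesFile API shown above
    final DiskStorage storage = new DiskStorage(Paths.get("/tmp/pdb/example.db"));
    final TimeSeriesFile series = TimeSeriesFile.newFile(storage);
    series.appendTimeValue(System.currentTimeMillis(), 42L);   // no longer declares IOException
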
diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
index 1628335..c3dd8fd 100644
--- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
+++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java
@@ -25,14 +25,17 @@ public class DiskStorage implements AutoCloseable {
 
     private final FileChannel fileChannel;
 
-    public DiskStorage(final Path databaseFile) throws IOException {
+    public DiskStorage(final Path databaseFile) {
+        try {
+            Files.createDirectories(databaseFile.getParent());
 
-        Files.createDirectories(databaseFile.getParent());
+            fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE,
+                    StandardOpenOption.CREATE);
 
-        fileChannel = FileChannel.open(databaseFile, StandardOpenOption.READ, StandardOpenOption.WRITE,
-                StandardOpenOption.CREATE);
-
-        initIfNew();
+            initIfNew();
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
+        }
     }
 
     private void initIfNew() throws IOException {
@@ -42,35 +45,45 @@
         }
     }
 
-    public DiskBlock getDiskBlock(final long blockOffset, final int blockSize) throws IOException {
+    public DiskBlock getDiskBlock(final long blockOffset, final int blockSize) {
+        try {
+            LOGGER.trace("read block={}", blockOffset);
 
-        LOGGER.trace("read block={}", blockOffset);
+            final var byteBuffer = fileChannel.map(MapMode.READ_WRITE, blockOffset, blockSize);
 
-        final var byteBuffer = fileChannel.map(MapMode.READ_WRITE, blockOffset, blockSize);
-
-        return new DiskBlock(blockOffset, byteBuffer);
+            return new DiskBlock(blockOffset, byteBuffer);
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
+        }
     }
 
     @Override
-    public void close() throws IOException {
-        fileChannel.force(true);
-        fileChannel.close();
+    public void close() {
+        try {
+            fileChannel.force(true);
+            fileChannel.close();
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
+        }
     }
 
-    public synchronized long allocateBlock(final int blockSize) throws IOException {
-
+    public synchronized long allocateBlock(final int blockSize) {
         if (blockSize < FREE_LIST_NODE_SIZE) {
             throw new IllegalArgumentException("The minimal allocation size is 32 byte.");
         }
 
-        final var optionalFreeBlock = findFreeBlockWithSize(blockSize);
-        if (optionalFreeBlock.isPresent()) {
-            final FreeListNode freeBlock = optionalFreeBlock.get();
-            removeBlockFromFreeList(freeBlock);
-            clearBlock(freeBlock);
-            return freeBlock.getOffset();
-        } else {
-            return allocateNewBlock(blockSize);
+        try {
+            final var optionalFreeBlock = findFreeBlockWithSize(blockSize);
+            if (optionalFreeBlock.isPresent()) {
+                final FreeListNode freeBlock = optionalFreeBlock.get();
+                removeBlockFromFreeList(freeBlock);
+                clearBlock(freeBlock);
+                return freeBlock.getOffset();
+            } else {
+                return allocateNewBlock(blockSize);
+            }
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
         }
     }
 
@@ -237,18 +250,26 @@
         fileChannel.write(freeListFirstBlock, FREE_LIST_ROOT_OFFSET);
     }
 
-    public synchronized void ensureAlignmentForNewBlocks(final int alignment) throws IOException {
-        final long size = fileChannel.size();
-        final int alignmentMismatch = Math.floorMod(size, alignment);
-        if (alignmentMismatch != 0) {
-            // The next allocated block would not be aligned. Therefore we allocate a
-            // throw-away block.
-            allocateNewBlock(alignment - alignmentMismatch);
+    public synchronized void ensureAlignmentForNewBlocks(final int alignment) {
+        try {
+            final long size = fileChannel.size();
+            final int alignmentMismatch = Math.floorMod(size, alignment);
+            if (alignmentMismatch != 0) {
+                // The next allocated block would not be aligned. Therefore we allocate a
+                // throw-away block.
+                allocateNewBlock(alignment - alignmentMismatch);
+            }
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
        }
     }
 
-    public long size() throws IOException {
-        return fileChannel.size();
+    public long size() {
+        try {
+            return fileChannel.size();
+        } catch (final IOException e) {
+            throw new DiskStorageException(e);
+        }
     }
 
     public int minAllocationSize() {
diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorageException.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorageException.java
new file mode 100644
index 0000000..5b2763d
--- /dev/null
+++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorageException.java
@@ -0,0 +1,19 @@
+package org.lucares.pdb.diskstorage;
+
+public class DiskStorageException extends RuntimeException {
+
+    private static final long serialVersionUID = 1683775743640383633L;
+
+    public DiskStorageException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public DiskStorageException(final String message) {
+        super(message);
+    }
+
+    public DiskStorageException(final Throwable cause) {
+        super(cause);
+    }
+
+}
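
Because DiskStorageException extends RuntimeException, the compiler no longer forces callers to handle storage failures; code that wants to recover has to catch it explicitly. A sketch of such a caller, using only the DiskStorage API from the hunks above (the path is invented):

    // sketch only: illustrates catching the new unchecked exception
    final DiskStorage storage = new DiskStorage(Paths.get("/tmp/pdb/blocks.db"));
    try {
        final long offset = storage.allocateBlock(storage.minAllocationSize());
        // ... write into the block at 'offset' ...
    } catch (final DiskStorageException e) {
        storage.close();   // callers now decide where cleanup and recovery happen
        throw e;
    }
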
diff --git a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java
index 4686cee..c5efa6d 100644
--- a/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java
+++ b/block-storage/src/main/java/org/lucares/pdb/map/PersistentMap.java
@@ -1,6 +1,5 @@
 package org.lucares.pdb.map;
 
-import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
@@ -126,8 +125,7 @@ public class PersistentMap implements AutoCloseable {
 
     private final LRUCache valueCache = new LRUCache<>(1_000);
 
-    public PersistentMap(final Path path, final EncoderDecoder keyEncoder, final EncoderDecoder valueEncoder)
-            throws IOException {
+    public PersistentMap(final Path path, final EncoderDecoder keyEncoder, final EncoderDecoder valueEncoder) {
         this.diskStore = new DiskStorage(path);
         this.keyEncoder = keyEncoder;
         this.valueEncoder = valueEncoder;
@@ -135,7 +133,7 @@
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         diskStore.close();
     }
 
@@ -143,7 +141,7 @@
         this.maxEntriesInNode = maxEntriesInNode;
     }
 
-    private void initIfNew() throws IOException {
+    private void initIfNew() {
         if (diskStore.size() < BLOCK_SIZE) {
             final long nodeOffsetToRootNode = diskStore.allocateBlock(diskStore.minAllocationSize());
             Preconditions.checkEqual(nodeOffsetToRootNode, NODE_OFFSET_TO_ROOT_NODE,
@@ -166,13 +164,13 @@
         }
     }
 
-    public synchronized void putAllValues(final Map map) throws IOException {
+    public synchronized void putAllValues(final Map map) {
         for (final Entry e : map.entrySet()) {
             putValue(e.getKey(), e.getValue());
         }
     }
 
-    public synchronized V putValue(final K key, final V value) throws IOException {
+    public synchronized V putValue(final K key, final V value) {
 
         final V cachedValue = valueCache.get(key);
         if (cachedValue != null && cachedValue == value) {
@@ -187,7 +185,7 @@
         return oldValue;
     }
 
-    public synchronized V getValue(final K key) throws IOException {
+    public synchronized V getValue(final K key) {
 
         final V cachedValue = valueCache.get(key);
         if (cachedValue != null) {
@@ -201,13 +199,13 @@ public class PersistentMap implements AutoCloseable {
         return result;
     }
 
-    private byte[] putValue(final byte[] key, final byte[] value) throws IOException {
+    private byte[] putValue(final byte[] key, final byte[] value) {
         final long rootNodeOffset = readNodeOffsetOfRootNode();
         final Stack parents = new Stack<>();
         return insert(parents, rootNodeOffset, key, value);
     }
 
-    private byte[] getValue(final byte[] key) throws IOException {
+    private byte[] getValue(final byte[] key) {
         final long rootNodeOffset = readNodeOffsetOfRootNode();
 
         final NodeEntry entry = findNodeEntry(rootNodeOffset, key);
@@ -215,7 +213,7 @@
     }
 
     private byte[] insert(final Stack parents, final long nodeOffest, final byte[] key,
-            final byte[] value) throws IOException {
+            final byte[] value) {
 
         final PersistentMapDiskNode node = getNode(nodeOffest);
         final NodeEntry entry = node.getNodeEntryTo(key);
@@ -266,7 +264,7 @@
     }
 
     private PersistentMapDiskNode splitNode(final Stack parents,
-            final PersistentMapDiskNode node) throws IOException {
+            final PersistentMapDiskNode node) {
 
         // System.out.println("\n\npre split node: " + node + "\n");
 
@@ -321,7 +319,7 @@
         }
     }
 
-    private NodeEntry findNodeEntry(final long nodeOffest, final byte[] key) throws IOException {
+    private NodeEntry findNodeEntry(final long nodeOffest, final byte[] key) {
         final PersistentMapDiskNode node = getNode(nodeOffest);
 
         final var entry = node.getNodeEntryTo(key);
@@ -344,7 +342,7 @@
         return VariableByteEncoder.decodeFirstValue(entry.getValue());
     }
 
-    private PersistentMapDiskNode getNode(final long nodeOffset) throws IOException {
+    private PersistentMapDiskNode getNode(final long nodeOffset) {
 
         PersistentMapDiskNode node = nodeCache.get(nodeOffset);
         if (node == null) {
@@ -358,7 +356,7 @@
         return node;
     }
 
-    private void writeNode(final PersistentMapDiskNode node) throws IOException {
+    private void writeNode(final PersistentMapDiskNode node) {
         LOGGER.trace("writing node {}", node);
         final long nodeOffest = node.getNodeOffset();
         // final DiskBlock diskBlock = diskStore.getDiskBlock(nodeOffest, BLOCK_SIZE);
@@ -373,7 +371,7 @@
         // diskBlock.force(); // makes writing nodes slower by factor 800 (sic!)
     }
 
-    public synchronized void print() throws IOException {
+    public synchronized void print() {
 
         visitNodeEntriesPreOrder((node, parentNode, nodeEntry, depth) -> {
 
@@ -386,13 +384,13 @@
         });
     }
 
-    public synchronized void visitNodeEntriesPreOrder(final VisitorCallback visitor) throws IOException {
+    public synchronized void visitNodeEntriesPreOrder(final VisitorCallback visitor) {
         final long rootNodeOffset = readNodeOffsetOfRootNode();
         visitNodeEntriesPreOrderRecursively(rootNodeOffset, null, visitor, 0);
     }
 
     private void visitNodeEntriesPreOrderRecursively(final long nodeOffset, final PersistentMapDiskNode parentNode,
-            final VisitorCallback visitor, final int depth) throws IOException {
+            final VisitorCallback visitor, final int depth) {
         final PersistentMapDiskNode node = getNode(nodeOffset);
 
         for (final NodeEntry child : node.getEntries()) {
@@ -409,15 +407,14 @@
         FIND, ITERATE
     }
 
-    public synchronized void visitValues(final K keyPrefix, final Visitor visitor) throws IOException {
+    public synchronized void visitValues(final K keyPrefix, final Visitor visitor) {
 
         final byte[] encodedKeyPrefix = keyEncoder.encode(keyPrefix);
         final long rootNodeOffset = readNodeOffsetOfRootNode();
         iterateNodeEntryByPrefix(rootNodeOffset, encodedKeyPrefix, visitor);
     }
 
-    private void iterateNodeEntryByPrefix(final long nodeOffest, final byte[] keyPrefix, final Visitor visitor)
-            throws IOException {
+    private void iterateNodeEntryByPrefix(final long nodeOffest, final byte[] keyPrefix, final Visitor visitor) {
         final PersistentMapDiskNode node = getNode(nodeOffest);
 
         // list of children that might contain a key with the keyPrefix
@@ -447,13 +444,13 @@
         }
     }
 
-    private long readNodeOffsetOfRootNode() throws IOException {
+    private long readNodeOffsetOfRootNode() {
         final DiskBlock diskBlock = diskStore.getDiskBlock(NODE_OFFSET_TO_ROOT_NODE, diskStore.minAllocationSize());
 
         return diskBlock.getByteBuffer().getLong(0);
     }
 
-    private void writeNodeOffsetOfRootNode(final long newNodeOffsetToRootNode) throws IOException {
+    private void writeNodeOffsetOfRootNode(final long newNodeOffsetToRootNode) {
         final DiskBlock diskBlock = diskStore.getDiskBlock(NODE_OFFSET_TO_ROOT_NODE, diskStore.minAllocationSize());
         diskBlock.getByteBuffer().putLong(0, newNodeOffsetToRootNode);
         diskBlock.force();
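
PersistentMap's public methods (putValue, getValue, visitValues, close) are now unchecked as well, so calling code reads like plain map access and only needs a catch block where a storage failure is actually recoverable. A sketch under that assumption (the map is assumed to be already constructed with suitable EncoderDecoder instances; readOrNull is an invented helper):

    // sketch only: 'map' is an existing PersistentMap instance
    Object readOrNull(final PersistentMap map, final Object key) {
        try {
            return map.getValue(key);        // previously declared throws IOException
        } catch (final DiskStorageException e) {
            return null;                     // storage failures now arrive unchecked
        }
    }
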
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
index bb6163f..c9121c4 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/PdbFile.java
@@ -1,12 +1,10 @@
 package org.lucares.pdb.datastore;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.lucares.collections.LongList;
-import org.lucares.pdb.api.RuntimeIOException;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.blockstorage.BSFile;
 import org.lucares.pdb.blockstorage.TimeSeriesFile;
@@ -26,13 +24,9 @@ public class PdbFile {
 
         @Override
         public Stream apply(final PdbFile pdbFile) {
-            try {
-                final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
-                final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
-                return bsFile.streamOfLongLists();
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
+            final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
+            final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
+            return bsFile.streamOfLongLists();
         }
     }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java b/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java
index cc9c4df..bb77891 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/ReadException.java
@@ -1,12 +1,10 @@
 package org.lucares.pdb.datastore;
 
-import java.io.IOException;
-
 public class ReadException extends RuntimeException {
 
     private static final long serialVersionUID = 1L;
 
-    public ReadException(final IOException e) {
+    public ReadException(final RuntimeException e) {
         super(e);
     }
 }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
index 87c699e..f159105 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredDiskStore.java
@@ -22,28 +22,20 @@ public class ClusteredDiskStore {
 
     public ClusteredDiskStore(final Path storageBasePath, final String filename) {
         creator = clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-                final boolean isNew = !Files.exists(file);
-                final DiskStorage diskStorage = new DiskStorage(file);
-                if (isNew) {
-                    diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
-                }
-                return diskStorage;
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
+            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+            final boolean isNew = !Files.exists(file);
+            final DiskStorage diskStorage = new DiskStorage(file);
+            if (isNew) {
+                diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
             }
+            return diskStorage;
         };
 
         supplier = clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-                if (Files.exists(file)) {
-                    return new DiskStorage(file);
-                }
-                return null;
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
+            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+            if (Files.exists(file)) {
+                return new DiskStorage(file);
             }
+            return null;
         };
     }
@@ -56,12 +48,8 @@
     }
 
     public long allocateBlock(final ClusterId clusterId, final int blockSize) {
-        try {
-            final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
-            return diskStorage.allocateBlock(blockSize);
-        } catch (final IOException e) {
-            throw new RuntimeIOException(e);
-        }
+        final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
+        return diskStorage.allocateBlock(blockSize);
     }
 
     public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
@@ -79,7 +67,7 @@
         for (final DiskStorage diskStorage : diskStorages.values()) {
             try {
                 diskStorage.close();
-            } catch (final IOException e) {
+            } catch (final RuntimeException e) {
                 throwables.add(e);
             }
         }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
index 1e2dffb..0b9434c 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/ClusteredPersistentMap.java
@@ -1,6 +1,5 @@
 package org.lucares.pdb.datastore.internal;
 
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -8,8 +7,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
-import org.lucares.pdb.api.RuntimeIOException;
-import org.lucares.pdb.datastore.ReadRuntimeException;
 import org.lucares.pdb.map.PersistentMap;
 import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
 import org.lucares.pdb.map.Visitor;
@@ -37,23 +34,15 @@ public class ClusteredPersistentMap implements AutoCloseable {
         this.valueEncoder = valueEncoder;
 
         creator = clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-                return new PersistentMap<>(file, keyEncoder, valueEncoder);
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
-            }
+            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+            return new PersistentMap<>(file, keyEncoder, valueEncoder);
         };
 
         supplier = clusterId -> {
-            try {
-                final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
-                if (Files.exists(file)) {
-                    return new PersistentMap<>(file, keyEncoder, valueEncoder);
-                }
-                return null;
-            } catch (final IOException e) {
-                throw new RuntimeIOException(e);
+            final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
+            if (Files.exists(file)) {
+                return new PersistentMap<>(file, keyEncoder, valueEncoder);
             }
+            return null;
         };
     }
@@ -66,51 +55,49 @@
     }
 
     public V getValue(final ClusterId clusterId, final K key) {
-        try {
-
-            final PersistentMap map = getExistingPersistentMap(clusterId);
-            final P persistedValue = map != null ? map.getValue(key) : null;
-            return valueEncoder.decodeValue(clusterId, persistedValue);
-        } catch (final IOException e) {
-            throw new ReadRuntimeException(e);
-        }
+        final PersistentMap map = getExistingPersistentMap(clusterId);
+        final P persistedValue = map != null ? map.getValue(key) : null;
+        return valueEncoder.decodeValue(clusterId, persistedValue);
     }
 
     public List getValues(final ClusterIdSource clusterIdSource, final K key) {
-        try {
-            final List result = new ArrayList<>();
-            final List clusterIds = clusterIdSource.toClusterIds();
+        final List result = new ArrayList<>();
+        final List clusterIds = clusterIdSource.toClusterIds();
 
-            for (final ClusterId clusterId : clusterIds) {
-                final PersistentMap map = getPersistentMapCreateIfNotExists(clusterId);
-                if (map != null) {
-                    final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
-                    if (value != null) {
-                        result.add(value);
-                    }
+        for (final ClusterId clusterId : clusterIds) {
+            final PersistentMap map = getPersistentMapCreateIfNotExists(clusterId);
+            if (map != null) {
+                final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
+                if (value != null) {
+                    result.add(value);
                 }
             }
-
-            return result;
-        } catch (final IOException e) {
-            throw new ReadRuntimeException(e);
         }
+
+        return result;
     }
 
     public V putValue(final ClusterId clusterId, final K key, final V value) {
-        try {
-
-            final PersistentMap map = getPersistentMapCreateIfNotExists(clusterId);
-            final P persistedValue = valueEncoder.encodeValue(value);
-            final P previousPersistedValue = map.putValue(key, persistedValue);
-            return valueEncoder.decodeValue(clusterId, previousPersistedValue);
-        } catch (final IOException e) {
-            throw new ReadRuntimeException(e);
-        }
+        final PersistentMap map = getPersistentMapCreateIfNotExists(clusterId);
+        final P persistedValue = valueEncoder.encodeValue(value);
+        final P previousPersistedValue = map.putValue(key, persistedValue);
+        return valueEncoder.decodeValue(clusterId, previousPersistedValue);
     }
 
     public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor visitor) {
-        try {
+        final PersistentMap map = getExistingPersistentMap(clusterId);
+        if (map != null) {
+            map.visitValues(keyPrefix, (k, p) -> {
+                final V value = valueEncoder.decodeValue(clusterId, p);
+                visitor.visit(k, value);
+            });
+        }
+    }
+
+    public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor visitor) {
+        final List clusterIds = clusterIdSource.toClusterIds();
+
+        for (final ClusterId clusterId : clusterIds) {
             final PersistentMap map = getExistingPersistentMap(clusterId);
             if (map != null) {
                 map.visitValues(keyPrefix, (k, p) -> {
@@ -118,26 +105,6 @@
                     visitor.visit(k, value);
                 });
             }
-        } catch (final IOException e) {
-            throw new ReadRuntimeException(e);
-        }
-    }
-
-    public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor visitor) {
-        try {
-            final List clusterIds = clusterIdSource.toClusterIds();
-
-            for (final ClusterId clusterId : clusterIds) {
-                final PersistentMap map = getExistingPersistentMap(clusterId);
-                if (map != null) {
-                    map.visitValues(keyPrefix, (k, p) -> {
-                        final V value = valueEncoder.decodeValue(clusterId, p);
-                        visitor.visit(k, value);
-                    });
-                }
-            }
-        } catch (final IOException e) {
-            throw new ReadRuntimeException(e);
         }
     }
 
@@ -148,7 +115,7 @@
         for (final PersistentMap map : maps.values()) {
             try {
                 map.close();
-            } catch (final IOException e) {
+            } catch (final RuntimeException e) {
                 throwables.add(e);
             }
         }
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index bb515b7..7a9c6b0 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -371,7 +371,7 @@ public class DataStore implements AutoCloseable {
                 final Doc doc = docsForTags.get();
                 final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
                 writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
-            } catch (final IOException e) {
+            } catch (final RuntimeException e) {
                 throw new ReadException(e);
             }
         } else {
@@ -389,12 +389,12 @@
 
             METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
                     (System.nanoTime() - start) / 1_000_000.0, tags);
             return result;
-        } catch (final IOException e) {
+        } catch (final RuntimeException e) {
             throw new WriteException(e);
         }
     }
 
-    private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) throws IOException {
+    private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) {
 
         final long rootBlockNumber = createNewFile(clusterId, tags);
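
At the data-store boundary the unchecked storage exceptions are translated once more into the module's own ReadException and WriteException; the hunks above do this by catching RuntimeException, which includes DiskStorageException. The typical cause chain is then ReadException (or WriteException) -> DiskStorageException -> original IOException. A sketch of the same translation (readBlocks is an invented stand-in for any storage-touching call):

    // sketch only: mirrors the catch blocks changed in DataStore above
    try {
        return readBlocks();
    } catch (final RuntimeException e) {
        throw new ReadException(e);   // ReadException now accepts a RuntimeException cause
    }
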
diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PdbWriter.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PdbWriter.java
index 955e2d4..9ae5f47 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/PdbWriter.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/PdbWriter.java
@@ -1,7 +1,6 @@
 package org.lucares.pdb.datastore.internal;
 
 import java.io.Flushable;
-import java.io.IOException;
 import java.util.Optional;
 
 import org.lucares.pdb.api.Entry;
@@ -25,7 +24,7 @@
 
     private final TimeSeriesFile bsFile;
 
-    public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException {
+    public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) {
         this.pdbFile = pdbFile;
 
         bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
@@ -47,7 +46,7 @@
             bsFile.appendTimeValue(epochMilli, value);
 
             lastEpochMilli = epochMilli;
-        } catch (final IOException e) {
+        } catch (final RuntimeException e) {
             throw new WriteException(e);
         }
     }
@@ -64,8 +63,7 @@
         bsFile.flush();
     }
 
-    public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries)
-            throws IOException {
+    public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries) {
         try (PdbWriter writer = new PdbWriter(pdbFile, diskStorage)) {
             for (final Entry entry : entries) {
                 writer.write(entry.getEpochMilli(), entry.getValue());