handle IOExceptions earlier
This commit is contained in:
@@ -1,12 +1,10 @@
|
||||
package org.lucares.pdb.datastore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.api.RuntimeIOException;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.blockstorage.TimeSeriesFile;
|
||||
@@ -26,13 +24,9 @@ public class PdbFile {
|
||||
|
||||
@Override
|
||||
public Stream<LongList> apply(final PdbFile pdbFile) {
|
||||
try {
|
||||
final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
|
||||
final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
return bsFile.streamOfLongLists();
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
final DiskStorage diskStorage = clusteredDiskStorage.getExisting(pdbFile.getClusterId());
|
||||
final TimeSeriesFile bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
return bsFile.streamOfLongLists();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
package org.lucares.pdb.datastore;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ReadException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ReadException(final IOException e) {
|
||||
public ReadException(final RuntimeException e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,28 +22,20 @@ public class ClusteredDiskStore {
|
||||
public ClusteredDiskStore(final Path storageBasePath, final String filename) {
|
||||
|
||||
creator = clusterId -> {
|
||||
try {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
final boolean isNew = !Files.exists(file);
|
||||
final DiskStorage diskStorage = new DiskStorage(file);
|
||||
if (isNew) {
|
||||
diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
|
||||
}
|
||||
return diskStorage;
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
final boolean isNew = !Files.exists(file);
|
||||
final DiskStorage diskStorage = new DiskStorage(file);
|
||||
if (isNew) {
|
||||
diskStorage.ensureAlignmentForNewBlocks(BSFile.BLOCK_SIZE);
|
||||
}
|
||||
return diskStorage;
|
||||
};
|
||||
supplier = clusterId -> {
|
||||
try {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new DiskStorage(file);
|
||||
}
|
||||
return null;
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new DiskStorage(file);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -56,12 +48,8 @@ public class ClusteredDiskStore {
|
||||
}
|
||||
|
||||
public long allocateBlock(final ClusterId clusterId, final int blockSize) {
|
||||
try {
|
||||
final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
|
||||
return diskStorage.allocateBlock(blockSize);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
final DiskStorage diskStorage = getCreateIfNotExists(clusterId);
|
||||
return diskStorage.allocateBlock(blockSize);
|
||||
}
|
||||
|
||||
public LongStreamFile streamExistingFile(final Long diskStoreOffsetForDocIdsOfTag, final ClusterId clusterId) {
|
||||
@@ -79,7 +67,7 @@ public class ClusteredDiskStore {
|
||||
for (final DiskStorage diskStorage : diskStorages.values()) {
|
||||
try {
|
||||
diskStorage.close();
|
||||
} catch (final IOException e) {
|
||||
} catch (final RuntimeException e) {
|
||||
throwables.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
@@ -8,8 +7,6 @@ import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.lucares.pdb.api.RuntimeIOException;
|
||||
import org.lucares.pdb.datastore.ReadRuntimeException;
|
||||
import org.lucares.pdb.map.PersistentMap;
|
||||
import org.lucares.pdb.map.PersistentMap.EncoderDecoder;
|
||||
import org.lucares.pdb.map.Visitor;
|
||||
@@ -37,23 +34,15 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
||||
|
||||
this.valueEncoder = valueEncoder;
|
||||
creator = clusterId -> {
|
||||
try {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
};
|
||||
supplier = clusterId -> {
|
||||
try {
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
}
|
||||
return null;
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
final Path file = storageBasePath.resolve(clusterId.getClusterId()).resolve(filename);
|
||||
if (Files.exists(file)) {
|
||||
return new PersistentMap<>(file, keyEncoder, valueEncoder);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -66,51 +55,49 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
||||
}
|
||||
|
||||
public V getValue(final ClusterId clusterId, final K key) {
|
||||
try {
|
||||
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
final P persistedValue = map != null ? map.getValue(key) : null;
|
||||
return valueEncoder.decodeValue(clusterId, persistedValue);
|
||||
} catch (final IOException e) {
|
||||
throw new ReadRuntimeException(e);
|
||||
}
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
final P persistedValue = map != null ? map.getValue(key) : null;
|
||||
return valueEncoder.decodeValue(clusterId, persistedValue);
|
||||
}
|
||||
|
||||
public List<V> getValues(final ClusterIdSource clusterIdSource, final K key) {
|
||||
try {
|
||||
final List<V> result = new ArrayList<>();
|
||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
||||
final List<V> result = new ArrayList<>();
|
||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
if (map != null) {
|
||||
final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
|
||||
if (value != null) {
|
||||
result.add(value);
|
||||
}
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
if (map != null) {
|
||||
final V value = valueEncoder.decodeValue(clusterId, map.getValue(key));
|
||||
if (value != null) {
|
||||
result.add(value);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
} catch (final IOException e) {
|
||||
throw new ReadRuntimeException(e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public V putValue(final ClusterId clusterId, final K key, final V value) {
|
||||
try {
|
||||
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
final P persistedValue = valueEncoder.encodeValue(value);
|
||||
final P previousPersistedValue = map.putValue(key, persistedValue);
|
||||
return valueEncoder.decodeValue(clusterId, previousPersistedValue);
|
||||
} catch (final IOException e) {
|
||||
throw new ReadRuntimeException(e);
|
||||
}
|
||||
final PersistentMap<K, P> map = getPersistentMapCreateIfNotExists(clusterId);
|
||||
final P persistedValue = valueEncoder.encodeValue(value);
|
||||
final P previousPersistedValue = map.putValue(key, persistedValue);
|
||||
return valueEncoder.decodeValue(clusterId, previousPersistedValue);
|
||||
}
|
||||
|
||||
public void visitValues(final ClusterId clusterId, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
try {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(clusterId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
@@ -118,26 +105,6 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new ReadRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void visitValues(final ClusterIdSource clusterIdSource, final K keyPrefix, final Visitor<K, V> visitor) {
|
||||
try {
|
||||
final List<ClusterId> clusterIds = clusterIdSource.toClusterIds();
|
||||
|
||||
for (final ClusterId clusterId : clusterIds) {
|
||||
final PersistentMap<K, P> map = getExistingPersistentMap(clusterId);
|
||||
if (map != null) {
|
||||
map.visitValues(keyPrefix, (k, p) -> {
|
||||
final V value = valueEncoder.decodeValue(clusterId, p);
|
||||
visitor.visit(k, value);
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new ReadRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +115,7 @@ public class ClusteredPersistentMap<K, V, P> implements AutoCloseable {
|
||||
for (final PersistentMap<K, P> map : maps.values()) {
|
||||
try {
|
||||
map.close();
|
||||
} catch (final IOException e) {
|
||||
} catch (final RuntimeException e) {
|
||||
throwables.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,7 +371,7 @@ public class DataStore implements AutoCloseable {
|
||||
final Doc doc = docsForTags.get();
|
||||
final PdbFile pdbFile = new PdbFile(clusterId, doc.getRootBlockNumber(), tags);
|
||||
writer = new PdbWriter(pdbFile, diskStorage.getExisting(clusterId));
|
||||
} catch (final IOException e) {
|
||||
} catch (final RuntimeException e) {
|
||||
throw new ReadException(e);
|
||||
}
|
||||
} else {
|
||||
@@ -389,12 +389,12 @@ public class DataStore implements AutoCloseable {
|
||||
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
|
||||
(System.nanoTime() - start) / 1_000_000.0, tags);
|
||||
return result;
|
||||
} catch (final IOException e) {
|
||||
} catch (final RuntimeException e) {
|
||||
throw new WriteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) throws IOException {
|
||||
private PdbFile createNewPdbFile(final ClusterId clusterId, final Tags tags) {
|
||||
|
||||
final long rootBlockNumber = createNewFile(clusterId, tags);
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.lucares.pdb.datastore.internal;
|
||||
|
||||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
@@ -25,7 +24,7 @@ class PdbWriter implements AutoCloseable, Flushable {
|
||||
|
||||
private final TimeSeriesFile bsFile;
|
||||
|
||||
public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException {
|
||||
public PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) {
|
||||
this.pdbFile = pdbFile;
|
||||
|
||||
bsFile = TimeSeriesFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
@@ -47,7 +46,7 @@ class PdbWriter implements AutoCloseable, Flushable {
|
||||
bsFile.appendTimeValue(epochMilli, value);
|
||||
|
||||
lastEpochMilli = epochMilli;
|
||||
} catch (final IOException e) {
|
||||
} catch (final RuntimeException e) {
|
||||
throw new WriteException(e);
|
||||
}
|
||||
}
|
||||
@@ -64,8 +63,7 @@ class PdbWriter implements AutoCloseable, Flushable {
|
||||
bsFile.flush();
|
||||
}
|
||||
|
||||
public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries)
|
||||
throws IOException {
|
||||
public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries) {
|
||||
try (PdbWriter writer = new PdbWriter(pdbFile, diskStorage)) {
|
||||
for (final Entry entry : entries) {
|
||||
writer.write(entry.getEpochMilli(), entry.getValue());
|
||||
|
||||
Reference in New Issue
Block a user