move creation of PdbWriter to the DataStore
This commit is contained in:
@@ -3,6 +3,7 @@ package org.lucares.performance.db;
|
||||
import java.util.List;
|
||||
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
|
||||
class Group {
|
||||
private final Tags tags;
|
||||
@@ -26,9 +27,9 @@ class Group {
|
||||
public void addFile(final PdbFile file) {
|
||||
files.add(file);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return tags + ": " + files.size()+" files";
|
||||
return tags + ": " + files.size() + " files";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,12 +8,13 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class Grouping {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Grouping.class);
|
||||
|
||||
|
||||
public static final List<String> NO_GROUPING = Collections.emptyList();
|
||||
|
||||
private final List<Group> groups = new ArrayList<>();
|
||||
@@ -64,7 +65,7 @@ public class Grouping {
|
||||
public Collection<Group> getGroups() {
|
||||
return groups;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.valueOf(groups);
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
public class InvalidValueException extends IllegalArgumentException {
|
||||
|
||||
private static final long serialVersionUID = -8707541995666127297L;
|
||||
|
||||
public InvalidValueException(final String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -79,10 +80,10 @@ public class PdbExport {
|
||||
long count = 0;
|
||||
long lastEpochMilli = 0;
|
||||
long begin = System.currentTimeMillis();
|
||||
|
||||
|
||||
for (final PdbFile pdbFile : pdbFiles) {
|
||||
|
||||
if (writer == null || Files.size(exportFile) > GB) {
|
||||
if (writer == null || Files.size(exportFile) > 4 * GB) {
|
||||
if (writer != null) {
|
||||
writer.flush();
|
||||
writer.close();
|
||||
@@ -91,7 +92,7 @@ public class PdbExport {
|
||||
exportFiles.add(exportFile);
|
||||
writer = createWriter(exportFile);
|
||||
LOGGER.info("new export file: {}", exportFile);
|
||||
|
||||
|
||||
lastEpochMilli = 0;
|
||||
}
|
||||
|
||||
@@ -100,7 +101,6 @@ public class PdbExport {
|
||||
final Tags tags = pdbFile.getTags();
|
||||
final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);
|
||||
|
||||
|
||||
final Iterator<LongList> it = timeValueStream.iterator();
|
||||
while (it.hasNext()) {
|
||||
final LongList entry = it.next();
|
||||
@@ -123,10 +123,11 @@ public class PdbExport {
|
||||
count++;
|
||||
final long chunk = 10_000_000;
|
||||
if (count % chunk == 0) {
|
||||
long end = System.currentTimeMillis();
|
||||
long duration = end-begin;
|
||||
long entriesPerSecond = (long)((double)chunk / (duration / 1000.0));
|
||||
LOGGER.info("progress: {} - {} entries/s + duration {}" , String.format("%,d",count), String.format("%,d",entriesPerSecond), duration);
|
||||
final long end = System.currentTimeMillis();
|
||||
final long duration = end - begin;
|
||||
final long entriesPerSecond = (long) (chunk / (duration / 1000.0));
|
||||
LOGGER.info("progress: {} - {} entries/s + duration {}", String.format("%,d", count),
|
||||
String.format("%,d", entriesPerSecond), duration);
|
||||
begin = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdb.api.RuntimeIOException;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
|
||||
class PdbFile {
|
||||
|
||||
private static class PdbFileToLongStream implements Function<PdbFile, Stream<LongList>> {
|
||||
|
||||
private final DiskStorage diskStorage;
|
||||
|
||||
public PdbFileToLongStream(final DiskStorage diskStorage) {
|
||||
this.diskStorage = diskStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<LongList> apply(final PdbFile pdbFile) {
|
||||
try {
|
||||
final BSFile bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
return bsFile.streamOfTimeValueLongLists();
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeIOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final Tags tags;
|
||||
|
||||
/**
|
||||
* The rootBlockNumber to be used by {@link BSFile}
|
||||
*/
|
||||
private final long rootBlockNumber;
|
||||
|
||||
public PdbFile(final long rootBlockNumber, final Tags tags) {
|
||||
this.rootBlockNumber = rootBlockNumber;
|
||||
this.tags = tags;
|
||||
}
|
||||
|
||||
public Tags getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
public long getRootBlockNumber() {
|
||||
return rootBlockNumber;
|
||||
}
|
||||
|
||||
public static Stream<LongList> toStream(final List<PdbFile> pdbFiles, final DiskStorage diskStorage) {
|
||||
|
||||
final Stream<LongList> longStream = pdbFiles.stream().flatMap(new PdbFileToLongStream(diskStorage));
|
||||
|
||||
return longStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PdbFile [tags=" + tags + ", rootBlockNumber=" + rootBlockNumber + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (int) (rootBlockNumber ^ (rootBlockNumber >>> 32));
|
||||
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final PdbFile other = (PdbFile) obj;
|
||||
if (rootBlockNumber != other.rootBlockNumber)
|
||||
return false;
|
||||
if (tags == null) {
|
||||
if (other.tags != null)
|
||||
return false;
|
||||
} else if (!tags.equals(other.tags))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -1,83 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.blockstorage.BSFile;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
class PdbWriter implements AutoCloseable, Flushable {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PdbWriter.class);
|
||||
|
||||
private final PdbFile pdbFile;
|
||||
private long lastEpochMilli;
|
||||
|
||||
private final BSFile bsFile;
|
||||
|
||||
PdbWriter(final PdbFile pdbFile, final DiskStorage diskStorage) throws IOException {
|
||||
this.pdbFile = pdbFile;
|
||||
|
||||
bsFile = BSFile.existingFile(pdbFile.getRootBlockNumber(), diskStorage);
|
||||
final Optional<Long> optionalLastValue = bsFile.getLastValue();
|
||||
|
||||
lastEpochMilli = optionalLastValue.orElse(0L);
|
||||
}
|
||||
|
||||
public PdbFile getPdbFile() {
|
||||
return pdbFile;
|
||||
}
|
||||
|
||||
public long getDateOffsetAsEpochMilli() {
|
||||
return lastEpochMilli;
|
||||
}
|
||||
|
||||
public void write(final Entry entry) throws WriteException, InvalidValueException {
|
||||
final long epochMilli = entry.getEpochMilli();
|
||||
final long value = entry.getValue();
|
||||
write(epochMilli, value);
|
||||
}
|
||||
|
||||
private void write(final long epochMilli, final long value) throws WriteException, InvalidValueException {
|
||||
try {
|
||||
bsFile.appendTimeValue(epochMilli, value);
|
||||
|
||||
lastEpochMilli = epochMilli;
|
||||
} catch (final IOException e) {
|
||||
throw new WriteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
LOGGER.debug("close PdbWriter {}", pdbFile);
|
||||
bsFile.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
bsFile.flush();
|
||||
}
|
||||
|
||||
public static void writeEntry(final PdbFile pdbFile, final DiskStorage diskStorage, final Entry... entries)
|
||||
throws IOException {
|
||||
try (PdbWriter writer = new PdbWriter(pdbFile, diskStorage)) {
|
||||
for (final Entry entry : entries) {
|
||||
writer.write(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PdbWriter [pdbFile=" + pdbFile + ", lastEpochMilli=" + lastEpochMilli + "]";
|
||||
}
|
||||
}
|
||||
@@ -17,7 +17,11 @@ import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.api.GroupResult;
|
||||
import org.lucares.pdb.api.Result;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.InvalidValueException;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
import org.lucares.pdb.datastore.PdbWriter;
|
||||
import org.lucares.pdb.datastore.Proposal;
|
||||
import org.lucares.pdb.datastore.WriteException;
|
||||
import org.lucares.pdb.datastore.internal.DataStore;
|
||||
import org.lucares.pdb.datastore.lang.SyntaxException;
|
||||
import org.lucares.pdb.diskstorage.DiskStorage;
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ReadException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ReadException(final IOException e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
public class ReadRuntimeException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ReadRuntimeException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public ReadRuntimeException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ReadRuntimeException(final Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -8,6 +7,10 @@ import java.util.function.Consumer;
|
||||
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.Doc;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
import org.lucares.pdb.datastore.PdbWriter;
|
||||
import org.lucares.pdb.datastore.ReadException;
|
||||
import org.lucares.pdb.datastore.WriteException;
|
||||
import org.lucares.pdb.datastore.internal.DataStore;
|
||||
import org.lucares.utils.cache.HotEntryCache;
|
||||
import org.lucares.utils.cache.HotEntryCache.Event;
|
||||
@@ -19,8 +22,6 @@ import org.slf4j.LoggerFactory;
|
||||
public class TagsToFile implements AutoCloseable {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TagsToFile.class);
|
||||
private final static Logger METRICS_LOGGER_NEW_WRITER = LoggerFactory
|
||||
.getLogger("org.lucares.metrics.ingestion.tagsToFile.newPdbWriter");
|
||||
|
||||
private static final class CacheKey implements Comparable<CacheKey> {
|
||||
private final Tags tags;
|
||||
@@ -77,7 +78,6 @@ public class TagsToFile implements AutoCloseable {
|
||||
|
||||
writerCache = new HotEntryCache<>(Duration.ofSeconds(10), "writerCache");
|
||||
writerCache.addListener(new RemovalListener(), EventType.EVICTED, EventType.REMOVED);
|
||||
|
||||
}
|
||||
|
||||
public List<PdbFile> getFilesForQuery(final String query) {
|
||||
@@ -115,18 +115,7 @@ public class TagsToFile implements AutoCloseable {
|
||||
if (writer == null) {
|
||||
|
||||
LOGGER.trace("getByTags({})", tags);
|
||||
final List<Doc> docsForTags = dataStore.getByTags(tags);
|
||||
if (docsForTags.size() > 0) {
|
||||
try {
|
||||
final Doc doc = docsForTags.get(0);
|
||||
final PdbFile pdbFile = new PdbFile(doc.getRootBlockNumber(), tags);
|
||||
writer = new PdbWriter(pdbFile, dataStore.getDiskStorage());
|
||||
} catch (final IOException e) {
|
||||
throw new ReadException(e);
|
||||
}
|
||||
} else {
|
||||
writer = newPdbWriter(tags);
|
||||
}
|
||||
writer = dataStore.getWriter(tags);
|
||||
writerCache.put(cacheKey, writer);
|
||||
}
|
||||
}
|
||||
@@ -134,29 +123,6 @@ public class TagsToFile implements AutoCloseable {
|
||||
return writer;
|
||||
}
|
||||
|
||||
private PdbWriter newPdbWriter(final Tags tags) {
|
||||
final long start = System.nanoTime();
|
||||
try {
|
||||
final PdbFile pdbFile = createNewPdbFile(tags);
|
||||
final PdbWriter result = new PdbWriter(pdbFile, dataStore.getDiskStorage());
|
||||
|
||||
METRICS_LOGGER_NEW_WRITER.debug("newPdbWriter took {}ms tags: {}",
|
||||
(System.nanoTime() - start) / 1_000_000.0, tags);
|
||||
return result;
|
||||
} catch (final IOException e) {
|
||||
throw new WriteException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private PdbFile createNewPdbFile(final Tags tags) throws IOException {
|
||||
|
||||
final long rootBlockNumber = dataStore.createNewFile(tags);
|
||||
|
||||
final PdbFile result = new PdbFile(rootBlockNumber, tags);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void forEachWriter(final Consumer<PdbWriter> consumer) {
|
||||
writerCache.forEach(writer -> {
|
||||
try {
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
public class WriteException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public WriteException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public WriteException(final Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import java.time.ZoneOffset;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.pdb.datastore.PdbWriter;
|
||||
import org.lucares.pdb.datastore.internal.DataStore;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
|
||||
Reference in New Issue
Block a user