From bcba1177422c07ea3dc229edb6067531a0a9c1b9 Mon Sep 17 00:00:00 2001
From: Andreas Huber
Date: Sat, 18 Sep 2021 19:41:40 +0200
Subject: [PATCH] do not use static string compressor in upload handlers

---
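Notes:

All upload and ingestion handlers now receive the DataStore's StringCompressor
through their constructors instead of going through the static
Tags.STRING_COMPRESSOR. A minimal sketch of the resulting wiring, assuming the
Spring bean added in MySpringConfiguration below; the queue element type
ArrayBlockingQueue<Entries>, the variable names, the CSV settings and the input
stream are illustrative assumptions, not part of this patch:

    // sketch only: expose the DataStore's compressor as a bean (as this patch does)
    @Bean
    StringCompressor stringCompressor(final PerformanceDb performanceDb) {
        return performanceDb.getRealDataStore().getStringCompressor();
    }

    // sketch only: construct a transformer with the injected compressor
    // (element type <Entries> and the settings values are assumptions)
    final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
    final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
            new ColumnDefinitions());
    final NoCopyCsvToEntryTransformer transformer = new NoCopyCsvToEntryTransformer(queue, settings,
            stringCompressor);
    transformer.readCSV(csvInput); // csvInput: any InputStream of ISO-8601 timestamped CSV data
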
 .../lucares/pdb/datastore/internal/DataStore.java |  4 ++++
 .../pdbui/CsvToEntryTransformerFactory.java       |  5 +++--
 .../java/org/lucares/pdbui/CsvUploadHandler.java  |  9 +++++++--
 .../org/lucares/pdbui/FileDropZipHandler.java     |  8 ++++++--
 .../java/org/lucares/pdbui/IngestionHandler.java  |  8 ++++++--
 .../org/lucares/pdbui/MySpringConfiguration.java  |  6 ++++++
 .../pdbui/NoCopyCsvToEntryTransformer.java        | 15 ++++++++++-----
 .../main/java/org/lucares/pdbui/TcpIngestor.java  |  9 +++++++--
 .../org/lucares/pdbui/FileDropHandlerTest.java    |  7 ++++++-
 .../pdbui/NoCopyCsvToEntryTransformerTest.java    |  9 +++++++--
 .../org/lucares/performance/db/PerformanceDb.java |  4 ++++
 11 files changed, 66 insertions(+), 18 deletions(-)

diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index 4745a6f..ff85354 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -163,6 +163,10 @@ public class DataStore implements AutoCloseable {
         }
     }
 
+    public StringCompressor getStringCompressor() {
+        return stringCompressor;
+    }
+
     // visible for test
     QueryCompletionIndex getQueryCompletionIndex() {
         return queryCompletionIndex;
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
index 17636aa..937e443 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformerFactory.java
@@ -3,16 +3,17 @@ package org.lucares.pdbui;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 
 public class CsvToEntryTransformerFactory {
 
     public static CsvToEntryTransformer createCsvToEntryTransformer(final ArrayBlockingQueue queue,
-            final CsvReaderSettings settings) {
+            final CsvReaderSettings settings, final StringCompressor stringCompressor) {
 
         if (settings.getQuoteCharacter() == null
                 && Objects.equals(settings.getDateTimePattern(), CsvReaderSettings.ISO_8601)) {
-            return new NoCopyCsvToEntryTransformer(queue, settings);
+            return new NoCopyCsvToEntryTransformer(queue, settings, stringCompressor);
         } else {
             return new CsvReaderCsvToEntryTransformer(queue, settings);
         }
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
index bbacd4f..68784de 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
@@ -12,6 +12,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
@@ -30,8 +31,11 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
 
     private final PerformanceDb performanceDb;
 
-    public CsvUploadHandler(final PerformanceDb performanceDb) {
+    private final StringCompressor stringCompressor;
+
+    public CsvUploadHandler(final PerformanceDb performanceDb, final StringCompressor stringCompressor) {
         this.performanceDb = performanceDb;
+        this.stringCompressor = stringCompressor;
     }
 
     public void ingest(final List files, final CsvReaderSettings settings)
@@ -48,7 +52,8 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
             // improved the
             // ingestion performance fom 1.1m to 1.55m values per second on average
             synchronized (this) {
-                final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+                final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue,
+                        settings, stringCompressor);
                 try (InputStream in = file.getInputStream()) {
                     csvToEntryTransformer.readCSV(in);
                 } catch (final Exception e) {
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
index 768da62..b72bb48 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java
@@ -10,6 +10,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.RuntimeTimeoutException;
 import org.lucares.performance.db.PerformanceDb;
@@ -21,12 +22,15 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
 
     private final PerformanceDb performanceDb;
     private final FileDropConfigProvider configProvider;
+    private final StringCompressor stringCompressor;
 
     @Autowired
-    public FileDropZipHandler(final PerformanceDb performanceDb, final FileDropConfigProvider configProvider) {
+    public FileDropZipHandler(final PerformanceDb performanceDb, final FileDropConfigProvider configProvider,
+            final StringCompressor stringCompressor) {
         super();
         this.performanceDb = performanceDb;
         this.configProvider = configProvider;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -54,7 +58,7 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
            final CsvReaderSettings csvReaderSettings = csvSettings.get();
 
            final CsvToEntryTransformer csvToEntryTransformer = CsvToEntryTransformerFactory
-                   .createCsvToEntryTransformer(queue, csvReaderSettings);
+                   .createCsvToEntryTransformer(queue, csvReaderSettings, stringCompressor);
 
            try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry), 1024 * 1024)) {
                csvToEntryTransformer.readCSV(inputStream);
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
index 99a960d..a7a9e58 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -15,6 +15,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.GZIPInputStream;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
@@ -25,10 +26,13 @@ public final class IngestionHandler implements Callable {
 
     final Socket clientSocket;
     private final ArrayBlockingQueue queue;
+    private final StringCompressor stringCompressor;
 
-    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue queue) {
+    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue queue,
+            final StringCompressor stringCompressor) {
         this.clientSocket = clientSocket;
         this.queue = queue;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -65,7 +69,7 @@ public final class IngestionHandler implements Callable {
            } else {
                in.reset();
                final NoCopyCsvToEntryTransformer csvTransformer = new NoCopyCsvToEntryTransformer(queue,
-                       CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()));
+                       CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()), stringCompressor);
                csvTransformer.readCSV(in);
            }
        }
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/MySpringConfiguration.java b/pdb-ui/src/main/java/org/lucares/pdbui/MySpringConfiguration.java
index 263d4d2..423f38d 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/MySpringConfiguration.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/MySpringConfiguration.java
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.performance.db.PerformanceDb;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,4 +29,9 @@ public class MySpringConfiguration {
 
         return new PerformanceDb(dataDirectory);
     }
+
+    @Bean
+    StringCompressor stringCompressor(final PerformanceDb performanceDb) {
+        return performanceDb.getRealDataStore().getStringCompressor();
+    }
 }
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
index 43d28d6..9a74a83 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/NoCopyCsvToEntryTransformer.java
@@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.lucares.collections.IntList;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
 import org.lucares.pdb.datastore.Entries;
@@ -31,9 +32,13 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
     private int[] compressedHeaders;
     private List> postProcessersForColumns;
 
-    public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue queue, final CsvReaderSettings settings) {
+    private final StringCompressor stringCompressor;
+
+    public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue queue, final CsvReaderSettings settings,
+            final StringCompressor stringCompressor) {
         this.queue = queue;
         this.settings = settings;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -54,8 +59,8 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
         int lineCounter = 0;
         final byte[] buffer = new byte[4096 * 16];
 
-        final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
-        final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
+        final int keyTimestamp = stringCompressor.put(settings.getTimeColumn());
+        final int keyDuration = stringCompressor.put(settings.getValueColumn());
 
         final FastISODateParser dateParser = new FastISODateParser();
         Tags additionalTags = initAdditionalTags(settings);
@@ -143,7 +148,7 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
 
            final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
            final String renamedColumn = renameTo != null ? renameTo : columnName;
-           columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
+           columns[i] = stringCompressor.put(renamedColumn);
            final EnumSet postProcessors = settings.getColumnDefinitions()
                    .getPostProcessors(columnName);
            final Function postProcessFunction = PostProcessors.toFunction(postProcessors);
@@ -185,7 +190,7 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
                    duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
                } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
                    final Function postProcess = postProcessersForColumns.get(i);
-                   final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
+                   final int value = stringCompressor.put(line, lastSeparatorPosition + 1, separatorPosition,
                            postProcess);
                    tagsBuilder.add(key, value);
 
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
index 2ded87b..ffbece1 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
@@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
@@ -40,15 +41,19 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
 
     private volatile int port = PORT;
 
+    private final StringCompressor stringCompressor;
+
     public TcpIngestor(final Path dataDirectory) throws IOException {
         LOGGER.info("opening performance db: " + dataDirectory);
         db = new PerformanceDb(dataDirectory);
+        stringCompressor = db.getRealDataStore().getStringCompressor();
         LOGGER.debug("performance db open");
     }
 
     @Autowired
-    public TcpIngestor(final PerformanceDb db) {
+    public TcpIngestor(final PerformanceDb db, final StringCompressor stringCompressor) {
         this.db = db;
+        this.stringCompressor = stringCompressor;
     }
 
     public void useRandomPort() {
@@ -94,7 +99,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
 
                final ArrayBlockingQueue queue = db.getQueue();
-               workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
+               workerThreadPool.submit(new IngestionHandler(clientSocket, queue, stringCompressor));
                LOGGER.debug("handler submitted");
            } catch (final SocketTimeoutException e) {
                // expected every 100ms
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java
index b08890a..2dd7c89 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java
@@ -18,6 +18,7 @@ import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
 import org.lucares.pdb.api.Result;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
 import org.lucares.pdbui.domain.FileDropConfig;
@@ -101,7 +102,11 @@ public class FileDropHandlerTest {
         final FileDropConfigProvider fileDropConfigProvider = new FileDropConfigProvider(
                 fileDropConfigLocation.toString());
         final String fileDropBaseDir = dataDirectory.resolve("drop").toAbsolutePath().toString();
-        final List handlers = List.of(new FileDropZipHandler(db, fileDropConfigProvider));
+
+        final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
+
+        final List handlers = List
+                .of(new FileDropZipHandler(db, fileDropConfigProvider, stringCompressor));
 
         return new FileDropHandler(fileDropBaseDir, handlers);
     }
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
index de96369..530ae00 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/NoCopyCsvToEntryTransformerTest.java
@@ -19,6 +19,7 @@ import org.junit.jupiter.api.Test;
 import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.performance.db.PerformanceDb;
@@ -44,6 +45,7 @@ public class NoCopyCsvToEntryTransformerTest {
         final OffsetDateTime dateB = OffsetDateTime.now();
 
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
+            final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
 
             final String csv = "@timestamp,duration,tag\n"//
                     + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
@@ -52,7 +54,8 @@
             final ArrayBlockingQueue queue = db.getQueue();
             final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
                     new ColumnDefinitions());
-            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings,
+                    stringCompressor);
             csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
             queue.put(Entries.POISON);
         }
@@ -84,6 +87,7 @@ public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException {
 
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
 
+            final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
             final String csv = "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"//
                     + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"//
 
@@ -94,7 +98,8 @@ final ArrayBlockingQueue queue = db.getQueue();
             final ColumnDefinitions columnDefinitions = new ColumnDefinitions();
             columnDefinitions.ignoreColumn("ignoredColumn");
             final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",", columnDefinitions);
-            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings,
+                    stringCompressor);
             csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
             queue.put(Entries.POISON);
         }
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index 82a05cf..e52418a 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -231,6 +231,10 @@ public class PerformanceDb implements AutoCloseable {
         return fields;
     }
 
+    public DataStore getRealDataStore() {
+        return dataStore;
+    }
+
     public PartitionDiskStore getDataStore() {
         return dataStore.getDiskStorage();
     }