do not use static string compressor in upload handlers

2021-09-18 19:41:40 +02:00
parent 3e1002a99d
commit bcba117742
11 changed files with 66 additions and 18 deletions
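
In short: every ingestion path (CSV upload, TCP ingestion, zip file drop) previously compressed tag keys and values through the JVM-wide static Tags.STRING_COMPRESSOR; after this commit each handler receives the StringCompressor owned by the DataStore it writes to, passed down through constructors. A minimal sketch of the pattern, with a hypothetical handler class for illustration (only StringCompressor and its int-returning put(String) come from the diffs below):

import org.lucares.pdb.api.StringCompressor;

// Hypothetical class, not part of this commit; it only illustrates the shape of the change.
final class ExampleHandler {
    // before: int key = Tags.STRING_COMPRESSOR.put(columnName);  (static, JVM-wide)
    // after:  the compressor is an explicit constructor dependency, scoped to one DataStore
    private final StringCompressor stringCompressor;

    ExampleHandler(final StringCompressor stringCompressor) {
        this.stringCompressor = stringCompressor;
    }

    int compress(final String columnName) {
        return stringCompressor.put(columnName); // compressed id, as used by NoCopyCsvToEntryTransformer
    }
}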

DataStore.java

@@ -163,6 +163,10 @@ public class DataStore implements AutoCloseable {
         }
     }
 
+    public StringCompressor getStringCompressor() {
+        return stringCompressor;
+    }
+
     // visible for test
     QueryCompletionIndex getQueryCompletionIndex() {
         return queryCompletionIndex;

CsvToEntryTransformerFactory.java

@@ -3,16 +3,17 @@ package org.lucares.pdbui;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 
 public class CsvToEntryTransformerFactory {
 
     public static CsvToEntryTransformer createCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue,
-            final CsvReaderSettings settings) {
+            final CsvReaderSettings settings, final StringCompressor stringCompressor) {
         if (settings.getQuoteCharacter() == null
                 && Objects.equals(settings.getDateTimePattern(), CsvReaderSettings.ISO_8601)) {
-            return new NoCopyCsvToEntryTransformer(queue, settings);
+            return new NoCopyCsvToEntryTransformer(queue, settings, stringCompressor);
         } else {
             return new CsvReaderCsvToEntryTransformer(queue, settings);
         }

CsvUploadHandler.java

@@ -12,6 +12,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
@@ -30,8 +31,11 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
     private final PerformanceDb performanceDb;
+    private final StringCompressor stringCompressor;
 
-    public CsvUploadHandler(final PerformanceDb performanceDb) {
+    public CsvUploadHandler(final PerformanceDb performanceDb, final StringCompressor stringCompressor) {
         this.performanceDb = performanceDb;
+        this.stringCompressor = stringCompressor;
     }
 
     public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings)
@@ -48,7 +52,8 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
             // improved the
             // ingestion performance from 1.1m to 1.55m values per second on average
             synchronized (this) {
-                final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+                final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue,
+                        settings, stringCompressor);
                 try (InputStream in = file.getInputStream()) {
                     csvToEntryTransformer.readCSV(in);
                 } catch (final Exception e) {

FileDropZipHandler.java

@@ -10,6 +10,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.RuntimeTimeoutException;
 import org.lucares.performance.db.PerformanceDb;
@@ -21,12 +22,15 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
     private final PerformanceDb performanceDb;
     private final FileDropConfigProvider configProvider;
+    private final StringCompressor stringCompressor;
 
     @Autowired
-    public FileDropZipHandler(final PerformanceDb performanceDb, final FileDropConfigProvider configProvider) {
+    public FileDropZipHandler(final PerformanceDb performanceDb, final FileDropConfigProvider configProvider,
+            final StringCompressor stringCompressor) {
         super();
         this.performanceDb = performanceDb;
         this.configProvider = configProvider;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -54,7 +58,7 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
             final CsvReaderSettings csvReaderSettings = csvSettings.get();
             final CsvToEntryTransformer csvToEntryTransformer = CsvToEntryTransformerFactory
-                    .createCsvToEntryTransformer(queue, csvReaderSettings);
+                    .createCsvToEntryTransformer(queue, csvReaderSettings, stringCompressor);
             try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry),
                     1024 * 1024)) {
                 csvToEntryTransformer.readCSV(inputStream);

IngestionHandler.java

@@ -15,6 +15,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.GZIPInputStream;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
@@ -25,10 +26,13 @@ public final class IngestionHandler implements Callable<Void> {
     final Socket clientSocket;
     private final ArrayBlockingQueue<Entries> queue;
+    private final StringCompressor stringCompressor;
 
-    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue) {
+    public IngestionHandler(final Socket clientSocket, final ArrayBlockingQueue<Entries> queue,
+            final StringCompressor stringCompressor) {
         this.clientSocket = clientSocket;
         this.queue = queue;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -65,7 +69,7 @@ public final class IngestionHandler implements Callable<Void> {
             } else {
                 in.reset();
                 final NoCopyCsvToEntryTransformer csvTransformer = new NoCopyCsvToEntryTransformer(queue,
-                        CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()));
+                        CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()), stringCompressor);
                 csvTransformer.readCSV(in);
             }
         }

MySpringConfiguration.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.performance.db.PerformanceDb;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,4 +29,9 @@ public class MySpringConfiguration {
         return new PerformanceDb(dataDirectory);
     }
 
+    @Bean
+    StringCompressor stringCompressor(final PerformanceDb performanceDb) {
+        return performanceDb.getRealDataStore().getStringCompressor();
+    }
 }
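
The bean above deliberately hands out the compressor owned by the already-configured PerformanceDb rather than constructing a fresh one, so ids written during ingestion resolve against the same dictionary the store reads at query time. A sketch of the equivalent manual wiring (constructor signatures are taken from the diffs in this commit; the wrapper class is hypothetical, and it assumes PerformanceDb's Path constructor can throw IOException, as TcpIngestor's throws clause suggests):

import java.io.IOException;
import java.nio.file.Path;

import org.lucares.pdb.api.StringCompressor;
import org.lucares.performance.db.PerformanceDb;

// Hypothetical wiring helper, mirroring what Spring does with the @Bean methods.
final class ManualWiringSketch {
    static TcpIngestor wire(final Path dataDirectory) throws IOException {
        final PerformanceDb db = new PerformanceDb(dataDirectory);
        final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
        return new TcpIngestor(db, stringCompressor); // same instance the DataStore uses internally
    }
}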

NoCopyCsvToEntryTransformer.java

@@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.lucares.collections.IntList;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
 import org.lucares.pdb.datastore.Entries;
@@ -31,9 +32,13 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
     private int[] compressedHeaders;
     private List<Function<String, String>> postProcessersForColumns;
+    private final StringCompressor stringCompressor;
 
-    public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings) {
+    public NoCopyCsvToEntryTransformer(final ArrayBlockingQueue<Entries> queue, final CsvReaderSettings settings,
+            final StringCompressor stringCompressor) {
         this.queue = queue;
         this.settings = settings;
+        this.stringCompressor = stringCompressor;
     }
 
     @Override
@@ -54,8 +59,8 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
         int lineCounter = 0;
         final byte[] buffer = new byte[4096 * 16];
 
-        final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
-        final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
+        final int keyTimestamp = stringCompressor.put(settings.getTimeColumn());
+        final int keyDuration = stringCompressor.put(settings.getValueColumn());
 
         final FastISODateParser dateParser = new FastISODateParser();
         Tags additionalTags = initAdditionalTags(settings);
@@ -143,7 +148,7 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
             final String renameTo = settings.getColumnDefinitions().getRenameTo(columnName);
             final String renamedColumn = renameTo != null ? renameTo : columnName;
-            columns[i] = Tags.STRING_COMPRESSOR.put(renamedColumn);
+            columns[i] = stringCompressor.put(renamedColumn);
             final EnumSet<PostProcessors> postProcessors = settings.getColumnDefinitions()
                     .getPostProcessors(columnName);
             final Function<String, String> postProcessFunction = PostProcessors.toFunction(postProcessors);
@@ -185,7 +190,7 @@ class NoCopyCsvToEntryTransformer implements CsvToEntryTransformer {
                 duration = parseLong(line, lastSeparatorPosition + 1, separatorPosition);
             } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
                 final Function<String, String> postProcess = postProcessersForColumns.get(i);
-                final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1, separatorPosition,
+                final int value = stringCompressor.put(line, lastSeparatorPosition + 1, separatorPosition,
                         postProcess);
                 tagsBuilder.add(key, value);

TcpIngestor.java

@@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
 
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
@@ -40,15 +41,19 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
     private volatile int port = PORT;
+    private final StringCompressor stringCompressor;
 
     public TcpIngestor(final Path dataDirectory) throws IOException {
         LOGGER.info("opening performance db: " + dataDirectory);
         db = new PerformanceDb(dataDirectory);
+        stringCompressor = db.getRealDataStore().getStringCompressor();
         LOGGER.debug("performance db open");
     }
 
     @Autowired
-    public TcpIngestor(final PerformanceDb db) {
+    public TcpIngestor(final PerformanceDb db, final StringCompressor stringCompressor) {
         this.db = db;
+        this.stringCompressor = stringCompressor;
     }
 
     public void useRandomPort() {
@@ -94,7 +99,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                 LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
                 final ArrayBlockingQueue<Entries> queue = db.getQueue();
-                workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
+                workerThreadPool.submit(new IngestionHandler(clientSocket, queue, stringCompressor));
                 LOGGER.debug("handler submitted");
             } catch (final SocketTimeoutException e) {
                 // expected every 100ms

FileDropHandlerTest.java

@@ -18,6 +18,7 @@ import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
 import org.lucares.pdb.api.Result;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
 import org.lucares.pdbui.domain.FileDropConfig;
@@ -101,7 +102,11 @@ public class FileDropHandlerTest {
         final FileDropConfigProvider fileDropConfigProvider = new FileDropConfigProvider(
                 fileDropConfigLocation.toString());
         final String fileDropBaseDir = dataDirectory.resolve("drop").toAbsolutePath().toString();
-        final List<FileDropFileTypeHandler> handlers = List.of(new FileDropZipHandler(db, fileDropConfigProvider));
+        final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
+        final List<FileDropFileTypeHandler> handlers = List
+                .of(new FileDropZipHandler(db, fileDropConfigProvider, stringCompressor));
         return new FileDropHandler(fileDropBaseDir, handlers);
     }

NoCopyCsvToEntryTransformerTest.java

@@ -19,6 +19,7 @@ import org.junit.jupiter.api.Test;
 import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
+import org.lucares.pdb.api.StringCompressor;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
 import org.lucares.performance.db.PerformanceDb;
@@ -44,6 +45,7 @@ public class NoCopyCsvToEntryTransformerTest {
         final OffsetDateTime dateB = OffsetDateTime.now();
 
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
+            final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
 
             final String csv = "@timestamp,duration,tag\n"//
                     + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
@@ -52,7 +54,8 @@ public class NoCopyCsvToEntryTransformerTest {
             final ArrayBlockingQueue<Entries> queue = db.getQueue();
             final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
                     new ColumnDefinitions());
-            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings,
+                    stringCompressor);
             csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
             queue.put(Entries.POISON);
         }
@@ -84,6 +87,7 @@ public class NoCopyCsvToEntryTransformerTest {
     public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException {
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
+            final StringCompressor stringCompressor = db.getRealDataStore().getStringCompressor();
 
             final String csv = "@timestamp,duration,ignoredColumn,-otherIgnoredColumn,tag\n"//
                     + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"//
@@ -94,7 +98,8 @@ public class NoCopyCsvToEntryTransformerTest {
             columnDefinitions.ignoreColumn("ignoredColumn");
             final CsvReaderSettings settings = CsvReaderSettings.create("@timestamp", "duration", ",",
                     columnDefinitions);
-            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings);
+            final NoCopyCsvToEntryTransformer csvToEntryTransformer = new NoCopyCsvToEntryTransformer(queue, settings,
+                    stringCompressor);
             csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
             queue.put(Entries.POISON);
         }

PerformanceDb.java

@@ -231,6 +231,10 @@ public class PerformanceDb implements AutoCloseable {
         return fields;
     }
 
+    public DataStore getRealDataStore() {
+        return dataStore;
+    }
+
     public PartitionDiskStore getDataStore() {
         return dataStore.getDiskStorage();
    }