diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java
deleted file mode 100644
index 300c44c..0000000
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.lucares.pdbui;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.regex.Pattern;
-
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.api.TagsBuilder;
-import org.lucares.pdb.datastore.Entries;
-import org.lucares.pdb.datastore.Entry;
-import org.lucares.performance.db.PdbExport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * File format goals: Minimal size / minimal repetition, while also providing a
- * file format that can be used for "normal" ingestion, not just backup/restore.
- * It should be easy to implement in any language. It should be easy to debug.
- *
- * Note: Line breaks are written as {@code \n}.
- *
- *
- * # // # is the magic byte for the file format used to detect this format
- * $123:key1=value1,key2=value2\n // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
- * // In this case the tags key1=value1,key2=value2 will be identified by 123.
- * // The newline is used as an end marker.
- * 1534567890,456,123\n // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
- * 1,789,123\n // Timestamps are encoded using delta encoding. That means this triple defines
- * // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
- * -2,135,123\n // Timestamp delta encoding can contain negative numbers. This triple defines an entry
- * // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
- *
- */
-
-public class CustomExportFormatToEntryTransformer {
-
- private static final int ENTRY_BUFFER_SIZE = 100;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);
-
- private final Pattern splitByComma = Pattern.compile(",");
-
- private final Map<Long, Tags> tagsDictionary = new HashMap<>();
-
- private long lastEpochMilli;
-
- public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
-
- Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
-
- try {
- String line;
- while ((line = in.readLine()) != null) {
- try {
- if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
- readDictionaryEntry(line);
- } else {
- final Entry entry = readEntry(line);
- if (entry != null) {
-
- bufferedEntries.add(entry);
-
- if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) {
- queue.put(bufferedEntries);
- bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
- }
- }
- }
- } catch (final Exception e) {
- LOGGER.error("ignoring line '{}'", line, e);
- }
- }
- // flush any remaining buffered entries once the input is exhausted
- queue.put(bufferedEntries);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.info("aborting because of interruption");
- }
- }
-
- private Entry readEntry(final String line) {
-
- final String[] timeValueTags = splitByComma.split(line);
-
- final long timeDelta = Long.parseLong(timeValueTags[0]);
- final long value = Long.parseLong(timeValueTags[1]);
- final long tagsId = Long.parseLong(timeValueTags[2]);
-
- lastEpochMilli = lastEpochMilli + timeDelta;
-
- final Tags tags = tagsDictionary.get(tagsId);
- if (tags == null) {
- LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
- return null;
- }
-
- return new Entry(lastEpochMilli, value, tags);
- }
-
- private void readDictionaryEntry(final String line) {
- final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID));
-
- final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10); // begin index 1 skips the leading '$' marker
- final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]);
- tagsDictionary.put(tagId, tags);
- }
-
- public static Tags tagsFromCsv(final String line) {
-
- final TagsBuilder tagsBuilder = new TagsBuilder();
- final String[] tagsAsString = line.split(Pattern.quote(","));
-
- for (final String tagAsString : tagsAsString) {
- final String[] keyValue = tagAsString.split(Pattern.quote("="));
-
- final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]);
- final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]);
- tagsBuilder.add(key, value);
- }
-
- return tagsBuilder.build();
- }
-}
\ No newline at end of file
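
For reference, the format this deleted transformer parsed can be decoded with nothing but the rules in its Javadoc: a leading `#` magic byte, `$<id>:<tags>` dictionary lines, and comma-separated triples whose first field is a timestamp delta. A minimal, self-contained sketch; the class name, record, and sample input are illustrative, not part of the removed API:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

public class ExportFormatDecoderSketch {

    record DecodedEntry(long epochMilli, long value, String tags) {
    }

    public static void main(final String[] args) throws IOException {
        // sample input following the Javadoc above; tag content is illustrative
        final String input = "#$0:key1=value1,key2=value2\n1534567890,456,0\n1,789,0\n-2,135,0\n";
        final BufferedReader in = new BufferedReader(new StringReader(input));

        final Map<Long, String> dictionary = new HashMap<>();
        long lastEpochMilli = 0;

        if (in.read() != '#') { // consume and verify the magic byte
            throw new IOException("not the custom export format");
        }

        String line;
        while ((line = in.readLine()) != null) {
            if (line.startsWith("$")) {
                // dictionary line: $<id>:<key=value,...>
                final int sep = line.indexOf(':');
                dictionary.put(Long.parseLong(line, 1, sep, 10), line.substring(sep + 1));
            } else {
                // entry line: <timestamp delta>,<value>,<tags id>
                final String[] parts = line.split(",");
                lastEpochMilli += Long.parseLong(parts[0]); // delta decoding: running sum
                System.out.println(new DecodedEntry(lastEpochMilli, Long.parseLong(parts[1]),
                        dictionary.get(Long.parseLong(parts[2]))));
            }
        }
    }
}
```

Running it prints the three entries with absolute timestamps 1534567890, 1534567891, and 1534567889, matching the worked example in the deleted Javadoc.
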
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
index 385f50d..2602d71 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -18,7 +18,6 @@ import java.util.zip.GZIPInputStream;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
-import org.lucares.performance.db.PdbExport;
import com.fasterxml.jackson.core.JsonParseException;
@@ -58,8 +57,6 @@ public final class IngestionHandler implements Callable {
if (firstByte == '{') {
in.reset();
readJSON(in);
- } else if (firstByte == PdbExport.MAGIC_BYTE) {
- readCustomExportFormat(in);
} else if (isGZIP(firstByte)) {
in.reset();
final GZIPInputStream gzip = new GZIPInputStream(in);
@@ -80,15 +77,6 @@ public final class IngestionHandler implements Callable {
return firstByte == 0x1f;
}
- private void readCustomExportFormat(final InputStream in) throws IOException {
-
- final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
-
- final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
- transformer.read(reader, queue);
-
- }
-
private void readJSON(final InputStream in) throws IOException, InterruptedException {
final int chunksize = 100;
Entries entries = new Entries(chunksize);
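
With the custom-format branch gone, IngestionHandler still dispatches on the first byte of the stream via mark/reset. A minimal sketch of that sniffing pattern; the handler method names are illustrative, not the class's real API:

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch of the first-byte dispatch that remains after this change: peek at
// the first byte, reset, then hand the full stream to a format-specific reader.
final class FormatSnifferSketch {

    static void dispatch(final InputStream raw) throws IOException {
        final InputStream in = new BufferedInputStream(raw);
        in.mark(1); // remember the start so reset() can rewind past the peek
        final int firstByte = in.read();
        in.reset();

        if (firstByte == '{') {
            readJson(in); // illustrative: parse as JSON
        } else if (firstByte == 0x1f) { // first byte of the GZIP magic number
            readGzip(in); // illustrative: unwrap GZIP, then sniff again
        } else {
            readCsv(in); // illustrative fallback
        }
    }

    private static void readJson(final InputStream in) { /* ... */ }

    private static void readGzip(final InputStream in) { /* ... */ }

    private static void readCsv(final InputStream in) { /* ... */ }
}
```
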
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
index 7d3ad74..9449df0 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java
@@ -25,8 +25,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
-import org.lucares.pdb.datastore.internal.DataStore;
-import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger;
@@ -92,66 +90,6 @@ public class TcpIngestorTest {
}
}
- @Test
- public void testIngestDataViaTcpStream_CustomFormat() throws Exception {
-
- final long dateA = Instant.now().toEpochMilli();
- final long dateB = Instant.now().toEpochMilli() + 1;
- final long dateC = Instant.now().toEpochMilli() - 1;
- final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1);
- final String host = "someHost";
-
- // 1. insert some data
- try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
- ingestor.useRandomPort();
- ingestor.start();
-
- final long deltaEpochMilliB = dateB - dateA;
- final long deltaEpochMilliC = dateC - dateB;
-
- final String data = "#$0:host=someHost,pod=somePod\n"//
- + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0
- + "$1:host=someHost,pod=otherPod\n" //
- + deltaEpochMilliB + ",2,1\n" // timestamps are encoded as deltas to the previous timestamp / using tags with id 1
- + deltaEpochMilliC + ",3,0"; // timestamps are encoded as deltas to the previous timestamp / using tags with id 0
-
- PdbTestUtil.send(data, ingestor.getPort());
- } catch (final Exception e) {
- LOGGER.error("", e);
- throw e;
- }
-
- // 2. export the data
- final List<Path> exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export"));
-
- // 3. delete database
- FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE));
-
- // 4. create a new database
- try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
- ingestor.useRandomPort();
- ingestor.start();
- for (final Path exportFile : exportFiles) {
- PdbTestUtil.send(exportFile, ingestor.getPort());
- }
- }
-
- // 5. check that the data is correctly inserted
- try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
- final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap();
- Assertions.assertEquals(6, result.size());
-
- Assertions.assertEquals(dateA, result.get(0));
- Assertions.assertEquals(1, result.get(1));
-
- Assertions.assertEquals(dateC, result.get(2));
- Assertions.assertEquals(3, result.get(3));
-
- Assertions.assertEquals(dateB, result.get(4));
- Assertions.assertEquals(2, result.get(5));
- }
- }
-
@Test
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1);
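
The deleted round-trip test depended on the delta arithmetic working out. A worked sketch with the same variable names; the concrete epoch value is an arbitrary stand-in:

```java
public class DeltaEncodingSketch {
    public static void main(final String[] args) {
        final long dateA = 1_534_567_890L; // stand-in for Instant.now().toEpochMilli()
        final long dateB = dateA + 1;
        final long dateC = dateA - 1;

        // encode: each timestamp becomes the difference to the previously written one
        final long deltaA = dateA - 0;     // the first delta is taken against 0
        final long deltaB = dateB - dateA; // +1
        final long deltaC = dateC - dateB; // -2, deltas may be negative

        // decode: a running sum recovers the original timestamps
        long last = 0;
        System.out.println((last += deltaA) == dateA); // true
        System.out.println((last += deltaB) == dateB); // true
        System.out.println((last += deltaC) == dateC); // true
    }
}
```
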
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java
deleted file mode 100644
index 8fa49a0..0000000
--- a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.lucares.performance.db;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.lucares.collections.LongList;
-import org.lucares.pdb.api.DateTimeRange;
-import org.lucares.pdb.api.Query;
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.datastore.PdbFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PdbExport {
-
- private static final int KB = 1024;
- private static final int MB = KB * 1024;
- private static final int GB = MB * 1024;
-
- public static final char MAGIC_BYTE = '#';
- public static final char MARKER_DICT_ENTRY_CHAR = '$';
- public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR);
- public static final char SEPARATOR_TAG_ID_CHAR = ':';
- public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR);
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class);
-
- public static void main(final String[] args) throws Exception {
-
- initLogging();
-
- final Path dataDirectory = Paths.get(args[0]);
- final Path backupDir = Paths.get(args[1])
- .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")));
-
- export(dataDirectory, backupDir);
- }
-
- public static List<Path> export(final Path dataDirectory, final Path backupDir) throws Exception {
- final List<Path> exportFiles = new ArrayList<>();
- Files.createDirectories(backupDir);
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- LOGGER.info("shutdown hook");
- }
-
- });
-
- final OffsetDateTime start = OffsetDateTime.now();
- final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
- final AtomicLong tagsIdCounter = new AtomicLong(0);
- long exportFileCounter = 0;
-
- Path exportFile = null;
- Writer writer = null;
-
- try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
-
- LOGGER.info("Searching for all files. This may take a while ...");
- final List<PdbFile> pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max()));
-
- long count = 0;
- long lastEpochMilli = 0;
- long begin = System.currentTimeMillis();
-
- for (final PdbFile pdbFile : pdbFiles) {
-
- if (writer == null || Files.size(exportFile) > 4L * GB) { // 4L forces long arithmetic; 4 * GB overflows int to 0
- if (writer != null) {
- writer.flush();
- writer.close();
- }
- exportFile = backupDir
- .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++));
- exportFiles.add(exportFile);
- writer = createWriter(exportFile);
- LOGGER.info("new export file: {}", exportFile);
-
- lastEpochMilli = 0;
- }
-
- final Stream<LongList> timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore());
-
- final Tags tags = pdbFile.getTags();
- final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);
-
- final Iterator<LongList> it = timeValueStream.iterator();
- while (it.hasNext()) {
- final LongList entry = it.next();
-
- for (int i = 0; i < entry.size(); i += 2) {
-
- final long epochMilli = entry.get(i);
- final long value = entry.get(i + 1);
-
- final long epochMilliDiff = epochMilli - lastEpochMilli;
- lastEpochMilli = epochMilli;
-
- writer.write(Long.toString(epochMilliDiff));
- writer.write(',');
- writer.write(Long.toString(value));
- writer.write(',');
- writer.write(Long.toString(tagsId));
- writer.write('\n');
-
- count++;
- final long chunk = 10_000_000;
- if (count % chunk == 0) {
- final long end = System.currentTimeMillis();
- final long duration = end - begin;
- final long entriesPerSecond = (long) (chunk / (duration / 1000.0));
- LOGGER.info("progress: {} - {} entries/s + duration {}",
- String.format(Locale.US, "%,d", count),
- String.format(Locale.US, "%,d", entriesPerSecond), duration);
- begin = System.currentTimeMillis();
- }
- }
- }
- }
-
- LOGGER.info("total: " + count);
-
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
-
- final OffsetDateTime end = OffsetDateTime.now();
-
- LOGGER.info("duration: " + Duration.between(start, end));
- return exportFiles;
- }
-
- private static void initLogging() {
- Configurator.setRootLevel(Level.INFO);
- }
-
- private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter)
- throws IOException {
- final long tagsId = tagsIdCounter.getAndIncrement();
-
- writer.write(MARKER_DICT_ENTRY);
- writer.write(Long.toString(tagsId));
- writer.write(SEPARATOR_TAG_ID);
- writer.write(tags.toCsv());
- writer.write('\n');
-
- return tagsId;
- }
-
- private static Writer createWriter(final Path file) {
-
- try {
- final OutputStreamWriter writer = new OutputStreamWriter(
- new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8);
- // initialize file header
- writer.write(MAGIC_BYTE);
- return writer;
-
- } catch (final IOException e) {
- throw new IllegalStateException(e);
- }
- }
-}
\ No newline at end of file
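
The writer half of the removed format is similarly small. A sketch under the same assumptions; names are illustrative, the file rotation, GZIP wrapping, and Tags API of the original are omitted, and unlike PdbExport, which wrote a fresh dictionary id per PdbFile, this sketch deduplicates tag sets:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

public class ExportFormatWriterSketch {

    private final Map<String, Long> tagsIds = new HashMap<>();
    private long nextTagsId;
    private long lastEpochMilli;

    void writeHeader(final Writer out) throws IOException {
        out.write('#'); // magic byte identifying the format
    }

    void writeEntry(final Writer out, final long epochMilli, final long value, final String tagsCsv)
            throws IOException {
        Long tagsId = tagsIds.get(tagsCsv);
        if (tagsId == null) {
            // first occurrence of this tag set: emit a dictionary line
            tagsId = nextTagsId++;
            tagsIds.put(tagsCsv, tagsId);
            out.write("$" + tagsId + ":" + tagsCsv + "\n");
        }
        // the timestamp is written as the delta to the previously written one
        out.write((epochMilli - lastEpochMilli) + "," + value + "," + tagsId + "\n");
        lastEpochMilli = epochMilli;
    }

    public static void main(final String[] args) throws IOException {
        final StringWriter out = new StringWriter();
        final ExportFormatWriterSketch writer = new ExportFormatWriterSketch();
        writer.writeHeader(out);
        writer.writeEntry(out, 1534567890L, 456, "key1=value1,key2=value2");
        writer.writeEntry(out, 1534567891L, 789, "key1=value1,key2=value2");
        writer.writeEntry(out, 1534567889L, 135, "key1=value1,key2=value2");
        System.out.print(out);
    }
}
```

Its output mirrors the structure of the example in the deleted Javadoc: one dictionary line, an absolute first timestamp, then the deltas `1` and `-2`.
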