From ee79cb00221574cd85bb4b3f4cb686496e5eb067 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Wed, 12 May 2021 18:20:34 +0200 Subject: [PATCH] cleanup after revert --- .../CustomExportFormatToEntryTransformer.java | 127 ------------ .../org/lucares/pdbui/IngestionHandler.java | 12 -- .../org/lucares/pdbui/TcpIngestorTest.java | 62 ------ .../org/lucares/performance/db/PdbExport.java | 186 ------------------ 4 files changed, 387 deletions(-) delete mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java delete mode 100644 performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java deleted file mode 100644 index 300c44c..0000000 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.lucares.pdbui; - -import java.io.BufferedReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.regex.Pattern; - -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.api.TagsBuilder; -import org.lucares.pdb.datastore.Entries; -import org.lucares.pdb.datastore.Entry; -import org.lucares.performance.db.PdbExport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * File format goals: Minimal size/ minimal repetition while also providing a - * file format that can be used for "normal" ingestion, not just backup/restore. - * It should be easy to implement in any language. It should be easy to debug. - *

- * Note: Line breaks are written as {@code \n}. - * - *

- * #                                    // # is the magic byte for the file format used to detect this format
- * $123:key1=value1,key2=value2\n       // $ marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
- *                                      // In this case the tags key1=value1,key2=value2 will be identified by 123.
- *                                      // The newline is used as an end marker.
- * 1534567890,456,123\n                 // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
- * 1,789,123\n                          // Timestamps are encoded using delta encoding. That means this triple defines
- *                                      // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
- * -2,135,123\n                         // Timestamp delta encoding can contain negative numbers. This triple defines an entry
- *                                      // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
- * 
- */ - -public class CustomExportFormatToEntryTransformer { - - private static final int ENTRY_BUFFER_SIZE = 100; - - private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class); - - private final Pattern splitByComma = Pattern.compile(","); - - private final Map tagsDictionary = new HashMap<>(); - - private long lastEpochMilli; - - public void read(final BufferedReader in, final ArrayBlockingQueue queue) throws IOException { - - Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - - try { - String line; - while ((line = in.readLine()) != null) { - try { - if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) { - readDictionaryEntry(line); - } else { - final Entry entry = readEntry(line); - if (entry != null) { - - bufferedEntries.add(entry); - - if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) { - queue.put(bufferedEntries); - bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - } - } - } - } catch (final Exception e) { - LOGGER.error("ignoring line '{}'", line, e); - } - queue.put(bufferedEntries); - bufferedEntries = new Entries(ENTRY_BUFFER_SIZE); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("aborting because of interruption"); - } - } - - private Entry readEntry(final String line) { - - final String[] timeValueTags = splitByComma.split(line); - - final long timeDelta = Long.parseLong(timeValueTags[0]); - final long value = Long.parseLong(timeValueTags[1]); - final long tagsId = Long.parseLong(timeValueTags[2]); - - lastEpochMilli = lastEpochMilli + timeDelta; - - final Tags tags = tagsDictionary.get(tagsId); - if (tags == null) { - LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line); - return null; - } - - return new Entry(lastEpochMilli, value, tags); - } - - private void readDictionaryEntry(final String line) { - final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID)); - - final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10); - final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]); - tagsDictionary.put(tagId, tags); - } - - public static Tags tagsFromCsv(final String line) { - - final TagsBuilder tagsBuilder = new TagsBuilder(); - final String[] tagsAsString = line.split(Pattern.quote(",")); - - for (final String tagAsString : tagsAsString) { - final String[] keyValue = tagAsString.split(Pattern.quote("=")); - - final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]); - final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]); - tagsBuilder.add(key, value); - } - - return tagsBuilder.build(); - } -} \ No newline at end of file diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java index 385f50d..2602d71 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java @@ -18,7 +18,6 @@ import java.util.zip.GZIPInputStream; import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entry; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; -import org.lucares.performance.db.PdbExport; import com.fasterxml.jackson.core.JsonParseException; @@ -58,8 +57,6 @@ public final class IngestionHandler implements Callable { if (firstByte == '{') { in.reset(); readJSON(in); - } else if (firstByte == PdbExport.MAGIC_BYTE) { - readCustomExportFormat(in); } else if (isGZIP(firstByte)) { in.reset(); final GZIPInputStream gzip = new GZIPInputStream(in); @@ -80,15 +77,6 @@ public final class IngestionHandler implements Callable { return firstByte == 0x1f; } - private void readCustomExportFormat(final InputStream in) throws IOException { - - final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer(); - - final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); - transformer.read(reader, queue); - - } - private void readJSON(final InputStream in) throws IOException, InterruptedException { final int chunksize = 100; Entries entries = new Entries(chunksize); diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index 7d3ad74..9449df0 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -25,8 +25,6 @@ import org.junit.jupiter.params.provider.ValueSource; import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Query; -import org.lucares.pdb.datastore.internal.DataStore; -import org.lucares.performance.db.PdbExport; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.slf4j.Logger; @@ -92,66 +90,6 @@ public class TcpIngestorTest { } } - @Test - public void testIngestDataViaTcpStream_CustomFormat() throws Exception { - - final long dateA = Instant.now().toEpochMilli(); - final long dateB = Instant.now().toEpochMilli() + 1; - final long dateC = Instant.now().toEpochMilli() - 1; - final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1); - final String host = "someHost"; - - // 1. insert some data - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - ingestor.useRandomPort(); - ingestor.start(); - - final long deltaEpochMilliB = dateB - dateA; - final long deltaEpochMilliC = dateC - dateB; - - final String data = "#$0:host=someHost,pod=somePod\n"// - + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0 - + "$1:host=someHost,pod=otherPod\n" // - + deltaEpochMilliB + ",2,1\n" // dates are the delta the the previous date / using tags with id 1 - + deltaEpochMilliC + ",3,0"; // dates are the delta the the previous date / using tags with id 0 - - PdbTestUtil.send(data, ingestor.getPort()); - } catch (final Exception e) { - LOGGER.error("", e); - throw e; - } - - // 2. export the data - final List exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export")); - - // 3. delete database - FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE)); - - // 4. create a new database - try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - ingestor.useRandomPort(); - ingestor.start(); - for (final Path exportFile : exportFiles) { - PdbTestUtil.send(exportFile, ingestor.getPort()); - } - } - - // 5. check that the data is correctly inserted - try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap(); - Assertions.assertEquals(6, result.size()); - - Assertions.assertEquals(dateA, result.get(0)); - Assertions.assertEquals(1, result.get(1)); - - Assertions.assertEquals(dateC, result.get(2)); - Assertions.assertEquals(3, result.get(3)); - - Assertions.assertEquals(dateB, result.get(4)); - Assertions.assertEquals(2, result.get(5)); - } - } - @Test public void testIngestionThreadDoesNotDieOnErrors() throws Exception { final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1); diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java deleted file mode 100644 index 8fa49a0..0000000 --- a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java +++ /dev/null @@ -1,186 +0,0 @@ -package org.lucares.performance.db; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; -import java.util.zip.GZIPOutputStream; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; -import org.lucares.collections.LongList; -import org.lucares.pdb.api.DateTimeRange; -import org.lucares.pdb.api.Query; -import org.lucares.pdb.api.Tags; -import org.lucares.pdb.datastore.PdbFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PdbExport { - - private static final int KB = 1024; - private static final int MB = KB * 1024; - private static final int GB = MB * 1024; - - public static final char MAGIC_BYTE = '#'; - public static final char MARKER_DICT_ENTRY_CHAR = '$'; - public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR); - public static final char SEPARATOR_TAG_ID_CHAR = ':'; - public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR); - - private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class); - - public static void main(final String[] args) throws Exception { - - initLogging(); - - final Path dataDirectory = Paths.get(args[0]); - final Path backupDir = Paths.get(args[1]) - .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"))); - - export(dataDirectory, backupDir); - } - - public static List export(final Path dataDirectory, final Path backupDir) throws Exception { - final List exportFiles = new ArrayList<>(); - Files.createDirectories(backupDir); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - LOGGER.info("shutdown hook"); - } - - }); - - final OffsetDateTime start = OffsetDateTime.now(); - final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); - final AtomicLong tagsIdCounter = new AtomicLong(0); - long exportFileCounter = 0; - - Path exportFile = null; - Writer writer = null; - - try (final PerformanceDb db = new PerformanceDb(dataDirectory);) { - - LOGGER.info("Searching for all files. This may take a while ..."); - final List pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max())); - - long count = 0; - long lastEpochMilli = 0; - long begin = System.currentTimeMillis(); - - for (final PdbFile pdbFile : pdbFiles) { - - if (writer == null || Files.size(exportFile) > 4 * GB) { - if (writer != null) { - writer.flush(); - writer.close(); - } - exportFile = backupDir - .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++)); - exportFiles.add(exportFile); - writer = createWriter(exportFile); - LOGGER.info("new export file: {}", exportFile); - - lastEpochMilli = 0; - } - - final Stream timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore()); - - final Tags tags = pdbFile.getTags(); - final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter); - - final Iterator it = timeValueStream.iterator(); - while (it.hasNext()) { - final LongList entry = it.next(); - - for (int i = 0; i < entry.size(); i += 2) { - - final long epochMilli = entry.get(i); - final long value = entry.get(i + 1); - - final long epochMilliDiff = epochMilli - lastEpochMilli; - lastEpochMilli = epochMilli; - - writer.write(Long.toString(epochMilliDiff)); - writer.write(','); - writer.write(Long.toString(value)); - writer.write(','); - writer.write(Long.toString(tagsId)); - writer.write('\n'); - - count++; - final long chunk = 10_000_000; - if (count % chunk == 0) { - final long end = System.currentTimeMillis(); - final long duration = end - begin; - final long entriesPerSecond = (long) (chunk / (duration / 1000.0)); - LOGGER.info("progress: {} - {} entries/s + duration {}", - String.format(Locale.US, "%,d", count), - String.format(Locale.US, "%,d", entriesPerSecond), duration); - begin = System.currentTimeMillis(); - } - } - } - } - - LOGGER.info("total: " + count); - - } finally { - if (writer != null) { - writer.close(); - } - } - - final OffsetDateTime end = OffsetDateTime.now(); - - LOGGER.info("duration: " + Duration.between(start, end)); - return exportFiles; - } - - private static void initLogging() { - Configurator.setRootLevel(Level.INFO); - } - - private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter) - throws IOException { - final long tagsId = tagsIdCounter.getAndIncrement(); - - writer.write(MARKER_DICT_ENTRY); - writer.write(Long.toString(tagsId)); - writer.write(SEPARATOR_TAG_ID); - writer.write(tags.toCsv()); - writer.write('\n'); - - return tagsId; - } - - private static Writer createWriter(final Path file) { - - try { - final OutputStreamWriter writer = new OutputStreamWriter( - new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8); - // initialize file header - writer.write(MAGIC_BYTE); - return writer; - - } catch (final IOException e) { - throw new IllegalStateException(e); - } - } -} \ No newline at end of file