diff --git a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
index f3f6ecf..9da605c 100644
--- a/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
+++ b/data-store/src/main/java/org/lucares/pdb/datastore/internal/DataStore.java
@@ -46,7 +46,7 @@ public class DataStore implements AutoCloseable {
 
     public static final char LISTING_FILE_SEPARATOR = ',';
 
-    private static final String SUBDIR_STORAGE = "storage";
+    public static final String SUBDIR_STORAGE = "storage";
 
     // used to generate doc ids that are
     // a) unique
diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java
index 44ad3c3..db9f132 100644
--- a/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java
+++ b/pdb-api/src/main/java/org/lucares/pdb/api/Tags.java
@@ -7,6 +7,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.regex.Pattern;
 
 import org.lucares.collections.IntList;
 import org.lucares.collections.LongList;
@@ -198,6 +199,28 @@ public class Tags implements Comparable<Tags> {
         return String.valueOf(tags);
     }
 
+    public String toCsv() {
+        final List<String> tagsAsStrings = new ArrayList<>();
+        for (final Tag tag : tags) {
+            tagsAsStrings.add(tag.getKeyAsString() + "=" + tag.getValueAsString());
+        }
+
+        return String.join(",", tagsAsStrings);
+    }
+
+    public static Tags fromCsv(final String line) {
+
+        final TagsBuilder tagsBuilder = new TagsBuilder();
+        final String[] tagsAsString = line.split(Pattern.quote(","));
+
+        for (final String tagAsString : tagsAsString) {
+            final String[] keyValue = tagAsString.split(Pattern.quote("="));
+            tagsBuilder.add(keyValue[0], keyValue[1]);
+        }
+
+        return tagsBuilder.build();
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
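
A minimal sketch of the intended round-trip (note: keys or values that themselves
contain ',' or '=' would break this simple format; TagsBuilder is used
statement-style, as elsewhere in this patch):

    final TagsBuilder tagsBuilder = new TagsBuilder();
    tagsBuilder.add("host", "someHost");
    tagsBuilder.add("pod", "somePod");
    final Tags tags = tagsBuilder.build();

    final String csv = tags.toCsv();         // "host=someHost,pod=somePod"
    final Tags restored = Tags.fromCsv(csv); // equal to the original tags
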
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
index c4cd1ff..2e85c36 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.lucares.pdb.api.Entry;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
@@ -11,6 +12,7 @@ import org.lucares.pdbui.date.FastISODateParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+// TODO remove?
 public class CsvToEntryTransformer implements LineToEntryTransformer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class);
@@ -61,7 +63,9 @@ public class CsvToEntryTransformer implements LineToEntryTransformer {
                 duration = Long.parseLong(columns[i]);
                 break;
             default:
-                tagsBuilder.add(headers[i], columns[i]);
+                if (!StringUtils.isBlank(columns[i])) {
+                    tagsBuilder.add(headers[i], columns[i]);
+                }
                 break;
             }
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java
new file mode 100644
index 0000000..c320078
--- /dev/null
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CustomExportFormatToEntryTransformer.java
@@ -0,0 +1,110 @@
+package org.lucares.pdbui;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.lucares.pdb.api.Entries;
+import org.lucares.pdb.api.Entry;
+import org.lucares.pdb.api.Tags;
+import org.lucares.performance.db.PdbExport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * File format goals: minimal size and minimal repetition, while also providing a
+ * format that can be used for "normal" ingestion, not just backup/restore. It
+ * should be easy to implement in any language and easy to debug.
+ * <p>
+ * Note: Line breaks are written as {@code \n}.
+ *
+ * <pre>
+ * #                                    // # is the magic byte used to detect this file format
+ * $123:key1=value1,key2=value2\n       // $ marks the beginning of a dictionary entry: the number before ':' refers to the tags after it.
+ *                                      // In this case the tags key1=value1,key2=value2 will be identified by 123.
+ *                                      // The newline is used as an end marker.
+ * 1534567890,456,123\n                 // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
+ * 1,789,123\n                          // Timestamps are encoded using delta encoding. That means this triple defines
+ *                                      // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2
+ * -2,135,123\n                         // Timestamp delta encoding can contain negative numbers. This triple defines an entry
+ *                                      // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2
+ * </pre>
+ */
+public class CustomExportFormatToEntryTransformer {
+
+    private static final int ENTRY_BUFFER_SIZE = 100;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);
+
+    private final Pattern splitByComma = Pattern.compile(",");
+
+    private final Map<Long, Tags> tagsDictionary = new HashMap<>();
+
+    private long lastEpochMilli;
+
+    public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
+
+        Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
+
+        try {
+            String line;
+            while ((line = in.readLine()) != null) {
+                try {
+                    if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
+                        readDictionaryEntry(line);
+                    } else {
+                        final Entry entry = readEntry(line);
+                        if (entry != null) {
+                            bufferedEntries.add(entry);
+
+                            if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) {
+                                queue.put(bufferedEntries);
+                                bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
+                            }
+                        }
+                    }
+                } catch (final Exception e) {
+                    LOGGER.error("ignoring line '{}'", line, e);
+                }
+            }
+
+            // flush the remaining entries once the stream is exhausted
+            if (bufferedEntries.size() > 0) {
+                queue.put(bufferedEntries);
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.info("aborting because of interruption");
+        }
+    }
+
+    private Entry readEntry(final String line) {
+
+        final String[] timeValueTags = splitByComma.split(line);
+
+        final long timeDelta = Long.parseLong(timeValueTags[0]);
+        final long value = Long.parseLong(timeValueTags[1]);
+        final long tagsId = Long.parseLong(timeValueTags[2]);
+
+        lastEpochMilli = lastEpochMilli + timeDelta;
+
+        final Tags tags = tagsDictionary.get(tagsId);
+        if (tags == null) {
+            LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
+            return null;
+        }
+
+        return new Entry(lastEpochMilli, value, tags);
+    }
+
+    private void readDictionaryEntry(final String line) {
+        final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID));
+
+        // start parsing at index 1 to skip the leading '$' marker
+        final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10);
+        final Tags tags = Tags.fromCsv(tagsIdToSerializedTags[1]);
+        tagsDictionary.put(tagId, tags);
+    }
+}
\ No newline at end of file
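
A minimal sketch of driving the transformer directly (the leading '#' magic byte is
consumed by TcpIngestor before read() is called, so the payload starts with a
dictionary entry; the queue capacity is arbitrary):

    final String payload = "$0:host=someHost\n1534567890,456,0\n1,789,0\n";
    final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(16);
    final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
    transformer.read(new BufferedReader(new StringReader(payload)), queue);
    // the queue now holds one Entries batch with two entries: epochMilli 1534567890
    // and 1534567891, durations 456 and 789, both tagged host=someHost
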
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
index 43d1745..a076415 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java
@@ -1,5 +1,6 @@
 package org.lucares.pdbui;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -18,6 +19,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.GZIPInputStream;
 
 import javax.annotation.PreDestroy;
 
@@ -28,6 +30,7 @@ import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.api.TagsBuilder;
 import org.lucares.pdbui.date.FastISODateParser;
 import org.lucares.performance.db.BlockingQueueIterator;
+import org.lucares.performance.db.PdbExport;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.recommind.logs.Config;
 import org.slf4j.Logger;
@@ -69,15 +72,10 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                 Thread.currentThread().setName("worker-" + clientAddress);
                 LOGGER.debug("opening streams to client");
                 try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
-                        InputStream in = clientSocket.getInputStream();) {
+                        InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
 
                     LOGGER.debug("reading from stream");
 
-                    final byte firstByte = (byte) in.read();
-                    if (firstByte == '{') {
-                        readJSON(in);
-                    } else {
-                        readCSV(in, firstByte);
-                    }
+                    redirectInputStream(in);
 
                     LOGGER.debug("connection closed: " + clientAddress);
                 } catch (final Throwable e) {
@@ -88,6 +86,40 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
             return null;
         }
 
+    private void redirectInputStream(final InputStream in) throws IOException, InterruptedException {
+        in.mark(1);
+        final byte firstByte = (byte) in.read();
+        if (firstByte == '{') {
+            readJSON(in);
+        } else if (firstByte == PdbExport.MAGIC_BYTE) {
+            readCustomExportFormat(in);
+        } else if (isGZIP(firstByte)) {
+            in.reset();
+            // wrap in a BufferedInputStream: GZIPInputStream itself does not support
+            // mark/reset, which the recursive dispatch relies on
+            final InputStream gzip = new BufferedInputStream(new GZIPInputStream(in));
+            redirectInputStream(gzip);
+        } else {
+            readCSV(in, firstByte);
+        }
+    }
+
+    private boolean isGZIP(final byte firstByte) {
+        // GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section 2.3.1
+        // I am cheap and only check the first byte
+        return firstByte == 0x1f;
+    }
+
+    private void readCustomExportFormat(final InputStream in) throws IOException {
+        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        transformer.read(reader, queue);
+    }
+
     private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
         final int chunksize = 1000;
         Entries entries = new Entries(chunksize);
@@ -196,7 +228,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
                 epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
             } else if (key == keyDuration) {
                 duration = parseLong(line, lastSeparatorPosition + 1);
-            } else {
+            } else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
                 final int value = Tags.STRING_COMPRESSOR.put(line,
                         lastSeparatorPosition + 1, separatorPosition);
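
With this in place a client can also send gzipped payloads; the ingestor sniffs the
0x1f byte and unwraps the stream before dispatching again. A sketch (host and port
are illustrative):

    try (Socket socket = new Socket("localhost", 3333);
            OutputStream out = new GZIPOutputStream(socket.getOutputStream())) {
        out.write("#$0:host=someHost\n1534567890,456,0\n".getBytes(StandardCharsets.UTF_8));
    }
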
diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java
index a4aa974..3920ec7 100644
--- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java
+++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/PdbTestUtil.java
@@ -4,8 +4,11 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -132,6 +135,24 @@ public class PdbTestUtil {
         LOGGER.trace("closed sender connection");
     }
 
+    public static void send(final Path file) throws IOException {
+        final SocketChannel outputChannel = connect();
+
+        try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) {
+            inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel);
+        }
+
+        try {
+            // ugly workaround: the channel was closed too early and not all
+            // data was received
+            TimeUnit.MILLISECONDS.sleep(10);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e);
+        }
+        outputChannel.close();
+        LOGGER.trace("closed sender connection");
+    }
+
     private static SocketChannel connect() throws IOException {
         SocketChannel result = null;
diff --git a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java
index 38a2186..2cfa305 100644
--- a/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java
+++ b/pdb-ui/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java
@@ -18,7 +18,9 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.lucares.collections.LongList;
+import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdbui.TcpIngestor;
+import org.lucares.performance.db.PdbExport;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
 import org.slf4j.Logger;
@@ -88,6 +90,63 @@ public class TcpIngestorTest {
         }
     }
 
+    @Test
+    public void testIngestDataViaTcpStream_CustomFormat() throws Exception {
+
+        final long dateA = Instant.now().toEpochMilli();
+        final long dateB = dateA + 1;
+        final long dateC = dateA - 1;
+        final String host = "someHost";
+
+        // 1. insert some data
+        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
+
+            ingestor.start();
+
+            final long deltaEpochMilliB = dateB - dateA;
+            final long deltaEpochMilliC = dateC - dateB;
+
+            final String data = "#$0:host=someHost,pod=somePod\n"//
+                    + dateA + ",1,0\n"// previous date is 0, therefore the delta is dateA / using tags with id 0
+                    + "$1:host=someHost,pod=otherPod\n" //
+                    + deltaEpochMilliB + ",2,1\n" // dates are the delta to the previous date / using tags with id 1
+                    + deltaEpochMilliC + ",3,0"; // dates are the delta to the previous date / using tags with id 0
+
+            PdbTestUtil.send(data);
+        } catch (final Exception e) {
+            LOGGER.error("", e);
+            throw e;
+        }
+
+        // 2. export the data
+        final List<Path> exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export"));
+
+        // 3. delete the database
+        FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE));
+
+        // 4. create a new database
+        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
+            ingestor.start();
+            for (final Path exportFile : exportFiles) {
+                PdbTestUtil.send(exportFile);
+            }
+        }
+
+        // 5. check that the data is correctly inserted
+        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
+            final LongList result = db.get("host=" + host).singleGroup().flatMap();
+            Assert.assertEquals(result.size(), 6);
+
+            // the result is grouped by tag set: both pod=somePod entries
+            // (dateA, dateC) come first, then the pod=otherPod entry (dateB)
+            Assert.assertEquals(result.get(0), dateA);
+            Assert.assertEquals(result.get(1), 1);
+
+            Assert.assertEquals(result.get(2), dateC);
+            Assert.assertEquals(result.get(3), 3);
+
+            Assert.assertEquals(result.get(4), dateB);
+            Assert.assertEquals(result.get(5), 2);
+        }
+    }
+
     @Test
     public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
         final OffsetDateTime dateA = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC);
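
For reference, a sketch of the (gunzipped) export file produced in step 2, assuming
entries are exported in the per-file order the assertions in step 5 suggest
(1534567890123 stands in for dateA):

    #$0:host=someHost,pod=somePod
    1534567890123,1,0        <- dateA as delta to 0, duration 1, tags id 0
    -1,3,0                   <- dateC = dateA - 1, duration 3, tags id 0
    $1:host=someHost,pod=otherPod
    2,2,1                    <- dateB = dateC + 2, duration 2, tags id 1
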
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java
new file mode 100644
index 0000000..5173539
--- /dev/null
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PdbExport.java
@@ -0,0 +1,171 @@
+package org.lucares.performance.db;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.lucares.collections.LongList;
+import org.lucares.pdb.api.Tags;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PdbExport {
+
+    private static final int KB = 1024;
+    private static final int MB = KB * 1024;
+    private static final int GB = MB * 1024;
+
+    public static final char MAGIC_BYTE = '#';
+    public static final char MARKER_DICT_ENTRY_CHAR = '$';
+    public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR);
+    public static final char SEPARATOR_TAG_ID_CHAR = ':';
+    public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR);
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class);
+
+    public static void main(final String[] args) throws Exception {
+
+        initLogging();
+
+        final Path dataDirectory = Paths.get(args[0]);
+        final Path backupDir = Paths.get(args[1])
+                .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")));
+
+        export(dataDirectory, backupDir);
+    }
+
+    public static List<Path> export(final Path dataDirectory, final Path backupDir) throws Exception {
+        final List<Path> exportFiles = new ArrayList<>();
+        Files.createDirectories(backupDir);
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                LOGGER.info("shutdown hook");
+            }
+        });
+
+        final OffsetDateTime start = OffsetDateTime.now();
+        final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
+        final AtomicLong tagsIdCounter = new AtomicLong(0);
+        long exportFileCounter = 0;
+
+        Path exportFile = null;
+        Writer writer = null;
+
+        try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
+
+            LOGGER.info("Searching for all files. This may take a while ...");
+            final List<PdbFile> pdbFiles = db.getFilesForQuery("");
+
+            long count = 0;
+            long lastEpochMilli = 0;
+
+            for (final PdbFile pdbFile : pdbFiles) {
+
+                // roll over to a new export file once the current one exceeds 1 GB
+                if (writer == null || Files.size(exportFile) > GB) {
+                    if (writer != null) {
+                        writer.flush();
+                        writer.close();
+                    }
+                    exportFile = backupDir.resolve(String.format("%s.%05d.pdb.gz", datePrefix, exportFileCounter++));
+                    exportFiles.add(exportFile);
+                    writer = createWriter(exportFile);
+                    LOGGER.info("new export file: {}", exportFile);
+                }
+
+                final Stream<LongList> timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDiskStorage());
+
+                final Tags tags = pdbFile.getTags();
+                final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);
+
+                final Iterator<LongList> it = timeValueStream.iterator();
+                while (it.hasNext()) {
+                    final LongList entry = it.next();
+
+                    for (int i = 0; i < entry.size(); i += 2) {
+
+                        final long epochMilli = entry.get(i);
+                        final long value = entry.get(i + 1);
+
+                        final long epochMilliDiff = epochMilli - lastEpochMilli;
+                        lastEpochMilli = epochMilli;
+
+                        writer.write(Long.toString(epochMilliDiff));
+                        writer.write(',');
+                        writer.write(Long.toString(value));
+                        writer.write(',');
+                        writer.write(Long.toString(tagsId));
+                        writer.write('\n');
+
+                        count++;
+                        if (count % 100000 == 0) {
+                            LOGGER.info("progress: " + count);
+                        }
+                    }
+                }
+            }
+
+            LOGGER.info("total: " + count);
+
+        } finally {
+            if (writer != null) {
+                writer.close();
+            }
+        }
+
+        final OffsetDateTime end = OffsetDateTime.now();
+
+        LOGGER.info("duration: " + Duration.between(start, end));
+        return exportFiles;
+    }
+
+    private static void initLogging() {
+        Configurator.setRootLevel(Level.INFO);
+    }
+
+    private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter)
+            throws IOException {
+        final long tagsId = tagsIdCounter.getAndIncrement();
+
+        writer.write(MARKER_DICT_ENTRY);
+        writer.write(Long.toString(tagsId));
+        writer.write(SEPARATOR_TAG_ID);
+        writer.write(tags.toCsv());
+        writer.write('\n');
+
+        return tagsId;
+    }
+
+    private static Writer createWriter(final Path file) {
+
+        try {
+            final OutputStreamWriter writer = new OutputStreamWriter(
+                    new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8);
+            // initialize file header
+            writer.write(MAGIC_BYTE);
+            return writer;
+
+        } catch (final IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file
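
The exporter can also be run standalone; the arguments are the data directory and
the parent directory under which a timestamped backup folder is created (classpath
abbreviated, paths illustrative):

    java -cp <classpath> org.lucares.performance.db.PdbExport /var/pdb/data /var/pdb/backup
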
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index dc71288..0c658ef 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -20,6 +20,7 @@ import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.datastore.Proposal;
 import org.lucares.pdb.datastore.internal.DataStore;
 import org.lucares.pdb.datastore.lang.SyntaxException;
+import org.lucares.pdb.diskstorage.DiskStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +123,10 @@ public class PerformanceDb implements AutoCloseable {
         return get(query, Grouping.NO_GROUPING);
     }
 
+    public List<PdbFile> getFilesForQuery(final String query) {
+        return tagsToFile.getFilesForQuery(query);
+    }
+
     /**
      * Return the entries as an unbound, ordered and non-parallel stream.
      *
@@ -177,4 +182,8 @@ public class PerformanceDb implements AutoCloseable {
     public SortedSet<String> getFieldsValues(final String query, final String fieldName) {
         return dataStore.getAvailableValuesForKey(query, fieldName);
     }
+
+    public DiskStorage getDiskStorage() {
+        return dataStore.getDiskStorage();
+    }
 }