Introduced a new custom file format for backup and ingestion

The new file format reduces repetition, is easy to parse,
is easy to generate in any language, and is human-readable.
2019-02-03 15:36:28 +01:00
parent 1d8ca0e21c
commit 668d73c926
9 changed files with 439 additions and 10 deletions

View File

@@ -46,7 +46,7 @@ public class DataStore implements AutoCloseable {
public static final char LISTING_FILE_SEPARATOR = ',';
-private static final String SUBDIR_STORAGE = "storage";
+public static final String SUBDIR_STORAGE = "storage";
// used to generate doc ids that are
// a) unique

View File

@@ -7,6 +7,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.lucares.collections.IntList;
import org.lucares.collections.LongList;
@@ -198,6 +199,28 @@ public class Tags implements Comparable<Tags> {
return String.valueOf(tags);
}
public String toCsv() {
final List<String> tagsAsStrings = new ArrayList<>();
for (final Tag tag : tags) {
tagsAsStrings.add(tag.getKeyAsString() + "=" + tag.getValueAsString());
}
return String.join(",", tagsAsStrings);
}
public static Tags fromCsv(final String line) {
final TagsBuilder tagsBuilder = new TagsBuilder();
final String[] tagsAsString = line.split(Pattern.quote(","));
for (final String tagAsString : tagsAsString) {
// limit 2 keeps '=' characters inside the value intact
final String[] keyValue = tagAsString.split(Pattern.quote("="), 2);
tagsBuilder.add(keyValue[0], keyValue[1]);
}
return tagsBuilder.build();
}
@Override
public int hashCode() {
final int prime = 31;
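A quick round-trip sketch for the new CSV helpers above (illustrative, not part of the commit; note that keys and values must not contain ',', since toCsv() does not escape them):

final Tags roundTrip = Tags.fromCsv("host=web01,pod=alpha");
final String csv = roundTrip.toCsv(); // "host=web01,pod=alpha", assuming the builder keeps the tag order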

View File

@@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
@@ -11,6 +12,7 @@ import org.lucares.pdbui.date.FastISODateParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO remove?
public class CsvToEntryTransformer implements LineToEntryTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(CsvToEntryTransformer.class);
@@ -61,7 +63,9 @@ public class CsvToEntryTransformer implements LineToEntryTransformer {
duration = Long.parseLong(columns[i]);
break;
default:
-tagsBuilder.add(headers[i], columns[i]);
+if (!StringUtils.isBlank(columns[i])) {
+    tagsBuilder.add(headers[i], columns[i]);
+}
break;
}

View File

@@ -0,0 +1,110 @@
package org.lucares.pdbui;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.regex.Pattern;
import org.lucares.pdb.api.Entries;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.performance.db.PdbExport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* File format goals: minimal size and minimal repetition, while still providing a
* format that can be used for "normal" ingestion, not just backup/restore.
* It should be easy to implement in any language and easy to debug.
* <p>
* Note: Line breaks are written as {@code \n}.
*
* <pre>
* # // '#' is the magic byte used to detect this file format
* $123:key1=value1,key2=value2\n // '$' marks a dictionary entry: the number before ':' is the id that
* // later entries use to refer to the tags after it. Here, 123 identifies
* // the tags key1=value1,key2=value2. The newline is the end marker.
* 1534567890,456,123\n // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
* 1,789,123\n // Timestamps are delta encoded: this triple defines an entry with
* // timestamp 1534567891, duration 789 and tags key1=value1,key2=value2.
* -2,135,123\n // Deltas can be negative: this triple defines an entry with
* // timestamp 1534567889, duration 135 and tags key1=value1,key2=value2.
* </pre>
*/
public class CustomExportFormatToEntryTransformer {
private static final int ENTRY_BUFFER_SIZE = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);
private final Pattern splitByComma = Pattern.compile(",");
private final Map<Long, Tags> tagsDictionary = new HashMap<>();
private long lastEpochMilli;
public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
try {
String line;
while ((line = in.readLine()) != null) {
try {
if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
readDictionaryEntry(line);
} else {
final Entry entry = readEntry(line);
if (entry != null) {
bufferedEntries.add(entry);
if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) {
queue.put(bufferedEntries);
bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
}
}
}
} catch (final Exception e) {
LOGGER.error("ignoring line '{}'", line, e);
}
}
// flush the remaining, partially filled buffer once the stream ends
queue.put(bufferedEntries);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("aborting because of interruption");
}
}
private Entry readEntry(final String line) {
final String[] timeValueTags = splitByComma.split(line);
final long timeDelta = Long.parseLong(timeValueTags[0]);
final long value = Long.parseLong(timeValueTags[1]);
final long tagsId = Long.parseLong(timeValueTags[2]);
lastEpochMilli = lastEpochMilli + timeDelta;
final Tags tags = tagsDictionary.get(tagsId);
if (tags == null) {
LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
return null;
}
return new Entry(lastEpochMilli, value, tags);
}
private void readDictionaryEntry(final String line) {
// limit 2: tag values may themselves contain ':'
final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID), 2);
// parse from index 1 to skip the leading '$' marker
final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10);
final Tags tags = Tags.fromCsv(tagsIdToSerializedTags[1]);
tagsDictionary.put(tagId, tags);
}
}
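To illustrate the "easy to generate in any language" goal from the Javadoc above, here is a minimal, hedged sketch that writes the format by hand; the class and output file name are made up for the example:

import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ExportFormatSketch {
    public static void main(final String[] args) throws IOException {
        try (Writer w = Files.newBufferedWriter(Paths.get("example.pdb"), StandardCharsets.UTF_8)) {
            w.write("#");                  // magic byte
            w.write("$0:host=a,pod=b\n");  // dictionary entry: id 0 -> host=a,pod=b
            w.write("1534567890,456,0\n"); // first delta is relative to 0, i.e. an absolute timestamp
            w.write("1,789,0\n");          // decodes to timestamp 1534567891
            w.write("-2,135,0\n");         // deltas may be negative: 1534567889
        }
    }
}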

View File

@@ -1,5 +1,6 @@
package org.lucares.pdbui;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -18,6 +19,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
import javax.annotation.PreDestroy;
@@ -28,6 +30,7 @@ import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdbui.date.FastISODateParser;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.Config;
import org.slf4j.Logger;
@@ -69,15 +72,10 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
Thread.currentThread().setName("worker-" + clientAddress);
LOGGER.debug("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
-InputStream in = clientSocket.getInputStream();) {
+InputStream in = new BufferedInputStream(clientSocket.getInputStream());) {
LOGGER.debug("reading from stream");
-final byte firstByte = (byte) in.read();
-if (firstByte == '{') {
-readJSON(in);
-} else {
-readCSV(in, firstByte);
-}
+redirectInputStream(in);
LOGGER.debug("connection closed: " + clientAddress);
} catch (final Throwable e) {
@@ -88,6 +86,40 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
return null;
}
private void redirectInputStream(final InputStream in) throws IOException, InterruptedException {
in.mark(1); // requires a mark-supporting stream such as BufferedInputStream
final byte firstByte = (byte) in.read();
if (firstByte == '{') {
readJSON(in);
} else if (firstByte == PdbExport.MAGIC_BYTE) {
readCustomExportFormat(in);
} else if (isGZIP(firstByte)) {
in.reset();
final GZIPInputStream gzip = new GZIPInputStream(in);
redirectInputStream(gzip);
} else {
readCSV(in, firstByte);
}
}
private boolean isGZIP(final byte firstByte) {
// GZIP starts with 0x1f, 0x8b, see https://www.ietf.org/rfc/rfc1952.txt section 2.3.1.
// I am cheap and only check the first byte.
return firstByte == 0x1f;
}
private void readCustomExportFormat(final InputStream in) throws IOException {
final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
transformer.read(reader, queue);
}
private void readCSV(final InputStream in, final byte firstByte) throws IOException, InterruptedException {
final int chunksize = 1000;
Entries entries = new Entries(chunksize);
@@ -196,7 +228,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
epochMilli = dateParser.parseAsEpochMilli(line, lastSeparatorPosition + 1);
} else if (key == keyDuration) {
duration = parseLong(line, lastSeparatorPosition + 1);
-} else {
+} else if (lastSeparatorPosition + 1 < separatorPosition) { // value is not empty
final int value = Tags.STRING_COMPRESSOR.put(line, lastSeparatorPosition + 1,
separatorPosition);
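Because redirectInputStream() above sniffs the first byte and transparently unwraps GZIP, a client can stream an already-gzipped export file over the socket as-is. A minimal sketch; the host, port, and file name are assumptions for the example, not taken from this commit:

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SendGzippedExport {
    public static void main(final String[] args) throws IOException {
        try (Socket socket = new Socket("localhost", 5000);
                InputStream file = Files.newInputStream(Paths.get("export.pdb.gz"))) {
            // the server detects the leading 0x1f, wraps the stream in a
            // GZIPInputStream, and then dispatches on the '#' magic byte
            file.transferTo(socket.getOutputStream());
        }
    }
}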

View File

@@ -4,8 +4,11 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -132,6 +135,24 @@ public class PdbTestUtil {
LOGGER.trace("closed sender connection");
}
public static void send(final Path file) throws IOException {
final SocketChannel outputChannel = connect();
try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) {
inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel);
}
try {
// ugly workaround: the channel was closed too early and not all
// data was received
TimeUnit.MILLISECONDS.sleep(10);
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
outputChannel.close();
LOGGER.trace("closed sender connection");
}
private static SocketChannel connect() throws IOException {
SocketChannel result = null;

View File

@@ -18,7 +18,9 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import org.lucares.collections.LongList;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdbui.TcpIngestor;
import org.lucares.performance.db.PdbExport;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger;
@@ -88,6 +90,63 @@ public class TcpIngestorTest {
}
}
@Test
public void testIngestDataViaTcpStream_CustomFormat() throws Exception {
final long dateA = Instant.now().toEpochMilli();
final long dateB = Instant.now().toEpochMilli() + 1;
final long dateC = Instant.now().toEpochMilli() - 1;
final String host = "someHost";
// 1. insert some data
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
ingestor.start();
final long deltaEpochMilliB = dateB - dateA;
final long deltaEpochMilliC = dateC - dateB;
final String data = "#$0:host=someHost,pod=somePod\n"//
+ dateA + ",1,0\n"// the previous timestamp is 0, so the delta equals dateA; uses tags with id 0
+ "$1:host=someHost,pod=otherPod\n" //
+ deltaEpochMilliB + ",2,1\n" // each timestamp is the delta to the previous one; uses tags with id 1
+ deltaEpochMilliC + ",3,0"; // each timestamp is the delta to the previous one; uses tags with id 0
PdbTestUtil.send(data);
} catch (final Exception e) {
LOGGER.error("", e);
throw e;
}
// 2. export the data
final List<Path> exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export"));
// 3. delete database
FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE));
// 4. create a new database
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
ingestor.start();
for (final Path exportFile : exportFiles) {
PdbTestUtil.send(exportFile);
}
}
// 5. check that the data is correctly inserted
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final LongList result = db.get("host=" + host).singleGroup().flatMap();
Assert.assertEquals(result.size(), 6);
Assert.assertEquals(result.get(0), dateA);
Assert.assertEquals(result.get(1), 1);
Assert.assertEquals(result.get(2), dateC);
Assert.assertEquals(result.get(3), 3);
Assert.assertEquals(result.get(4), dateB);
Assert.assertEquals(result.get(5), 2);
}
}
@Test
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
final OffsetDateTime dateA = OffsetDateTime.ofInstant(Instant.ofEpochMilli(-1), ZoneOffset.UTC);

View File

@@ -0,0 +1,171 @@
package org.lucares.performance.db;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.Tags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PdbExport {
private static final int KB = 1024;
private static final int MB = KB * 1024;
private static final int GB = MB * 1024;
public static final char MAGIC_BYTE = '#';
public static final char MARKER_DICT_ENTRY_CHAR = '$';
public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR);
public static final char SEPARATOR_TAG_ID_CHAR = ':';
public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR);
private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class);
public static void main(final String[] args) throws Exception {
initLogging();
final Path dataDirectory = Paths.get(args[0]);
final Path backupDir = Paths.get(args[1])
.resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")));
export(dataDirectory, backupDir);
}
public static List<Path> export(final Path dataDirectory, final Path backupDir) throws Exception {
final List<Path> exportFiles = new ArrayList<>();
Files.createDirectories(backupDir);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOGGER.info("shutdown hook");
}
});
final OffsetDateTime start = OffsetDateTime.now();
final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
final AtomicLong tagsIdCounter = new AtomicLong(0);
long exportFileCounter = 0;
Path exportFile = null;
Writer writer = null;
try (final PerformanceDb db = new PerformanceDb(dataDirectory);) {
LOGGER.info("Searching for all files. This may take a while ...");
final List<PdbFile> pdbFiles = db.getFilesForQuery("");
long count = 0;
long lastEpochMilli = 0;
for (final PdbFile pdbFile : pdbFiles) {
if (writer == null || Files.size(exportFile) > GB) {
if (writer != null) {
writer.flush();
writer.close();
}
exportFile = backupDir.resolve(String.format("%s.%05d.pdb.gz", datePrefix, exportFileCounter++));
exportFiles.add(exportFile);
writer = createWriter(exportFile);
// reset the delta base so that each export file can be imported on its own
lastEpochMilli = 0;
LOGGER.info("new export file: {}", exportFile);
}
final Stream<LongList> timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore());
final Tags tags = pdbFile.getTags();
final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);
final Iterator<LongList> it = timeValueStream.iterator();
while (it.hasNext()) {
final LongList entry = it.next();
for (int i = 0; i < entry.size(); i += 2) {
final long epochMilli = entry.get(i);
final long value = entry.get(i + 1);
final long epochMilliDiff = epochMilli - lastEpochMilli;
lastEpochMilli = epochMilli;
writer.write(Long.toString(epochMilliDiff));
writer.write(',');
writer.write(Long.toString(value));
writer.write(',');
writer.write(Long.toString(tagsId));
writer.write('\n');
count++;
if (count % 100000 == 0) {
LOGGER.info("progress: " + count);
}
}
}
}
LOGGER.info("total: " + count);
} finally {
if (writer != null) {
writer.close();
}
}
final OffsetDateTime end = OffsetDateTime.now();
LOGGER.info("duration: " + Duration.between(start, end));
return exportFiles;
}
private static void initLogging() {
Configurator.setRootLevel(Level.INFO);
}
private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter)
throws IOException {
final long tagsId = tagsIdCounter.getAndIncrement();
writer.write(MARKER_DICT_ENTRY);
writer.write(Long.toString(tagsId));
writer.write(SEPARATOR_TAG_ID);
writer.write(tags.toCsv());
writer.write('\n');
return tagsId;
}
private static Writer createWriter(final Path file) {
try {
final OutputStreamWriter writer = new OutputStreamWriter(
new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8);
// initialize file header
writer.write(MAGIC_BYTE);
return writer;
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
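The export pairs with CustomExportFormatToEntryTransformer on the read side. A hedged sketch of re-ingesting an export file locally, without going through the TCP ingestor; the file name is an assumption, and for large files a consumer should drain the queue concurrently so read() does not block:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.zip.GZIPInputStream;
import org.lucares.pdb.api.Entries;
import org.lucares.pdbui.CustomExportFormatToEntryTransformer;

public class ReimportSketch {
    public static void main(final String[] args) throws IOException {
        final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(1024);
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(Paths.get("backup/export.00000.pdb.gz"))),
                StandardCharsets.UTF_8))) {
            in.read(); // consume the '#' magic byte, as redirectInputStream() does
            new CustomExportFormatToEntryTransformer().read(in, queue);
        }
        // queue now holds Entries batches ready to be inserted into a PerformanceDb
    }
}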

View File

@@ -20,6 +20,7 @@ import org.lucares.pdb.api.Tags;
import org.lucares.pdb.datastore.Proposal;
import org.lucares.pdb.datastore.internal.DataStore;
import org.lucares.pdb.datastore.lang.SyntaxException;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +123,10 @@ public class PerformanceDb implements AutoCloseable {
return get(query, Grouping.NO_GROUPING);
}
public List<PdbFile> getFilesForQuery(final String query) {
return tagsToFile.getFilesForQuery(query);
}
/**
* Return the entries as an unbound, ordered and non-parallel stream.
*
@@ -177,4 +182,8 @@ public class PerformanceDb implements AutoCloseable {
public SortedSet<String> getFieldsValues(final String query, final String fieldName) {
return dataStore.getAvailableValuesForKey(query, fieldName);
}
public DiskStorage getDataStore() {
return dataStore.getDiskStorage();
}
}