cleanup after revert
@@ -1,127 +0,0 @@
-package org.lucares.pdbui;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.regex.Pattern;
-
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.api.TagsBuilder;
-import org.lucares.pdb.datastore.Entries;
-import org.lucares.pdb.datastore.Entry;
-import org.lucares.performance.db.PdbExport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * File format goals: minimal size and minimal repetition, while also providing a
- * file format that can be used for "normal" ingestion, not just backup/restore.
- * It should be easy to implement in any language. It should be easy to debug.
- * <p>
- * Note: Line breaks are written as {@code \n}.
- *
- * <pre>
- * #                              // '#' is the magic byte used to detect this file format
- * $123:key1=value1,key2=value2\n // '$' marks the beginning of a dictionary entry that says: the following number will be used to refer to the following tags.
- *                                // In this case the tags key1=value1,key2=value2 will be identified by 123.
- *                                // The newline is used as an end marker.
- * 1534567890,456,123\n           // Defines an entry with timestamp 1534567890, duration 456 and tags key1=value1,key2=value2.
- * 1,789,123\n                    // Timestamps are encoded using delta encoding. That means this triple defines
- *                                // an entry with timestamp 1534567891, duration 789 and tags key1=value1,key2=value2.
- * -2,135,123\n                   // Timestamp delta encoding can contain negative numbers. This triple defines an entry
- *                                // with timestamp 1534567889, duration 135 and tags key1=value1,key2=value2.
- * </pre>
- */
-public class CustomExportFormatToEntryTransformer {
-
-    private static final int ENTRY_BUFFER_SIZE = 100;
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(CustomExportFormatToEntryTransformer.class);
-
-    private final Pattern splitByComma = Pattern.compile(",");
-
-    private final Map<Long, Tags> tagsDictionary = new HashMap<>();
-
-    private long lastEpochMilli;
-
-    public void read(final BufferedReader in, final ArrayBlockingQueue<Entries> queue) throws IOException {
-
-        Entries bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
-
-        try {
-            String line;
-            while ((line = in.readLine()) != null) {
-                try {
-                    if (line.startsWith(PdbExport.MARKER_DICT_ENTRY)) {
-                        readDictionaryEntry(line);
-                    } else {
-                        final Entry entry = readEntry(line);
-                        if (entry != null) {
-                            bufferedEntries.add(entry);
-                            if (bufferedEntries.size() == ENTRY_BUFFER_SIZE) {
-                                queue.put(bufferedEntries);
-                                bufferedEntries = new Entries(ENTRY_BUFFER_SIZE);
-                            }
-                        }
-                    }
-                } catch (final Exception e) {
-                    LOGGER.error("ignoring line '{}'", line, e);
-                }
-            }
-            // flush the partially filled buffer once the stream ends
-            queue.put(bufferedEntries);
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOGGER.info("aborting because of interruption");
-        }
-    }
-
-    private Entry readEntry(final String line) {
-
-        final String[] timeValueTags = splitByComma.split(line);
-
-        final long timeDelta = Long.parseLong(timeValueTags[0]);
-        final long value = Long.parseLong(timeValueTags[1]);
-        final long tagsId = Long.parseLong(timeValueTags[2]);
-
-        lastEpochMilli = lastEpochMilli + timeDelta;
-
-        final Tags tags = tagsDictionary.get(tagsId);
-        if (tags == null) {
-            LOGGER.info("no tags available for tagsId {}. Ignoring line '{}'", tagsId, line);
-            return null;
-        }
-
-        return new Entry(lastEpochMilli, value, tags);
-    }
-
-    private void readDictionaryEntry(final String line) {
-        final String[] tagsIdToSerializedTags = line.split(Pattern.quote(PdbExport.SEPARATOR_TAG_ID));
-
-        // parse from index 1 to skip the leading '$' marker
-        final Long tagId = Long.parseLong(tagsIdToSerializedTags[0], 1, tagsIdToSerializedTags[0].length(), 10);
-        final Tags tags = tagsFromCsv(tagsIdToSerializedTags[1]);
-        tagsDictionary.put(tagId, tags);
-    }
-
-    public static Tags tagsFromCsv(final String line) {
-
-        final TagsBuilder tagsBuilder = new TagsBuilder();
-        final String[] tagsAsString = line.split(Pattern.quote(","));
-
-        for (final String tagAsString : tagsAsString) {
-            final String[] keyValue = tagAsString.split(Pattern.quote("="));
-
-            final int key = Tags.STRING_COMPRESSOR.put(keyValue[0]);
-            final int value = Tags.STRING_COMPRESSOR.put(keyValue[1]);
-            tagsBuilder.add(key, value);
-        }
-
-        return tagsBuilder.build();
-    }
-}
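For reference, a short usage sketch of the removed transformer, fed with the sample from its Javadoc. This is a hypothetical driver, not part of the commit; `TransformerDemo` and the queue capacity are assumptions, and the leading '#' magic byte is omitted because the caller consumes it before `read(..)` is invoked.

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.ArrayBlockingQueue;

import org.lucares.pdb.datastore.Entries;

// hypothetical driver; assumes the pdb classes above are on the classpath
public class TransformerDemo {
    public static void main(final String[] args) throws Exception {
        final String sample = "$123:key1=value1,key2=value2\n" // dictionary entry: id 123 -> tags
                + "1534567890,456,123\n" // absolute timestamp, since the previous value is 0
                + "1,789,123\n"          // delta +1 -> timestamp 1534567891
                + "-2,135,123\n";        // delta -2 -> timestamp 1534567889

        final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(16);
        new CustomExportFormatToEntryTransformer().read(new BufferedReader(new StringReader(sample)), queue);

        // read(..) flushes its buffer on EOF, so the queue now holds one
        // Entries chunk containing the three decoded entries
        System.out.println(queue.take().size()); // prints 3
    }
}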
@@ -18,7 +18,6 @@ import java.util.zip.GZIPInputStream;
 import org.lucares.pdb.datastore.Entries;
 import org.lucares.pdb.datastore.Entry;
 import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
-import org.lucares.performance.db.PdbExport;

 import com.fasterxml.jackson.core.JsonParseException;

@@ -58,8 +57,6 @@ public final class IngestionHandler implements Callable<Void> {
         if (firstByte == '{') {
             in.reset();
             readJSON(in);
-        } else if (firstByte == PdbExport.MAGIC_BYTE) {
-            readCustomExportFormat(in);
         } else if (isGZIP(firstByte)) {
             in.reset();
             final GZIPInputStream gzip = new GZIPInputStream(in);

@@ -80,15 +77,6 @@ public final class IngestionHandler implements Callable<Void> {
         return firstByte == 0x1f;
     }
-
-    private void readCustomExportFormat(final InputStream in) throws IOException {
-
-        final CustomExportFormatToEntryTransformer transformer = new CustomExportFormatToEntryTransformer();
-
-        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-        transformer.read(reader, queue);
-    }

     private void readJSON(final InputStream in) throws IOException, InterruptedException {
         final int chunksize = 100;
         Entries entries = new Entries(chunksize);
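The removed branches above hooked the custom export format into IngestionHandler's first-byte dispatch. A minimal sketch of that sniffing pattern follows; it is a hypothetical standalone example, where only the '#' magic byte (PdbExport.MAGIC_BYTE) and the 0x1f GZIP check come from the commit, and `FormatSniffer` and the buffering are assumptions.

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

// hypothetical sketch of the mark/reset dispatch used by IngestionHandler
final class FormatSniffer {
    static String detect(final InputStream raw) throws IOException {
        final InputStream in = new BufferedInputStream(raw);
        in.mark(1);                      // remember the start of the stream
        final int firstByte = in.read(); // consume a single byte to sniff the format
        if (firstByte == '{') {
            in.reset();                  // rewind: the JSON reader needs the full stream
            return "json";
        } else if (firstByte == '#') {   // PdbExport.MAGIC_BYTE
            return "custom-export";      // no reset: the magic byte stays consumed
        } else if (firstByte == 0x1f) {  // first byte of the GZIP magic number 0x1f 0x8b
            in.reset();
            return "gzip";
        }
        return "unknown";
    }
}

Note that the removed code resets the stream for JSON and GZIP but not for the custom format, so CustomExportFormatToEntryTransformer.read never sees the leading '#', only dictionary and entry lines.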
@@ -25,8 +25,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.lucares.collections.LongList;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Query;
-import org.lucares.pdb.datastore.internal.DataStore;
-import org.lucares.performance.db.PdbExport;
 import org.lucares.performance.db.PerformanceDb;
 import org.lucares.utils.file.FileUtils;
 import org.slf4j.Logger;

@@ -92,66 +90,6 @@ public class TcpIngestorTest {
         }
     }
-
-    @Test
-    public void testIngestDataViaTcpStream_CustomFormat() throws Exception {
-
-        final long dateA = Instant.now().toEpochMilli();
-        final long dateB = Instant.now().toEpochMilli() + 1;
-        final long dateC = Instant.now().toEpochMilli() - 1;
-        final DateTimeRange dateRange = DateTimeRange.relativeMinutes(1);
-        final String host = "someHost";
-
-        // 1. insert some data
-        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
-            ingestor.useRandomPort();
-            ingestor.start();
-
-            final long deltaEpochMilliB = dateB - dateA;
-            final long deltaEpochMilliC = dateC - dateB;
-
-            final String data = "#$0:host=someHost,pod=somePod\n" //
-                    + dateA + ",1,0\n" // previous date is 0, therefore the delta is dateA / using tags with id 0
-                    + "$1:host=someHost,pod=otherPod\n" //
-                    + deltaEpochMilliB + ",2,1\n" // dates are deltas to the previous date / using tags with id 1
-                    + deltaEpochMilliC + ",3,0"; // dates are deltas to the previous date / using tags with id 0
-
-            PdbTestUtil.send(data, ingestor.getPort());
-        } catch (final Exception e) {
-            LOGGER.error("", e);
-            throw e;
-        }
-
-        // 2. export the data
-        final List<Path> exportFiles = PdbExport.export(dataDirectory, dataDirectory.resolve("export"));
-
-        // 3. delete the database
-        FileUtils.delete(dataDirectory.resolve(DataStore.SUBDIR_STORAGE));
-
-        // 4. create a new database
-        try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
-            ingestor.useRandomPort();
-            ingestor.start();
-            for (final Path exportFile : exportFiles) {
-                PdbTestUtil.send(exportFile, ingestor.getPort());
-            }
-        }
-
-        // 5. check that the data is correctly inserted
-        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
-            final LongList result = db.get(new Query("host=" + host, dateRange)).singleGroup().flatMap();
-            Assertions.assertEquals(6, result.size());
-
-            Assertions.assertEquals(dateA, result.get(0));
-            Assertions.assertEquals(1, result.get(1));
-
-            Assertions.assertEquals(dateC, result.get(2));
-            Assertions.assertEquals(3, result.get(3));
-
-            Assertions.assertEquals(dateB, result.get(4));
-            Assertions.assertEquals(2, result.get(5));
-        }
-    }

     @Test
     public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
         final OffsetDateTime dateA = OffsetDateTime.now().minusMinutes(1);
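Worked through, the payload in the removed test mirrors the format's Javadoc: the first triple is sent while lastEpochMilli is still 0, so its delta is the absolute timestamp dateA; deltaEpochMilliB = dateB - dateA is nominally +1 ms; and deltaEpochMilliC = dateC - dateB is nominally -2 ms, exercising the negative-delta case.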
@@ -1,186 +0,0 @@
-package org.lucares.performance.db;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.lucares.collections.LongList;
-import org.lucares.pdb.api.DateTimeRange;
-import org.lucares.pdb.api.Query;
-import org.lucares.pdb.api.Tags;
-import org.lucares.pdb.datastore.PdbFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PdbExport {
-
-    private static final int KB = 1024;
-    private static final int MB = KB * 1024;
-    private static final long GB = MB * 1024L; // long, so that "4 * GB" below cannot overflow
-
-    public static final char MAGIC_BYTE = '#';
-    public static final char MARKER_DICT_ENTRY_CHAR = '$';
-    public static final String MARKER_DICT_ENTRY = String.valueOf(MARKER_DICT_ENTRY_CHAR);
-    public static final char SEPARATOR_TAG_ID_CHAR = ':';
-    public static final String SEPARATOR_TAG_ID = String.valueOf(SEPARATOR_TAG_ID_CHAR);
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PdbExport.class);
-
-    public static void main(final String[] args) throws Exception {
-
-        initLogging();
-
-        final Path dataDirectory = Paths.get(args[0]);
-        final Path backupDir = Paths.get(args[1])
-                .resolve(OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")));
-
-        export(dataDirectory, backupDir);
-    }
-
-    public static List<Path> export(final Path dataDirectory, final Path backupDir) throws Exception {
-        final List<Path> exportFiles = new ArrayList<>();
-        Files.createDirectories(backupDir);
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                LOGGER.info("shutdown hook");
-            }
-        });
-
-        final OffsetDateTime start = OffsetDateTime.now();
-        final String datePrefix = start.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
-        final AtomicLong tagsIdCounter = new AtomicLong(0);
-        long exportFileCounter = 0;
-
-        Path exportFile = null;
-        Writer writer = null;
-
-        try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
-
-            LOGGER.info("Searching for all files. This may take a while ...");
-            final List<PdbFile> pdbFiles = db.getFilesForQuery(new Query("", DateTimeRange.max()));
-
-            long count = 0;
-            long lastEpochMilli = 0;
-            long begin = System.currentTimeMillis();
-
-            for (final PdbFile pdbFile : pdbFiles) {
-
-                if (writer == null || Files.size(exportFile) > 4 * GB) {
-                    if (writer != null) {
-                        writer.flush();
-                        writer.close();
-                    }
-                    exportFile = backupDir
-                            .resolve(String.format(Locale.US, "%s.%05d.pdb.gz", datePrefix, exportFileCounter++));
-                    exportFiles.add(exportFile);
-                    writer = createWriter(exportFile);
-                    LOGGER.info("new export file: {}", exportFile);
-
-                    lastEpochMilli = 0;
-                }
-
-                final Stream<LongList> timeValueStream = PdbFile.toStream(Arrays.asList(pdbFile), db.getDataStore());
-
-                final Tags tags = pdbFile.getTags();
-                final long tagsId = addNewTagsToDictionary(writer, tags, tagsIdCounter);
-
-                final Iterator<LongList> it = timeValueStream.iterator();
-                while (it.hasNext()) {
-                    final LongList entry = it.next();
-
-                    for (int i = 0; i < entry.size(); i += 2) {
-
-                        final long epochMilli = entry.get(i);
-                        final long value = entry.get(i + 1);
-
-                        final long epochMilliDiff = epochMilli - lastEpochMilli;
-                        lastEpochMilli = epochMilli;
-
-                        writer.write(Long.toString(epochMilliDiff));
-                        writer.write(',');
-                        writer.write(Long.toString(value));
-                        writer.write(',');
-                        writer.write(Long.toString(tagsId));
-                        writer.write('\n');
-
-                        count++;
-                        final long chunk = 10_000_000;
-                        if (count % chunk == 0) {
-                            final long end = System.currentTimeMillis();
-                            final long duration = end - begin;
-                            final long entriesPerSecond = (long) (chunk / (duration / 1000.0));
-                            LOGGER.info("progress: {} - {} entries/s - duration {} ms",
-                                    String.format(Locale.US, "%,d", count),
-                                    String.format(Locale.US, "%,d", entriesPerSecond), duration);
-                            begin = System.currentTimeMillis();
-                        }
-                    }
-                }
-            }
-
-            LOGGER.info("total: " + count);
-
-        } finally {
-            if (writer != null) {
-                writer.close();
-            }
-        }
-
-        final OffsetDateTime end = OffsetDateTime.now();
-
-        LOGGER.info("duration: " + Duration.between(start, end));
-        return exportFiles;
-    }
-
-    private static void initLogging() {
-        Configurator.setRootLevel(Level.INFO);
-    }
-
-    private static long addNewTagsToDictionary(final Writer writer, final Tags tags, final AtomicLong tagsIdCounter)
-            throws IOException {
-        final long tagsId = tagsIdCounter.getAndIncrement();
-
-        writer.write(MARKER_DICT_ENTRY);
-        writer.write(Long.toString(tagsId));
-        writer.write(SEPARATOR_TAG_ID);
-        writer.write(tags.toCsv());
-        writer.write('\n');
-
-        return tagsId;
-    }
-
-    private static Writer createWriter(final Path file) {
-
-        try {
-            final OutputStreamWriter writer = new OutputStreamWriter(
-                    new GZIPOutputStream(new FileOutputStream(file.toFile()), 4096 * 4), StandardCharsets.UTF_8);
-            // initialize the file header
-            writer.write(MAGIC_BYTE);
-            return writer;
-
-        } catch (final IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
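The heart of the removed exporter is the per-file delta encoding: lastEpochMilli is reset to 0 whenever a new file is rolled, so every export file starts with an absolute timestamp and stays independently ingestable. A minimal sketch of just that encoding step, as a hypothetical standalone helper whose name and array layout are assumptions:

import java.io.IOException;
import java.io.Writer;

// hypothetical helper isolating the delta encoding of the export loop above
final class DeltaTripleWriter {
    static void write(final Writer writer, final long[] epochMillis, final long[] values,
            final long tagsId) throws IOException {
        long lastEpochMilli = 0; // reset per file: the first triple carries an absolute timestamp
        for (int i = 0; i < epochMillis.length; i++) {
            final long delta = epochMillis[i] - lastEpochMilli;
            lastEpochMilli = epochMillis[i];
            writer.write(delta + "," + values[i] + "," + tagsId + "\n");
        }
    }
}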