file drop support

- Add a folder where you can drop Zip files which will then be
  extracted on the fly and ingested.
- CsvReaderSettings now contains TagMatchers that are applied to the
  first line and can be used to extract additional tags.
- Update to jdk 16 so that we can have records.
This commit is contained in:
2021-08-01 09:31:40 +02:00
parent 6d5cdbafca
commit 85ed5f1ccb
17 changed files with 430 additions and 14 deletions

View File

@@ -10,7 +10,7 @@ plugins {
ext { ext {
javaVersion=14 javaVersion=16
version_log4j2= '2.14.1' // keep in sync with spring-boot-starter-log4j2 version_log4j2= '2.14.1' // keep in sync with spring-boot-starter-log4j2
version_spring = '2.5.3' version_spring = '2.5.3'

View File

@@ -7,7 +7,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Wrapper for chunk of {@link Entry}s. * Wrapper for chunk of {@link Entry}s.
@@ -71,10 +70,10 @@ public class Entries implements Iterable<Entry> {
} }
public void waitUntilFlushed(final long timeout, final TimeUnit unit) public void waitUntilFlushed(final long timeout, final TimeUnit unit)
throws InterruptedException, TimeoutException { throws InterruptedException, RuntimeTimeoutException {
final boolean finished = flushLatch.await(timeout, unit); final boolean finished = flushLatch.await(timeout, unit);
if (!finished) { if (!finished) {
throw new TimeoutException(); throw new RuntimeTimeoutException();
} }
} }

View File

@@ -0,0 +1,7 @@
package org.lucares.pdb.datastore;
/**
 * Unchecked exception that signals that an operation did not finish within its
 * timeout.
 * <p>
 * Besides the no-argument constructor there are constructors that take a
 * message and a cause, so that the place and reason of the timeout can be
 * reported.
 */
public class RuntimeTimeoutException extends RuntimeException {
    private static final long serialVersionUID = -7159091069429132434L;

    /**
     * Creates an exception without message and cause.
     */
    public RuntimeTimeoutException() {
        super();
    }

    /**
     * Creates an exception with a message.
     *
     * @param message description of what timed out
     */
    public RuntimeTimeoutException(final String message) {
        super(message);
    }

    /**
     * Creates an exception with a message and a cause.
     *
     * @param message description of what timed out
     * @param cause   the underlying exception, may be {@code null}
     */
    public RuntimeTimeoutException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

View File

@@ -1,7 +1,9 @@
package org.lucares.pdb.api; package org.lucares.pdb.api;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@@ -26,12 +28,13 @@ public class Tags implements Comparable<Tags> {
tags = new ArrayList<>(); tags = new ArrayList<>();
} }
public Tags(final List<Tag> tags) { public Tags(final Collection<Tag> tags) {
Collections.sort(tags, TagByKeyAndValueComparator.INSTANCE); final List<Tag> t = new ArrayList<>(tags);
this.tags = tags; Collections.sort(t, TagByKeyAndValueComparator.INSTANCE);
this.tags = t;
} }
public static Tags create(final List<Tag> tags) { public static Tags create(final Collection<Tag> tags) {
return new Tags(tags); return new Tags(tags);
} }
@@ -139,6 +142,20 @@ public class Tags implements Comparable<Tags> {
return 0; return 0;
} }
/**
 * Builds a new {@link Tags} instance that holds the tags of {@code this}
 * together with the tags of {@code tags}. Tags that are equal to each other
 * are only contained once.
 * <p>
 * NOTE(review): the merge is done with a {@link HashSet} of {@link Tag}, so
 * whether a tag of {@code tags} replaces a tag of {@code this} that has the
 * same key but a different value depends on {@code Tag#equals} - confirm.
 *
 * @param tags the tags to add
 * @return the combined {@link Tags}
 */
public Tags add(final Tags tags) {
    final Set<Tag> merged = new HashSet<>(this.tags);
    merged.addAll(tags.toTags());
    return Tags.create(merged);
}
public String getValue(final String key) { public String getValue(final String key) {
final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0); final Tag needle = new Tag(STRING_COMPRESSOR.put(key), 0);

View File

@@ -1,12 +1,15 @@
package org.lucares.pdbui; package org.lucares.pdbui;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import org.lucares.pdbui.domain.TagMatcher;
import org.lucares.utils.Preconditions; import org.lucares.utils.Preconditions;
public final class CsvReaderSettings { public final class CsvReaderSettings {
@@ -166,6 +169,8 @@ public final class CsvReaderSettings {
private String comment = "#"; private String comment = "#";
private List<TagMatcher> firstLineMatcher = new ArrayList<>();
public CsvReaderSettings() { public CsvReaderSettings() {
this("@timestamp", "duration", ",", new ColumnDefinitions()); this("@timestamp", "duration", ",", new ColumnDefinitions());
} }
@@ -238,6 +243,10 @@ public final class CsvReaderSettings {
additionalTags.put(field, value); additionalTags.put(field, value);
} }
/**
 * Adds all entries of the given map to the additional tags of these settings.
 *
 * @param additionalTags tag names mapped to their values
 */
public void putAdditionalTag(final Map<String, String> additionalTags) {
    // The parameter shadows the field. Without "this." the argument was copied
    // into itself and the settings were never changed.
    this.additionalTags.putAll(additionalTags);
}
public Map<String, String> getAdditionalTags() { public Map<String, String> getAdditionalTags() {
return Map.copyOf(additionalTags); return Map.copyOf(additionalTags);
} }
@@ -254,4 +263,52 @@ public final class CsvReaderSettings {
this.columnDefinitions = columnDefinitions; this.columnDefinitions = columnDefinitions;
} }
/**
 * @return the matchers that are applied to the first line; this is the list
 *         held by these settings, not a copy
 */
public List<TagMatcher> getFirstLineMatcher() {
    return this.firstLineMatcher;
}
/**
 * Replaces the matchers that are applied to the first line.
 *
 * @param firstLineMatcher the new matchers; {@code null} is treated as "no
 *                         matchers" so that code iterating over the list does
 *                         not have to handle {@code null}
 */
public void setFirstLineMatcher(final List<TagMatcher> firstLineMatcher) {
    this.firstLineMatcher = firstLineMatcher != null ? firstLineMatcher : new ArrayList<>();
}
/**
 * Renders every non-null setting as {@code "\nname=value"}. All entries except
 * the last one ({@code comment}) are followed by {@code ", "}.
 */
@Override
public String toString() {
    final StringBuilder sb = new StringBuilder();
    if (separator != null) {
        sb.append("\nseparator=").append(separator).append(", ");
    }
    if (columnDefinitions != null) {
        sb.append("\ncolumnDefinitions=").append(columnDefinitions).append(", ");
    }
    if (additionalTags != null) {
        sb.append("\nadditionalTags=").append(additionalTags).append(", ");
    }
    if (timeColumn != null) {
        sb.append("\ntimeColumn=").append(timeColumn).append(", ");
    }
    if (valueColumn != null) {
        sb.append("\nvalueColumn=").append(valueColumn).append(", ");
    }
    if (firstLineMatcher != null) {
        sb.append("\nfirstLineMatcher=").append(firstLineMatcher).append(", ");
    }
    if (comment != null) {
        sb.append("\ncomment=").append(comment);
    }
    return sb.toString();
}
} }

View File

@@ -8,7 +8,6 @@ import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function; import java.util.function.Function;
import org.lucares.collections.IntList; import org.lucares.collections.IntList;
@@ -16,6 +15,7 @@ import org.lucares.pdb.api.Tags;
import org.lucares.pdb.api.TagsBuilder; import org.lucares.pdb.api.TagsBuilder;
import org.lucares.pdb.datastore.Entries; import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.Entry; import org.lucares.pdb.datastore.Entry;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors; import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.date.FastISODateParser; import org.lucares.pdbui.date.FastISODateParser;
@@ -41,7 +41,7 @@ class CsvToEntryTransformer {
this.settings = settings; this.settings = settings;
} }
void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException { void readCSV(final InputStream in) throws IOException, InterruptedException, RuntimeTimeoutException {
final int chunksize = 1000; final int chunksize = 1000;
Entries entries = new Entries(chunksize); Entries entries = new Entries(chunksize);
@@ -55,26 +55,35 @@ class CsvToEntryTransformer {
int read = 0; int read = 0;
int bytesInLine = 0; int bytesInLine = 0;
int lineCounter = 0;
final byte[] buffer = new byte[4096 * 16]; final byte[] buffer = new byte[4096 * 16];
final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn()); final int keyTimestamp = Tags.STRING_COMPRESSOR.put(settings.getTimeColumn());
final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn()); final int keyDuration = Tags.STRING_COMPRESSOR.put(settings.getValueColumn());
final FastISODateParser dateParser = new FastISODateParser(); final FastISODateParser dateParser = new FastISODateParser();
final Tags additionalTags = initAdditionalTags(); Tags additionalTags = initAdditionalTags();
while ((read = in.read(buffer)) >= 0) { while ((read = in.read(buffer)) >= 0) {
offsetInBuffer = 0; offsetInBuffer = 0;
for (int i = 0; i < read; i++) { for (int i = 0; i < read; i++) {
if (buffer[i] == newline) { if (buffer[i] == newline) {
lineCounter++;
final int length = i - offsetInBuffer; final int length = i - offsetInBuffer;
System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length); System.arraycopy(buffer, offsetInBuffer, line, offsetInLine, length);
bytesInLine = offsetInLine + length; bytesInLine = offsetInLine + length;
separatorPositions.add(offsetInLine + i - offsetInBuffer); separatorPositions.add(offsetInLine + i - offsetInBuffer);
if (line[0] == comment) { if (line[0] == comment) {
// ignore if (lineCounter == 1) {
final String lineAsString = new String(line, StandardCharsets.UTF_8);
final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
settings.getFirstLineMatcher());
additionalTags = additionalTags.add(firstLineTags);
} else {
// ignore
}
} else if (compressedHeaders != null) { } else if (compressedHeaders != null) {
final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp, final Entry entry = handleCsvLine(line, bytesInLine, separatorPositions, keyTimestamp,

View File

@@ -30,7 +30,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
private final PerformanceDb performanceDb; private final PerformanceDb performanceDb;
public CsvUploadHandler(final PerformanceDb performanceDb) throws IOException { public CsvUploadHandler(final PerformanceDb performanceDb) {
this.performanceDb = performanceDb; this.performanceDb = performanceDb;
} }

View File

@@ -0,0 +1,73 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.lucares.pdbui.domain.FileDropConfig;
import org.lucares.pdbui.domain.FileDropSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Loads the file drop configuration from a JSON file and looks up the
 * {@link CsvReaderSettings} that belong to a dropped file.
 */
@Component
public class FileDropConfigProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileDropConfigProvider.class);

    private final FileDropConfig config;

    /**
     * Reads the configuration from {@code fileDropConfig}. If the file does not
     * exist an empty configuration is used.
     *
     * @param fileDropConfig path of the JSON configuration file
     * @throws JsonParseException   if the file is not valid JSON
     * @throws JsonMappingException if the JSON cannot be mapped to
     *                              {@link FileDropConfig}
     * @throws IOException          if the file cannot be read
     */
    public FileDropConfigProvider(@Value("${path.fileDropConfig}") final String fileDropConfig)
            throws JsonParseException, JsonMappingException, IOException {
        final Path configPath = Path.of(fileDropConfig);
        if (Files.exists(configPath)) {
            final ObjectMapper objectMapper = new ObjectMapper();
            config = objectMapper.readValue(configPath.toFile(), FileDropConfig.class);
            LOGGER.info("File drop config: {}", objectMapper.writeValueAsString(config));
        } else {
            config = new FileDropConfig();
        }
    }

    /**
     * @return the loaded configuration, never {@code null}
     */
    public FileDropConfig getConfig() {
        return config;
    }

    /**
     * Returns the CSV settings of the first configured entry whose Ant-style
     * pattern matches {@code file}. The template variables extracted from the
     * pattern are handed to the settings as additional tags.
     *
     * @param file the file name (path) to match
     * @return the settings of the first match, or an empty {@link Optional} if
     *         no pattern matches
     */
    public Optional<CsvReaderSettings> provideCsvReaderSettings(final String file) {
        if (config.getSettings() == null) {
            // the setter of FileDropConfig accepts null
            return Optional.empty();
        }

        // one matcher is enough for all patterns
        final AntPathMatcher antPathMatcher = new AntPathMatcher();
        for (final FileDropSettings settings : config.getSettings()) {
            if (antPathMatcher.match(settings.match(), file)) {
                final Map<String, String> variables = antPathMatcher.extractUriTemplateVariables(settings.match(),
                        file);
                LOGGER.debug("match found for {} with pattern {}: {}", file, settings.match(), variables);

                // NOTE(review): csvSettings is the object held by the loaded config,
                // not a copy; tags added here stay on it for later calls - confirm
                // that this is intended.
                final CsvReaderSettings csvSettings = settings.csvSettings();
                csvSettings.putAdditionalTag(variables);
                return Optional.of(csvSettings);
            }
        }
        return Optional.empty();
    }

    /**
     * Manual experiment: checks a regular expression with named groups against
     * a sample path and prints whether it matches.
     *
     * @param args ignored
     */
    public static void main(final String[] args) {
        final Matcher matcher = Pattern.compile("(?<source>.+)/(?<pod>.+)/(?<host>[^/]+)/performance.*.csv")
                .matcher("web/vapsales01/0f5230761bb8a260e/performance.2020-10-05_000200_2.csv");
        if (matcher.find()) {
            System.out.println("match found");
        } else {
            System.out.println("not found");
        }
    }
}

View File

@@ -0,0 +1,13 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.nio.file.Path;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
/**
 * Handler for one type of file.
 */
public interface FileDropFileTypeHandler {
    /**
     * @param file the file to check
     * @return {@code true} if this handler is responsible for {@code file}
     */
    boolean isHandle(Path file);

    /**
     * Processes the given file.
     *
     * @param file the file to process
     * @throws IOException             if the file cannot be read
     * @throws RuntimeTimeoutException if processing ran into a timeout
     * @throws InterruptedException    if the thread is interrupted while
     *                                 processing
     */
    void handle(Path file) throws IOException, RuntimeTimeoutException, InterruptedException;
}

View File

@@ -0,0 +1,91 @@
package org.lucares.pdbui;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Watches the file drop directory and passes every newly created file to all
 * {@link FileDropFileTypeHandler}s that declare themselves responsible for it.
 */
@Component
public class FileDropHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileDropHandler.class);

    /**
     * Daemon thread that blocks on the {@link WatchService} of the drop
     * directory and dispatches the create events.
     */
    private final class FileWatchThread extends Thread {
        private final WatchService watchService;

        public FileWatchThread() throws IOException {
            setDaemon(true);
            setName("file-drop-watcher");
            watchService = FileSystems.getDefault().newWatchService();
            baseDir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    final WatchKey key = watchService.take();
                    for (final WatchEvent<?> watchEvent : key.pollEvents()) {
                        handleEvent(watchEvent);
                    }
                    if (!key.reset()) {
                        // The key is invalid, e.g. because the directory is no longer
                        // accessible. No further events would arrive and take() would
                        // block forever.
                        LOGGER.error("watch key for {} is no longer valid, stopping file drop watcher", baseDir);
                        break;
                    }
                }
            } catch (final InterruptedException e) {
                // keep the interrupt visible for whoever inspects this thread
                Thread.currentThread().interrupt();
                LOGGER.info("file drop watcher interrupted");
            } finally {
                closeWatchService();
            }
        }

        /**
         * Passes the file of a single event to all responsible handlers.
         */
        private void handleEvent(final WatchEvent<?> watchEvent) throws InterruptedException {
            LOGGER.debug("event kind={}, count={}, context={}", watchEvent.kind(), watchEvent.count(),
                    watchEvent.context());

            // OVERFLOW events can be delivered for any registration and have no path
            if (!(watchEvent.context() instanceof Path)) {
                return;
            }

            final Path file = baseDir.resolve((Path) watchEvent.context());
            for (final FileDropFileTypeHandler fileHandler : fileHandlers) {
                if (fileHandler.isHandle(file)) {
                    try {
                        fileHandler.handle(file);
                    } catch (final IOException | RuntimeException e) {
                        // a failing file must not stop the watcher thread
                        LOGGER.error("failed to handle file {} by handler: {} with exception", file,
                                fileHandler.getClass(), e);
                    }
                }
            }
        }

        /**
         * Closes the watch service and logs a failure to do so.
         */
        private void closeWatchService() {
            try {
                LOGGER.info("close watchService");
                watchService.close();
            } catch (final IOException e) {
                LOGGER.error("failed to close watchService", e);
            }
        }
    }

    private final Path baseDir;

    private final List<FileDropFileTypeHandler> fileHandlers;

    /**
     * Creates the drop directory if necessary and starts the watcher thread.
     *
     * @param baseDir      the directory to watch
     * @param fileHandlers the handlers that are offered every new file
     * @throws IOException if the directory cannot be created or watched
     */
    @Autowired
    public FileDropHandler(@Value("${path.fileDrop}") final String baseDir,
            final List<FileDropFileTypeHandler> fileHandlers) throws IOException {
        this.fileHandlers = fileHandlers;
        this.baseDir = Path.of(baseDir);
        if (!Files.isDirectory(this.baseDir)) {
            LOGGER.info("creating directory {}", this.baseDir);
            Files.createDirectories(this.baseDir);
        }
        LOGGER.info("file drop location {}", this.baseDir);

        final FileWatchThread watchThread = new FileWatchThread();
        watchThread.start();
    }
}

View File

@@ -0,0 +1,61 @@
package org.lucares.pdbui;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.lucares.pdb.datastore.Entries;
import org.lucares.pdb.datastore.RuntimeTimeoutException;
import org.lucares.performance.db.PerformanceDb;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * {@link FileDropFileTypeHandler} for Zip files. Every entry of the archive for
 * which the {@link FileDropConfigProvider} provides CSV settings is read as CSV
 * and put into the queue of the {@link PerformanceDb}.
 */
@Component
public class FileDropZipHandler implements FileDropFileTypeHandler {

    private final PerformanceDb performanceDb;
    private final FileDropConfigProvider configProvider;

    @Autowired
    public FileDropZipHandler(final PerformanceDb performanceDb, final FileDropConfigProvider configProvider) {
        this.performanceDb = performanceDb;
        this.configProvider = configProvider;
    }

    /**
     * @return {@code true} if the file name ends with {@code .zip}
     */
    @Override
    public boolean isHandle(final Path file) {
        return file.getFileName().toString().endsWith(".zip");
    }

    /**
     * Reads all matching CSV entries of the Zip file.
     *
     * @param file the Zip file
     * @throws IOException             if the archive or one of its entries cannot
     *                                 be read
     * @throws RuntimeTimeoutException declared by the CSV reader
     * @throws InterruptedException    declared by the CSV reader
     */
    @Override
    public void handle(final Path file) throws IOException, RuntimeTimeoutException, InterruptedException {
        // try-with-resources: the archive has to be closed, also when reading fails
        try (final ZipFile zipFile = new ZipFile(file.toFile())) {
            final Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                final ZipEntry entry = entries.nextElement();
                if (entry.isDirectory()) {
                    continue;
                }

                final Optional<CsvReaderSettings> csvSettings = configProvider
                        .provideCsvReaderSettings(entry.getName());
                if (csvSettings.isPresent()) {
                    final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
                    final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue,
                            csvSettings.get());
                    try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry),
                            1024 * 1024)) {
                        csvToEntryTransformer.readCSV(inputStream);
                    }
                }
            }
        }
    }
}

View File

@@ -0,0 +1,40 @@
package org.lucares.pdbui;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.lucares.pdb.api.Tag;
import org.lucares.pdb.api.Tags;
import org.lucares.pdbui.domain.TagMatcher;
/**
 * Extracts {@link Tags} from a line of text with the help of
 * {@link TagMatcher}s.
 */
public class TagMatchExtractor {

    /**
     * Applies every matcher to {@code line}. For each regular expression that is
     * found in the line and has at least one group, the content of group 1
     * becomes the value of the matcher's tag.
     *
     * @param line        the text to search
     * @param tagMatchers the matchers to apply; {@code null} is treated like an
     *                    empty list
     * @return the extracted tags, {@link Tags#EMPTY} if there are no matchers
     */
    public static Tags extractTags(final String line, final List<TagMatcher> tagMatchers) {
        if (tagMatchers == null || tagMatchers.isEmpty()) {
            return Tags.EMPTY;
        }

        final List<Tag> tags = new ArrayList<>();
        for (final TagMatcher tagMatcher : tagMatchers) {
            final Matcher matcher = Pattern.compile(tagMatcher.regex()).matcher(line);

            if (matcher.find() && matcher.groupCount() >= 1) {
                final String group = matcher.group(1);

                // NOTE(review): presumably both strings have to be known to the
                // compressor before createTag is called - confirm; kept as in the
                // original.
                Tags.STRING_COMPRESSOR.put(tagMatcher.tag());
                Tags.STRING_COMPRESSOR.put(group);
                tags.add(Tags.STRING_COMPRESSOR.createTag(tagMatcher.tag(), group));
            }
        }

        return Tags.create(tags);
    }
}

View File

@@ -0,0 +1,31 @@
package org.lucares.pdbui.domain;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
/**
 * Configuration of the file drop: a list of {@link FileDropSettings}.
 */
public class FileDropConfig {

    private List<FileDropSettings> settings = new ArrayList<>();

    /**
     * @return the configured settings; initially an empty list
     */
    public List<FileDropSettings> getSettings() {
        return this.settings;
    }

    /**
     * @param settings the settings to use from now on
     */
    public void setSettings(final List<FileDropSettings> settings) {
        this.settings = settings;
    }

    /**
     * Renders the settings one per line; the settings part is omitted if the
     * list is {@code null}.
     */
    @Override
    public String toString() {
        final String body = settings == null ? "" : "\nsettings=" + StringUtils.join(settings, "\n");
        return "FileDropConfig [" + body + "]";
    }
}

View File

@@ -0,0 +1,7 @@
package org.lucares.pdbui.domain;
import org.lucares.pdbui.CsvReaderSettings;
/**
 * One entry of the file drop configuration.
 *
 * @param match       the pattern a file has to match so that
 *                    {@code csvSettings} are used for it
 * @param csvSettings the CSV reader settings for matching files
 */
public record FileDropSettings(String match, CsvReaderSettings csvSettings) {
}

View File

@@ -0,0 +1,9 @@
package org.lucares.pdbui.domain;
/**
 * Pair of a regular expression and the tag its match is stored under.
 *
 * @param regex a regular expression with one capturing group, e.g.
 *              "BUILD='([a-zA-Z]+_\\d+\\.\\d_\\d+)"
 * @param tag   the tag, e.g. "build"
 */
public record TagMatcher(String regex, String tag) {
}

View File

@@ -3,6 +3,8 @@ db.base=${base.dir}/db
path.tmp=${base.dir}/tmp path.tmp=${base.dir}/tmp
path.output=${base.dir}/out path.output=${base.dir}/out
path.fileDrop=${base.dir}/drop
path.fileDropConfig=${base.dir}/drop.json
logging.config=classpath:log4j2.xml logging.config=classpath:log4j2.xml

View File

@@ -44,8 +44,8 @@
<logger name="org.lucares.metrics.plotter" level="DEBUG" /> <logger name="org.lucares.metrics.plotter" level="DEBUG" />
<logger name="org.lucares.metrics.gnuplot" level="DEBUG" /> <logger name="org.lucares.metrics.gnuplot" level="DEBUG" />
<logger name="org.lucares.metrics.aggregator.parallelRequests" level="DEBUG" /> <logger name="org.lucares.metrics.aggregator.parallelRequests" level="DEBUG" />
<logger name="org.lucares.metrics.dataStore" level="DEBUG" />
<!-- <!--
<logger name="org.lucares.metrics.dataStore" level="DEBUG" />
<logger name="org.lucares.metrics.ingestion.tagsToFile.newPdbWriter" level="DEBUG" /> <logger name="org.lucares.metrics.ingestion.tagsToFile.newPdbWriter" level="DEBUG" />
<logger name="org.lucares.pdb.datastore.lang.QueryCompletionPdbLangParser" level="TRACE" /> <logger name="org.lucares.pdb.datastore.lang.QueryCompletionPdbLangParser" level="TRACE" />
<logger name="org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor" level="TRACE" /> <logger name="org.lucares.pdb.datastore.lang.ExpressionToDocIdVisitor" level="TRACE" />