add file drop handler

You can define a folder and ingest files dropped into it.
This commit is contained in:
2021-08-07 13:31:44 +02:00
parent 85ed5f1ccb
commit 825bac24b9
10 changed files with 301 additions and 29 deletions

View File

@@ -2,6 +2,7 @@ package org.lucares.pdbui;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@@ -12,8 +13,13 @@ import java.util.function.Function;
import org.lucares.pdbui.domain.TagMatcher; import org.lucares.pdbui.domain.TagMatcher;
import org.lucares.utils.Preconditions; import org.lucares.utils.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public final class CsvReaderSettings { public final class CsvReaderSettings {
private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static String stripPrefixDefault(final String value) { public static String stripPrefixDefault(final String value) {
if (value.startsWith("Default")) { if (value.startsWith("Default")) {
return value.replaceFirst("Default", ""); return value.replaceFirst("Default", "");
@@ -169,7 +175,7 @@ public final class CsvReaderSettings {
private String comment = "#"; private String comment = "#";
private List<TagMatcher> firstLineMatcher = new ArrayList<>(); private final List<TagMatcher> firstLineMatcher = new ArrayList<>();
public CsvReaderSettings() { public CsvReaderSettings() {
this("@timestamp", "duration", ",", new ColumnDefinitions()); this("@timestamp", "duration", ",", new ColumnDefinitions());
@@ -244,7 +250,7 @@ public final class CsvReaderSettings {
} }
public void putAdditionalTag(final Map<String, String> additionalTags) { public void putAdditionalTag(final Map<String, String> additionalTags) {
additionalTags.putAll(additionalTags); this.additionalTags.putAll(additionalTags);
} }
public Map<String, String> getAdditionalTags() { public Map<String, String> getAdditionalTags() {
@@ -267,8 +273,22 @@ public final class CsvReaderSettings {
return firstLineMatcher; return firstLineMatcher;
} }
public void setFirstLineMatcher(final List<TagMatcher> firstLineMatcher) { public void setFirstLineMatcher(final Collection<TagMatcher> firstLineMatchers) {
this.firstLineMatcher = firstLineMatcher; this.firstLineMatcher.clear();
this.firstLineMatcher.addAll(firstLineMatchers);
}
/**
 * Appends a single matcher to the list of first-line matchers.
 *
 * @param tagMatcher the matcher to add
 */
public void addFirstLineMatcher(final TagMatcher tagMatcher) {
this.firstLineMatcher.add(tagMatcher);
}
public CsvReaderSettings copy() {
try {
final String json = OBJECT_MAPPER.writeValueAsString(this);
return OBJECT_MAPPER.readValue(json, CsvReaderSettings.class);
} catch (final JsonProcessingException e) {
throw new IllegalStateException(e);
}
} }
@Override @Override

View File

@@ -77,7 +77,8 @@ class CsvToEntryTransformer {
if (line[0] == comment) { if (line[0] == comment) {
if (lineCounter == 1) { if (lineCounter == 1) {
final String lineAsString = new String(line, StandardCharsets.UTF_8); final String lineAsString = new String(line, offsetInBuffer, length,
StandardCharsets.UTF_8);
final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString, final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString,
settings.getFirstLineMatcher()); settings.getFirstLineMatcher());
additionalTags = additionalTags.add(firstLineTags); additionalTags = additionalTags.add(firstLineTags);

View File

@@ -5,8 +5,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.lucares.pdbui.domain.FileDropConfig; import org.lucares.pdbui.domain.FileDropConfig;
import org.lucares.pdbui.domain.FileDropSettings; import org.lucares.pdbui.domain.FileDropSettings;
@@ -52,22 +50,11 @@ public class FileDropConfigProvider {
final Map<String, String> variables = antPathMatcher.extractUriTemplateVariables(settings.match(), final Map<String, String> variables = antPathMatcher.extractUriTemplateVariables(settings.match(),
file); file);
System.out.println("match found " + file + " regex: " + settings.match() + " " + variables); final CsvReaderSettings csvSettings = settings.csvSettings().copy();
final CsvReaderSettings csvSettings = settings.csvSettings();
csvSettings.putAdditionalTag(variables); csvSettings.putAdditionalTag(variables);
return Optional.of(csvSettings); return Optional.of(csvSettings);
} }
} }
return Optional.empty(); return Optional.empty();
} }
public static void main(final String[] args) {
final Matcher matcher = Pattern.compile("(?<source>.+)/(?<pod>.+)/(?<host>[^/]+)/performance.*.csv")
.matcher("web/vapsales01/0f5230761bb8a260e/performance.2020-10-05_000200_2.csv");
if (matcher.find()) {
System.out.println("match found");
} else {
System.out.println("not found");
}
}
} }

View File

@@ -12,12 +12,13 @@ import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class FileDropHandler { public class FileDropHandler implements AutoCloseable, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(FileDropHandler.class); private static final Logger LOGGER = LoggerFactory.getLogger(FileDropHandler.class);
@@ -40,8 +41,6 @@ public class FileDropHandler {
final WatchKey key = watchService.take(); final WatchKey key = watchService.take();
final List<WatchEvent<?>> events = key.pollEvents(); final List<WatchEvent<?>> events = key.pollEvents();
for (final WatchEvent<?> watchEvent : events) { for (final WatchEvent<?> watchEvent : events) {
System.out.printf("Event... kind=%s, count=%d, context=%s Context type=%s%n", watchEvent.kind(),
watchEvent.count(), watchEvent.context(), ((Path) watchEvent.context()).getClass());
final Path file = baseDir.resolve((Path) watchEvent.context()); final Path file = baseDir.resolve((Path) watchEvent.context());
for (final FileDropFileTypeHandler fileHandler : fileHandlers) { for (final FileDropFileTypeHandler fileHandler : fileHandlers) {
@@ -53,15 +52,16 @@ public class FileDropHandler {
fileHandler.getClass(), e); fileHandler.getClass(), e);
} }
} }
}
}
LOGGER.info("done ingesting file {}", file);
} }
key.reset(); key.reset();
} }
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
try { try {
LOGGER.error("close watchService"); LOGGER.error("closing watchService in response to an interrupt");
watchService.close(); watchService.close();
} catch (final IOException e1) { } catch (final IOException e1) {
LOGGER.error("failed to close watchService", e1); LOGGER.error("failed to close watchService", e1);
@@ -72,6 +72,7 @@ public class FileDropHandler {
private final Path baseDir; private final Path baseDir;
private final List<FileDropFileTypeHandler> fileHandlers; private final List<FileDropFileTypeHandler> fileHandlers;
private final FileWatchThread watchThread;
@Autowired @Autowired
public FileDropHandler(@Value("${path.fileDrop}") final String baseDir, public FileDropHandler(@Value("${path.fileDrop}") final String baseDir,
@@ -84,8 +85,22 @@ public class FileDropHandler {
} }
LOGGER.info("file drop location {}", this.baseDir); LOGGER.info("file drop location {}", this.baseDir);
final FileWatchThread watchThread = new FileWatchThread(); watchThread = new FileWatchThread();
watchThread.start(); watchThread.start();
} }
/**
 * Returns the file drop location, i.e. the directory this handler resolves
 * watch events against.
 *
 * @return the base directory of the file drop
 */
public Path getBaseDir() {
return baseDir;
}
/**
 * {@link DisposableBean} callback; delegates to {@link #close()}.
 *
 * @throws Exception if {@link #close()} fails
 */
@Override
public void destroy() throws Exception {
close();
}
/**
 * Stops watching the file drop location by interrupting the watch thread.
 * <p>
 * This method only sends the interrupt; it does not wait for the thread to
 * terminate.
 */
@Override
public void close() throws Exception {
// NOTE(review): the watch loop appears to close its WatchService when it
// catches the InterruptedException - confirm against FileWatchThread
watchThread.interrupt();
}
} }

View File

@@ -50,7 +50,10 @@ public class FileDropZipHandler implements FileDropFileTypeHandler {
final Optional<CsvReaderSettings> csvSettings = configProvider.provideCsvReaderSettings(entry.getName()); final Optional<CsvReaderSettings> csvSettings = configProvider.provideCsvReaderSettings(entry.getName());
if (csvSettings.isPresent()) { if (csvSettings.isPresent()) {
final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue(); final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvSettings.get());
final CsvReaderSettings csvReaderSettings = csvSettings.get();
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvReaderSettings);
try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry), try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry),
1024 * 1024)) { 1024 * 1024)) {
csvToEntryTransformer.readCSV(inputStream); csvToEntryTransformer.readCSV(inputStream);

View File

@@ -1,19 +1,24 @@
package org.lucares.pdbui.domain; package org.lucares.pdbui.domain;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
public class FileDropConfig { public class FileDropConfig {
private List<FileDropSettings> settings = new ArrayList<>(); private final List<FileDropSettings> settings = new ArrayList<>();
public List<FileDropSettings> getSettings() { public List<FileDropSettings> getSettings() {
return settings; return settings;
} }
public void setSettings(final List<FileDropSettings> settings) { public void setSettings(final Collection<FileDropSettings> settings) {
this.settings = settings; this.settings.addAll(settings);
}
public void addSettings(final FileDropSettings... dropSettings) {
this.settings.addAll(List.of(dropSettings));
} }
@Override @Override

View File

@@ -2,6 +2,11 @@ package org.lucares.pdbui.domain;
import org.lucares.pdbui.CsvReaderSettings; import org.lucares.pdbui.CsvReaderSettings;
/**
* @param match ant style path matcher, e.g.
* {source}/{pod}/{host}/performance*.csv
* @param csvSettings CSV reader settings used to ingest files whose path
*                    matches {@code match}
*/
public record FileDropSettings(String match, CsvReaderSettings csvSettings) { public record FileDropSettings(String match, CsvReaderSettings csvSettings) {
} }

View File

@@ -0,0 +1,116 @@
package org.lucares.pdbui;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.EnumSet;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.lucares.collections.LongList;
import org.lucares.pdb.api.DateTimeRange;
import org.lucares.pdb.api.Query;
import org.lucares.pdb.api.Result;
import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions;
import org.lucares.pdbui.CsvReaderSettings.PostProcessors;
import org.lucares.pdbui.domain.FileDropConfig;
import org.lucares.pdbui.domain.FileDropSettings;
import org.lucares.pdbui.domain.TagMatcher;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.Retry;
import org.lucares.utils.file.FileUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Integration test for {@link FileDropHandler}: drops a zip file into the
 * watched folder and verifies that its CSV content ends up in the
 * {@link PerformanceDb}.
 */
public class FileDropHandlerTest {

    /** Temporary directory holding the database, the drop config and the drop folder. */
    private Path dataDirectory;

    @BeforeEach
    public void beforeMethod() throws IOException {
        dataDirectory = Files.createTempDirectory("pdb");
    }

    @AfterEach
    public void afterMethod() throws IOException {
        FileUtils.delete(dataDirectory);
    }

    /**
     * Drops a zip with one CSV file into the file drop folder and waits until
     * the entries can be queried, including the tags taken from the path
     * ({@code source}) and from the first line ({@code build}, {@code branch}).
     */
    @Test
    public void testDropZipInFolder() throws Exception {
        try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {

            final ColumnDefinitions columns = new ColumnDefinitions();
            columns.ignoreColumn("user");
            columns.ignoreColumn("trace_id");
            columns.postProcess("project", EnumSet.of(PostProcessors.LOWER_CASE));

            final CsvReaderSettings csvSettings = CsvReaderSettings.create("time", "duration", ";", columns);
            csvSettings.addFirstLineMatcher(new TagMatcher("BUILD=([^ ]+)", "build"));
            csvSettings.addFirstLineMatcher(new TagMatcher("BRANCH=([^ ]+)", "branch"));

            final FileDropSettings setting = new FileDropSettings("{source}/{pod}/{host}/performance.*.csv",
                    csvSettings);

            try (final FileDropHandler handler = createFileDropHandler(db, setting)) {
                final Path fileDropBaseDir = handler.getBaseDir();
                final Path droppedFile = fileDropBaseDir.resolve("logs.zip");

                /*
                 * The zip contains one file in path /web/testpod/examplehost/ with content
                 *
                 * #BUILD=1.2.3 BRANCH=master
                 * time;duration;method;project;user;trace_id
                 * 2020-01-01 00:00:00,000;42;Controller.endpoint;customerProject;alice;kaw9dyzi.1
                 * 2020-01-01 00:00:00,001;43;Processor.endpoint;customerProject;alice;kaw9dyzi.2
                 * 2020-01-01 00:00:00,001;44;Controller.endpoint;customerProject;alice;kaw9dyzi.2
                 */
                copyResourceToFile("logs_FileDropHandlerTest_1.zip", droppedFile);

                // ingestion happens asynchronously in the watch thread, so poll
                // until the data is visible
                Retry.maxRetries(10).retry(() -> {
                    final DateTimeRange range = DateTimeRange
                            .ofDay(OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC));
                    final String query = "source=web and method=Controller.endpoint and branch=master and build=1.2.3";
                    final Result result = db.get(Query.createQuery(query, range));

                    // presumably the flattened list alternates timestamp and value,
                    // so the two matching durations sit at index 1 and 3 - TODO confirm
                    final LongList values = result.singleGroup().flatMap();

                    // JUnit expects (expected, actual)
                    Assertions.assertEquals(42, values.get(1), "value 1 of " + values);
                    Assertions.assertEquals(44, values.get(3), "value 3 of " + values);
                    Assertions.assertEquals(4, values.size());
                });
            }
        }
    }

    /**
     * Writes a drop configuration with the given settings to disk and creates
     * a {@link FileDropHandler} that watches {@code <dataDirectory>/drop} and
     * handles zip files.
     */
    private FileDropHandler createFileDropHandler(final PerformanceDb db, final FileDropSettings... dropSettings)
            throws Exception {
        final Path fileDropConfigLocation = dataDirectory.resolve("drop.json");
        final FileDropConfig config = new FileDropConfig();
        config.addSettings(dropSettings);

        final ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.writeValue(fileDropConfigLocation.toFile(), config);

        final FileDropConfigProvider fileDropConfigProvider = new FileDropConfigProvider(
                fileDropConfigLocation.toString());
        final String fileDropBaseDir = dataDirectory.resolve("drop").toAbsolutePath().toString();
        final List<FileDropFileTypeHandler> handlers = List.of(new FileDropZipHandler(db, fileDropConfigProvider));
        return new FileDropHandler(fileDropBaseDir, handlers);
    }

    /**
     * Copies a classpath resource to the given file.
     *
     * @param resourceName name of the resource, resolved via the class loader
     *                     of this test
     * @param droppedFile  target file; created or overwritten
     * @throws IOException if the resource does not exist or copying fails
     */
    private void copyResourceToFile(final String resourceName, final Path droppedFile) throws IOException {
        try (final InputStream resourceAsStream = this.getClass().getClassLoader()
                .getResourceAsStream(resourceName)) {
            // getResourceAsStream returns null for unknown resources; fail with
            // a message that names the resource instead of a bare NPE
            if (resourceAsStream == null) {
                throw new IOException("test resource not found on classpath: " + resourceName);
            }
            try (final FileOutputStream out = new FileOutputStream(droppedFile.toFile())) {
                resourceAsStream.transferTo(out);
            }
        }
    }
}

View File

@@ -0,0 +1,120 @@
package org.lucares.utils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Runs an action repeatedly until it succeeds or the configured number of
 * retries is exhausted, sleeping between attempts.
 * <p>
 * Defaults: 1 retry, 10 ms initial delay, 1000 ms maximum delay, delay doubles
 * after every failed attempt.
 *
 * <pre>
 * Retry.maxRetries(10).delay(Duration.ofMillis(50)).retry(() -&gt; doSomething());
 * </pre>
 */
public class Retry {

    /** The action to execute; any {@link Throwable} it throws triggers a retry. */
    @FunctionalInterface
    public interface RetryCallable {
        public void call();
    }

    /**
     * Thrown when the last attempt failed. The cause is the failure of the
     * last attempt, the failures of all earlier attempts are attached as
     * suppressed exceptions.
     */
    public static class RetryException extends Exception {

        private static final long serialVersionUID = 9178079589334153298L;

        public RetryException(final Throwable cause, final Collection<Throwable> suppressed) {
            super(cause);
            for (final Throwable throwable : suppressed) {
                addSuppressed(throwable);
            }
        }
    }

    /** Computes the next delay (in milliseconds) from the previous one. */
    @FunctionalInterface
    public interface Strategy {
        public long apply(final long oldDelay);
    }

    /** Collects the retry configuration and executes the action. */
    public static class Builder {

        int maxRetries = 1;
        long delayInMs = 10;
        long maxDelayInMs = 1000;
        Strategy strategy = d -> d * 2;

        /**
         * @param maxRetries number of retries after the first attempt; the
         *                   action is executed at most {@code maxRetries + 1}
         *                   times
         * @throws IllegalArgumentException if {@code maxRetries} is negative
         */
        public Builder maxRetries(final int maxRetries) {
            if (maxRetries < 0) {
                // a negative value would silently skip the action altogether
                throw new IllegalArgumentException("maxRetries must not be negative, but was " + maxRetries);
            }
            this.maxRetries = maxRetries;
            return this;
        }

        /**
         * @param delay time to wait after the first failed attempt
         */
        public Builder delay(final Duration delay) {
            this.delayInMs = delay.toMillis();
            return this;
        }

        /**
         * @param delay upper bound for the time to wait between two attempts
         */
        public Builder maxDelay(final Duration delay) {
            this.maxDelayInMs = delay.toMillis();
            return this;
        }

        /** Always wait the same time between attempts. */
        public Builder constant() {
            strategy = d -> d;
            return this;
        }

        /**
         * Increase the delay by a fixed amount after every failed attempt.
         *
         * @param increment the amount added to the delay
         */
        public Builder linear(final Duration increment) {
            strategy = d -> d + increment.toMillis();
            return this;
        }

        /** Double the delay after every failed attempt (the default). */
        public Builder expontential() {
            strategy = d -> d * 2;
            return this;
        }

        /**
         * Executes the action until it returns normally.
         *
         * @param retryCallable the action to execute
         * @throws RetryException       if the action failed on the last attempt
         * @throws InterruptedException if the thread is interrupted while
         *                              waiting for the next attempt
         */
        public void retry(final RetryCallable retryCallable) throws RetryException, InterruptedException {
            final List<Throwable> suppressed = new ArrayList<>();

            // work on a local copy so that the builder can be reused and every
            // run starts with the configured initial delay
            long currentDelayInMs = Math.min(delayInMs, maxDelayInMs);

            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    retryCallable.call();
                    return;
                } catch (final Throwable e) {
                    // Throwable on purpose: assertion failures are Errors and
                    // must trigger a retry, too
                    if (attempt == maxRetries) {
                        throw new RetryException(e, suppressed);
                    }
                    suppressed.add(e);
                }

                TimeUnit.MILLISECONDS.sleep(currentDelayInMs);
                currentDelayInMs = Math.min(strategy.apply(currentDelayInMs), maxDelayInMs);
            }
        }
    }

    /** Executes the action with the default configuration. */
    public static void retry(final RetryCallable retryCallable) throws RetryException, InterruptedException {
        new Builder().retry(retryCallable);
    }

    /** Starts a configuration with {@link Builder#maxRetries(int)}. */
    public static Builder maxRetries(final int maxRetries) {
        return new Builder().maxRetries(maxRetries);
    }

    /** Starts a configuration with {@link Builder#maxDelay(Duration)}. */
    public static Builder maxDelay(final Duration maxDelay) {
        return new Builder().maxDelay(maxDelay);
    }

    /** Starts a configuration with {@link Builder#delay(Duration)}. */
    public static Builder delay(final Duration delay) {
        return new Builder().delay(delay);
    }

    /** Starts a configuration with {@link Builder#constant()}. */
    public static Builder constant() {
        return new Builder().constant();
    }

    /** Starts a configuration with {@link Builder#linear(Duration)}. */
    public static Builder linear(final Duration increment) {
        return new Builder().linear(increment);
    }

    /** Starts a configuration with {@link Builder#expontential()}. */
    public static Builder expontential() {
        return new Builder().expontential();
    }
}