From 825bac24b903007456ccdbdacd5c47c60101ac25 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sat, 7 Aug 2021 13:31:44 +0200 Subject: [PATCH] add file drop handler You can define a folder and ingest files dropped into it. --- .../org/lucares/pdbui/CsvReaderSettings.java | 28 +++- .../lucares/pdbui/CsvToEntryTransformer.java | 3 +- .../lucares/pdbui/FileDropConfigProvider.java | 15 +-- .../org/lucares/pdbui/FileDropHandler.java | 27 +++- .../org/lucares/pdbui/FileDropZipHandler.java | 5 +- .../lucares/pdbui/domain/FileDropConfig.java | 11 +- .../pdbui/domain/FileDropSettings.java | 5 + .../lucares/pdbui/FileDropHandlerTest.java | 116 +++++++++++++++++ .../resources/logs_FileDropHandlerTest_1.zip | Bin 0 -> 821 bytes .../main/java/org/lucares/utils/Retry.java | 120 ++++++++++++++++++ 10 files changed, 301 insertions(+), 29 deletions(-) create mode 100644 pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java create mode 100644 pdb-ui/src/test/resources/logs_FileDropHandlerTest_1.zip create mode 100644 pdb-utils/src/main/java/org/lucares/utils/Retry.java diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java index 152d231..fdeceba 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvReaderSettings.java @@ -2,6 +2,7 @@ package org.lucares.pdbui; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -12,8 +13,13 @@ import java.util.function.Function; import org.lucares.pdbui.domain.TagMatcher; import org.lucares.utils.Preconditions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public final class CsvReaderSettings { + private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static String stripPrefixDefault(final String value) { if (value.startsWith("Default")) { return value.replaceFirst("Default", ""); @@ -169,7 +175,7 @@ public final class CsvReaderSettings { private String comment = "#"; - private List firstLineMatcher = new ArrayList<>(); + private final List firstLineMatcher = new ArrayList<>(); public CsvReaderSettings() { this("@timestamp", "duration", ",", new ColumnDefinitions()); @@ -244,7 +250,7 @@ public final class CsvReaderSettings { } public void putAdditionalTag(final Map additionalTags) { - additionalTags.putAll(additionalTags); + this.additionalTags.putAll(additionalTags); } public Map getAdditionalTags() { @@ -267,8 +273,22 @@ public final class CsvReaderSettings { return firstLineMatcher; } - public void setFirstLineMatcher(final List firstLineMatcher) { - this.firstLineMatcher = firstLineMatcher; + public void setFirstLineMatcher(final Collection firstLineMatchers) { + this.firstLineMatcher.clear(); + this.firstLineMatcher.addAll(firstLineMatchers); + } + + public void addFirstLineMatcher(final TagMatcher tagMatcher) { + this.firstLineMatcher.add(tagMatcher); + } + + public CsvReaderSettings copy() { + try { + final String json = OBJECT_MAPPER.writeValueAsString(this); + return OBJECT_MAPPER.readValue(json, CsvReaderSettings.class); + } catch (final JsonProcessingException e) { + throw new IllegalStateException(e); + } } @Override diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java index c818615..15bc9b7 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java @@ -77,7 +77,8 @@ class CsvToEntryTransformer { if (line[0] == comment) { if (lineCounter == 1) { - final String lineAsString = new String(line, StandardCharsets.UTF_8); + final String lineAsString = new String(line, offsetInBuffer, length, + StandardCharsets.UTF_8); final Tags firstLineTags = TagMatchExtractor.extractTags(lineAsString, settings.getFirstLineMatcher()); additionalTags = additionalTags.add(firstLineTags); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropConfigProvider.java b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropConfigProvider.java index f2ff44a..6112c0d 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropConfigProvider.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropConfigProvider.java @@ -5,8 +5,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.lucares.pdbui.domain.FileDropConfig; import org.lucares.pdbui.domain.FileDropSettings; @@ -52,22 +50,11 @@ public class FileDropConfigProvider { final Map variables = antPathMatcher.extractUriTemplateVariables(settings.match(), file); - System.out.println("match found " + file + " regex: " + settings.match() + " " + variables); - final CsvReaderSettings csvSettings = settings.csvSettings(); + final CsvReaderSettings csvSettings = settings.csvSettings().copy(); csvSettings.putAdditionalTag(variables); return Optional.of(csvSettings); } } return Optional.empty(); } - - public static void main(final String[] args) { - final Matcher matcher = Pattern.compile("(?.+)/(?.+)/(?[^/]+)/performance.*.csv") - .matcher("web/vapsales01/0f5230761bb8a260e/performance.2020-10-05_000200_2.csv"); - if (matcher.find()) { - System.out.println("match found"); - } else { - System.out.println("not found"); - } - } } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropHandler.java index 4cd5f40..978c7df 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropHandler.java @@ -12,12 +12,13 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component -public class FileDropHandler { +public class FileDropHandler implements AutoCloseable, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(FileDropHandler.class); @@ -40,8 +41,6 @@ public class FileDropHandler { final WatchKey key = watchService.take(); final List> events = key.pollEvents(); for (final WatchEvent watchEvent : events) { - System.out.printf("Event... kind=%s, count=%d, context=%s Context type=%s%n", watchEvent.kind(), - watchEvent.count(), watchEvent.context(), ((Path) watchEvent.context()).getClass()); final Path file = baseDir.resolve((Path) watchEvent.context()); for (final FileDropFileTypeHandler fileHandler : fileHandlers) { @@ -53,15 +52,16 @@ public class FileDropHandler { fileHandler.getClass(), e); } } - } + } + LOGGER.info("done ingesting file {}", file); } key.reset(); } } catch (final InterruptedException e) { try { - LOGGER.error("close watchService"); + LOGGER.error("closing watchService in response to an interrupt"); watchService.close(); } catch (final IOException e1) { LOGGER.error("failed to close watchService", e1); @@ -72,6 +72,7 @@ public class FileDropHandler { private final Path baseDir; private final List fileHandlers; + private final FileWatchThread watchThread; @Autowired public FileDropHandler(@Value("${path.fileDrop}") final String baseDir, @@ -84,8 +85,22 @@ public class FileDropHandler { } LOGGER.info("file drop location {}", this.baseDir); - final FileWatchThread watchThread = new FileWatchThread(); + watchThread = new FileWatchThread(); watchThread.start(); } + public Path getBaseDir() { + return baseDir; + } + + @Override + public void destroy() throws Exception { + close(); + } + + @Override + public void close() throws Exception { + watchThread.interrupt(); + } + } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java index 4332920..6e7553d 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/FileDropZipHandler.java @@ -50,7 +50,10 @@ public class FileDropZipHandler implements FileDropFileTypeHandler { final Optional csvSettings = configProvider.provideCsvReaderSettings(entry.getName()); if (csvSettings.isPresent()) { final ArrayBlockingQueue queue = performanceDb.getQueue(); - final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvSettings.get()); + + final CsvReaderSettings csvReaderSettings = csvSettings.get(); + + final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, csvReaderSettings); try (final InputStream inputStream = new BufferedInputStream(zipFile.getInputStream(entry), 1024 * 1024)) { csvToEntryTransformer.readCSV(inputStream); diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropConfig.java b/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropConfig.java index 75a7462..b4893a0 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropConfig.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropConfig.java @@ -1,19 +1,24 @@ package org.lucares.pdbui.domain; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.commons.lang3.StringUtils; public class FileDropConfig { - private List settings = new ArrayList<>(); + private final List settings = new ArrayList<>(); public List getSettings() { return settings; } - public void setSettings(final List settings) { - this.settings = settings; + public void setSettings(final Collection settings) { + this.settings.addAll(settings); + } + + public void addSettings(final FileDropSettings... dropSettings) { + this.settings.addAll(List.of(dropSettings)); } @Override diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropSettings.java b/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropSettings.java index 3be517b..f005ee1 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropSettings.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/domain/FileDropSettings.java @@ -2,6 +2,11 @@ package org.lucares.pdbui.domain; import org.lucares.pdbui.CsvReaderSettings; +/** + * @param match ant style path matcher, e.g. + * {source}/{pod}/{host}/performance*.csv + * @param csvSettings + */ public record FileDropSettings(String match, CsvReaderSettings csvSettings) { } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java new file mode 100644 index 0000000..b08890a --- /dev/null +++ b/pdb-ui/src/test/java/org/lucares/pdbui/FileDropHandlerTest.java @@ -0,0 +1,116 @@ +package org.lucares.pdbui; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.EnumSet; +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.lucares.collections.LongList; +import org.lucares.pdb.api.DateTimeRange; +import org.lucares.pdb.api.Query; +import org.lucares.pdb.api.Result; +import org.lucares.pdbui.CsvReaderSettings.ColumnDefinitions; +import org.lucares.pdbui.CsvReaderSettings.PostProcessors; +import org.lucares.pdbui.domain.FileDropConfig; +import org.lucares.pdbui.domain.FileDropSettings; +import org.lucares.pdbui.domain.TagMatcher; +import org.lucares.performance.db.PerformanceDb; +import org.lucares.utils.Retry; +import org.lucares.utils.file.FileUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class FileDropHandlerTest { + + private Path dataDirectory; + + @BeforeEach + public void beforeMethod() throws IOException { + dataDirectory = Files.createTempDirectory("pdb"); + } + + @AfterEach + public void afterMethod() throws IOException { + FileUtils.delete(dataDirectory); + } + + @Test + public void testDropZipInFolder() throws Exception { + try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { + + final ColumnDefinitions columns = new ColumnDefinitions(); + columns.ignoreColumn("user"); + columns.ignoreColumn("trace_id"); + columns.postProcess("project", EnumSet.of(PostProcessors.LOWER_CASE)); + final CsvReaderSettings csvSettings = CsvReaderSettings.create("time", "duration", ";", columns); + csvSettings.addFirstLineMatcher(new TagMatcher("BUILD=([^ ]+)", "build")); + csvSettings.addFirstLineMatcher(new TagMatcher("BRANCH=([^ ]+)", "branch")); + final FileDropSettings setting = new FileDropSettings("{source}/{pod}/{host}/performance.*.csv", + csvSettings); + try (final FileDropHandler handler = createFileDropHandler(db, setting)) { + + final Path fileDropBaseDir = handler.getBaseDir(); + final Path droppedFile = fileDropBaseDir.resolve("logs.zip"); + + /** + * The zip contains one file in path /web/testpod/examplehost/ with content + * + *
+                 * #BUILD=1.2.3 BRANCH=master 
+                 * time;duration;method;project;user;trace_id
+                 * 2020-01-01 00:00:00,000;42;Controller.endpoint;customerProject;alice;kaw9dyzi.1
+                 * 2020-01-01 00:00:00,001;43;Processor.endpoint;customerProject;alice;kaw9dyzi.2
+                 * 2020-01-01 00:00:00,001;44;Controller.endpoint;customerProject;alice;kaw9dyzi.2
+                 * 
+ */ + copyResourceToFile("logs_FileDropHandlerTest_1.zip", droppedFile); + + Retry.maxRetries(10).retry(() -> { + final DateTimeRange range = DateTimeRange + .ofDay(OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)); + final String query = "source=web and method=Controller.endpoint and branch=master and build=1.2.3"; + final Result result = db.get(Query.createQuery(query, range)); + final LongList values = result.singleGroup().flatMap(); + Assertions.assertEquals(values.get(1), 42, "value 1 of " + values); + Assertions.assertEquals(values.get(3), 44, "value 3 of " + values); + Assertions.assertEquals(values.size(), 4); + }); + } + } + } + + private FileDropHandler createFileDropHandler(final PerformanceDb db, final FileDropSettings... dropSettings) + throws Exception { + final Path fileDropConfigLocation = dataDirectory.resolve("drop.json"); + + final FileDropConfig config = new FileDropConfig(); + config.addSettings(dropSettings); + + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.writeValue(fileDropConfigLocation.toFile(), config); + + final FileDropConfigProvider fileDropConfigProvider = new FileDropConfigProvider( + fileDropConfigLocation.toString()); + final String fileDropBaseDir = dataDirectory.resolve("drop").toAbsolutePath().toString(); + final List handlers = List.of(new FileDropZipHandler(db, fileDropConfigProvider)); + return new FileDropHandler(fileDropBaseDir, handlers); + } + + private void copyResourceToFile(final String string, final Path droppedFile) throws IOException { + + try (final InputStream resourceAsStream = this.getClass().getClassLoader() + .getResourceAsStream("logs_FileDropHandlerTest_1.zip"); + final FileOutputStream out = new FileOutputStream(droppedFile.toFile());) { + resourceAsStream.transferTo(out); + } + } +} diff --git a/pdb-ui/src/test/resources/logs_FileDropHandlerTest_1.zip b/pdb-ui/src/test/resources/logs_FileDropHandlerTest_1.zip new file mode 100644 index 0000000000000000000000000000000000000000..9d0d773a3ef7250d780fd9123e91b5cdfc1ebe49 GIT binary patch literal 821 zcmWIWW@Zs#0D)V{0fAr!lwbkU<*7;f0U#C3Ai-P3s494%DoRp|OA7K+kW@ZLQz?O_ zGPNQxw;(4qBfq!=u8V_V)dZ&C|FzEYbAhT&85tNX@#rc@ElSHT%1z8mPSrCqFf!0J zFa)A_0|Ox2KrgwtZ0fm)XOsHSPYy~A#= zE6ehxL;L6T8?W`heRtQSou|6j*sc3^cG)K@!%Ho5-8a}~mKipzh@2bW@NHwz!mN)K!1aP0E7a^0vCeFq;$_wlp^3v58;`L2=3h6T=-qPcj^x@f<`q bT$%d>L#1qfFHlVlkXGcW)Ea?<7< literal 0 HcmV?d00001 diff --git a/pdb-utils/src/main/java/org/lucares/utils/Retry.java b/pdb-utils/src/main/java/org/lucares/utils/Retry.java new file mode 100644 index 0000000..576895a --- /dev/null +++ b/pdb-utils/src/main/java/org/lucares/utils/Retry.java @@ -0,0 +1,120 @@ +package org.lucares.utils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class Retry { + + public interface RetryCallable { + public void call(); + } + + public static class RetryException extends Exception { + + private static final long serialVersionUID = 9178079589334153298L; + + public RetryException(final Throwable cause, final Collection suppressed) { + super(cause); + + for (final Throwable throwable : suppressed) { + addSuppressed(throwable); + } + } + + } + + public interface Strategy { + public long apply(final long oldDely); + } + + public static class Builder { + int maxRetries = 1; + long delayInMs = 10; + long maxDelayInMs = 1000; + Strategy strategy = d -> d * 2; + + public Builder maxRetries(final int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public Builder delay(final Duration delay) { + this.delayInMs = delay.toMillis(); + return this; + } + + public Builder maxDelay(final Duration delay) { + this.delayInMs = delay.toMillis(); + return this; + } + + public Builder constant() { + strategy = d -> d; + return this; + } + + public Builder linear(final Duration increment) { + strategy = d -> d + increment.toMillis(); + return this; + } + + public Builder expontential() { + strategy = d -> d * 2; + return this; + } + + public void retry(final RetryCallable retryCallable) throws RetryException, InterruptedException { + + final List suppressed = new ArrayList<>(); + + for (int i = 0; i <= maxRetries; i++) { + try { + retryCallable.call(); + break; + } catch (final Throwable e) { + if (i == maxRetries) { + throw new RetryException(e, suppressed); + } else { + suppressed.add(e); + } + + TimeUnit.MILLISECONDS.sleep(delayInMs); + delayInMs = strategy.apply(delayInMs); + delayInMs = Math.min(delayInMs, maxDelayInMs); + } + } + } + } + + public static void retry(final RetryCallable retryCallable) throws RetryException, InterruptedException { + new Builder().retry(retryCallable); + } + + public static Builder maxRetries(final int maxRetries) { + return new Builder().maxRetries(maxRetries); + } + + public static Builder maxDelay(final Duration maxDelay) { + return new Builder().maxDelay(maxDelay); + } + + public Builder delay(final Duration delay) { + return new Builder().delay(delay); + } + + public Builder constant() { + return new Builder().constant(); + } + + public Builder linear(final Duration increment) { + return new Builder().linear(increment); + } + + public Builder expontential() { + return new Builder().expontential(); + } + +}