From 550d7ba44e01db230147e3e81aeb59b38c30782b Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Fri, 13 Dec 2019 18:05:20 +0100 Subject: [PATCH] add flag to make CSV upload wait until entries are flushed To make it easier/possible to write stable unit test the CSV upload can optionally wait until all entries have been flushed to disk. This is necessary for tests that ingest data and then read the data. --- .../java/org/lucares/pdb/api/Entries.java | 42 +++++++++++++++++++ .../lucares/pdbui/CsvToEntryTransformer.java | 6 ++- .../org/lucares/pdbui/CsvUploadHandler.java | 14 +++++-- .../org/lucares/pdbui/IngestionHandler.java | 3 +- .../java/org/lucares/pdbui/PdbController.java | 9 ++-- .../pdbui/CsvToEntryTransformerTest.java | 8 ++-- .../org/lucares/pdbui/PdbControllerTest.java | 7 ++-- .../lucares/performance/db/PerformanceDb.java | 6 +++ 8 files changed, 79 insertions(+), 16 deletions(-) diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java index 6c8c8f6..7cd354b 100644 --- a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java +++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java @@ -5,7 +5,24 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +/** + * Wrapper for chunk of {@link Entry}s. + *

+ * This class is supposed to be provided to the queue returned by + * PerformanceDb.getQueue(). Processing {@link Entry}s in chunks is more + * efficient than processing each one individually. + *

+ * Optionally, you can request that the entries will be flushed to disk by + * calling {@link #forceFlush()} before adding it to the queue. + *

+ * Optionally, this class can act like a future. This is useful if you have to + * wait until the entries have been processed. Use {@link #forceFlush()} and + * {@link #waitUntilFlushed(long, TimeUnit)}. + */ public class Entries implements Iterable { /** * A special {@link Entries} instance that can be used as poison object for @@ -15,6 +32,10 @@ public class Entries implements Iterable { private final List entries; + private boolean forceFlush = false; + + private CountDownLatch flushLatch = null; + public Entries(final int initialSize) { entries = new ArrayList<>(initialSize); } @@ -39,4 +60,25 @@ public class Entries implements Iterable { public int size() { return entries.size(); } + + public boolean isForceFlush() { + return forceFlush; + } + + public void forceFlush() { + forceFlush = true; + flushLatch = new CountDownLatch(1); + } + + public void waitUntilFlushed(final long timeout, final TimeUnit unit) + throws InterruptedException, TimeoutException { + final boolean finished = flushLatch.await(timeout, unit); + if (!finished) { + throw new TimeoutException(); + } + } + + public void notifyFlushed() { + flushLatch.countDown(); + } } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java index 1ddd479..3ee8450 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java @@ -4,6 +4,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.lucares.collections.IntList; import org.lucares.pdb.api.Entries; @@ -26,7 +28,7 @@ class CsvToEntryTransformer { this.settings = settings; } - void readCSV(final InputStream in) throws IOException, InterruptedException { + void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException { final int chunksize = 1000; Entries entries = new Entries(chunksize); @@ -93,7 +95,9 @@ class CsvToEntryTransformer { if (entry != null) { entries.add(entry); } + entries.forceFlush(); queue.put(entries); + entries.waitUntilFlushed(5, TimeUnit.MINUTES); } private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java index 423d4c9..c70fa80 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java @@ -9,9 +9,12 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.lucares.pdb.api.Entries; import org.lucares.performance.db.PerformanceDb; @@ -41,8 +44,9 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean { this.performanceDb = performanceDb; } - public void ingest(final List files, final CsvReaderSettings settings) - throws IllegalStateException, IOException { + public void ingest(final List files, final CsvReaderSettings settings, + final boolean waitUntilFinished) + throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException { final List tmpFiles = new ArrayList(); @@ -58,7 +62,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean { throw e; } - threadPool.submit(() -> { + final Future future = threadPool.submit(() -> { final ArrayBlockingQueue queue = performanceDb.getQueue(); for (final Path tmpFile : tmpFiles) { try { @@ -74,8 +78,10 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean { LOGGER.error("csv ingestion failed", e); } } - queue.add(Entries.POISON); }); + if (waitUntilFinished) { + future.get(1, TimeUnit.HOURS); + } } @Override diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java index 795ce26..72f3ae7 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java @@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; import java.util.zip.GZIPInputStream; import org.lucares.pdb.api.Entries; @@ -50,7 +51,7 @@ public final class IngestionHandler implements Callable { return null; } - private void handleInputStream(final InputStream in) throws IOException, InterruptedException { + private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException { in.mark(1); final byte firstByte = (byte) in.read(); if (firstByte == '{') { diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java index 4747358..d4ca225 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java @@ -12,7 +12,9 @@ import java.util.Locale; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -327,10 +329,11 @@ public class PdbController implements HardcodedValues, PropertyKeys { @ResponseBody @ResponseStatus(code = HttpStatus.CREATED) public String handleCsvFileUpload(@RequestParam("file") final MultipartFile[] files, - @RequestPart("settings") final CsvReaderSettings csvReaderSettings) - throws IllegalStateException, IOException { + @RequestPart("settings") final CsvReaderSettings csvReaderSettings, + @RequestParam(name = "waitUntilFinished", defaultValue = "false") final boolean waitUntilFinished) + throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException { - csvUploadHandler.ingest(List.of(files), csvReaderSettings); + csvUploadHandler.ingest(List.of(files), csvReaderSettings, waitUntilFinished); return ""; // return value might become a job id that can be used to cancel, or observe // status } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java index 989160e..489645e 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java @@ -10,8 +10,10 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.lucares.collections.LongList; @@ -19,7 +21,6 @@ import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Query; import org.lucares.performance.db.PerformanceDb; -import org.junit.jupiter.api.Assertions; import org.lucares.utils.file.FileUtils; public class CsvToEntryTransformerTest { @@ -37,7 +38,7 @@ public class CsvToEntryTransformerTest { } @Test - public void testIngest() throws IOException, InterruptedException { + public void testIngest() throws IOException, InterruptedException, TimeoutException { final OffsetDateTime dateA = OffsetDateTime.now(); final OffsetDateTime dateB = OffsetDateTime.now(); @@ -75,9 +76,10 @@ public class CsvToEntryTransformerTest { * * @throws IOException * @throws InterruptedException + * @throws TimeoutException */ @Test - public void testIgnoreColumns() throws IOException, InterruptedException { + public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException { try (final PerformanceDb db = new PerformanceDb(dataDirectory)) { diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java index eedb2ae..53af5ef 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java @@ -4,10 +4,10 @@ import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.lucares.collections.LongList; @@ -15,7 +15,6 @@ import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Query; import org.lucares.performance.db.PerformanceDb; -import org.junit.jupiter.api.Assertions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -60,7 +59,6 @@ public class PdbControllerTest { final CsvReaderSettings settings = CsvReaderSettings.create(timeColumn, ',', ignoredColumn); uploadCsv(settings, csv); - TimeUnit.SECONDS.sleep(1); { final GroupResult groupResult = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA))) .singleGroup(); @@ -96,7 +94,8 @@ public class PdbControllerTest { final HttpEntity> entity = new HttpEntity>( parameters, headers); - final ResponseEntity response = rest.exchange("/data", HttpMethod.POST, entity, String.class); + final ResponseEntity response = rest.exchange("/data?waitUntilFinished=true", HttpMethod.POST, entity, + String.class); Assertions.assertEquals(response.getStatusCode(), HttpStatus.CREATED, "response status"); } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 20f7cf8..20a7b2b 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -134,6 +134,12 @@ public class PerformanceDb implements AutoCloseable { LOGGER.info("", e); } } + + if (entries.isForceFlush()) { + LOGGER.info("flush triggered via entries.isForceFlush()"); + dataStore.flush(); + entries.notifyFlushed(); + } } } catch (final RuntimeException e) {