diff --git a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java
index 6c8c8f6..7cd354b 100644
--- a/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java
+++ b/pdb-api/src/main/java/org/lucares/pdb/api/Entries.java
@@ -5,7 +5,24 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+/**
+ * Wrapper for a chunk of {@link Entry}s.
+ * <p>
+ * This class is supposed to be provided to the queue returned by
+ * PerformanceDb.getQueue(). Processing {@link Entry}s in chunks is more
+ * efficient than processing each one individually.
+ * <p>
+ * Optionally, you can request that the entries be flushed to disk by calling
+ * {@link #forceFlush()} before adding this instance to the queue.
+ * <p>
+ * Optionally, this class can act like a future. This is useful if you have to
+ * wait until the entries have been processed. Use {@link #forceFlush()} and
+ * {@link #waitUntilFlushed(long, TimeUnit)}.
+ */
 public class Entries implements Iterable<Entry> {
 
     /**
      * A special {@link Entries} instance that can be used as poison object for
@@ -15,6 +32,10 @@ public class Entries implements Iterable<Entry> {
 
     private final List<Entry> entries;
 
+    private boolean forceFlush = false;
+
+    private CountDownLatch flushLatch = null;
+
     public Entries(final int initialSize) {
         entries = new ArrayList<>(initialSize);
     }
@@ -39,4 +60,25 @@ public class Entries implements Iterable<Entry> {
     public int size() {
         return entries.size();
     }
+
+    public boolean isForceFlush() {
+        return forceFlush;
+    }
+
+    public void forceFlush() {
+        forceFlush = true;
+        flushLatch = new CountDownLatch(1);
+    }
+
+    public void waitUntilFlushed(final long timeout, final TimeUnit unit)
+            throws InterruptedException, TimeoutException {
+        final boolean finished = flushLatch.await(timeout, unit);
+        if (!finished) {
+            throw new TimeoutException();
+        }
+    }
+
+    public void notifyFlushed() {
+        flushLatch.countDown();
+    }
 }
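Taken on its own, the API added to Entries defines a small flush handshake between the thread that fills a chunk and the thread that persists it. The sketch below is illustrative only and not part of the patch; the class and method names are made up, and the real producer and consumer sides are the CsvToEntryTransformer and PerformanceDb hunks further down.

// FlushHandshakeSketch is hypothetical; it only illustrates the intended use
// of the forceFlush()/waitUntilFlushed()/notifyFlushed() API added above.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.lucares.pdb.api.Entries;

class FlushHandshakeSketch {

    // Producer side: mark the chunk, enqueue it, then block until the
    // consumer confirms that the chunk has been flushed (or time out).
    void produce(final ArrayBlockingQueue<Entries> queue, final Entries entries)
            throws InterruptedException, TimeoutException {
        entries.forceFlush();                          // arms the internal CountDownLatch
        queue.put(entries);
        entries.waitUntilFlushed(5, TimeUnit.MINUTES); // throws TimeoutException if the flush never happens
    }

    // Consumer side: after persisting the chunk, release the waiting producer.
    void consume(final Entries entries) {
        // ... write the entries to the data store ...
        if (entries.isForceFlush()) {
            entries.notifyFlushed();                   // opens the latch behind waitUntilFlushed()
        }
    }
}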
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
index 1ddd479..3ee8450 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvToEntryTransformer.java
@@ -4,6 +4,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.lucares.collections.IntList;
 import org.lucares.pdb.api.Entries;
@@ -26,7 +28,7 @@ class CsvToEntryTransformer {
         this.settings = settings;
     }
 
-    void readCSV(final InputStream in) throws IOException, InterruptedException {
+    void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException {
 
         final int chunksize = 1000;
         Entries entries = new Entries(chunksize);
@@ -93,7 +95,9 @@ class CsvToEntryTransformer {
         if (entry != null) {
             entries.add(entry);
         }
+        entries.forceFlush();
         queue.put(entries);
+        entries.waitUntilFlushed(5, TimeUnit.MINUTES);
     }
 
     private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
index 423d4c9..c70fa80 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
@@ -9,9 +9,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.lucares.pdb.api.Entries;
 import org.lucares.performance.db.PerformanceDb;
@@ -41,8 +44,9 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
         this.performanceDb = performanceDb;
     }
 
-    public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings)
-            throws IllegalStateException, IOException {
+    public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings,
+            final boolean waitUntilFinished)
+            throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException {
 
         final List<Path> tmpFiles = new ArrayList<>();
@@ -58,7 +62,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
             throw e;
         }
 
-        threadPool.submit(() -> {
+        final Future<?> future = threadPool.submit(() -> {
             final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
             for (final Path tmpFile : tmpFiles) {
                 try {
@@ -74,8 +78,10 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
                     LOGGER.error("csv ingestion failed", e);
                 }
             }
-            queue.add(Entries.POISON);
         });
+        if (waitUntilFinished) {
+            future.get(1, TimeUnit.HOURS);
+        }
     }
 
     @Override
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
index 795ce26..72f3ae7 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/IngestionHandler.java
@@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
 import java.util.zip.GZIPInputStream;
 
 import org.lucares.pdb.api.Entries;
@@ -50,7 +51,7 @@ public final class IngestionHandler implements Callable<Void> {
         return null;
     }
 
-    private void handleInputStream(final InputStream in) throws IOException, InterruptedException {
+    private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException {
         in.mark(1);
         final byte firstByte = (byte) in.read();
         if (firstByte == '{') {
diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java
index 4747358..d4ca225 100644
--- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java
+++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java
@@ -12,7 +12,9 @@ import java.util.Locale;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
@@ -327,10 +329,11 @@ public class PdbController implements HardcodedValues, PropertyKeys {
     @ResponseBody
     @ResponseStatus(code = HttpStatus.CREATED)
     public String handleCsvFileUpload(@RequestParam("file") final MultipartFile[] files,
-            @RequestPart("settings") final CsvReaderSettings csvReaderSettings)
-            throws IllegalStateException, IOException {
+            @RequestPart("settings") final CsvReaderSettings csvReaderSettings,
+            @RequestParam(name = "waitUntilFinished", defaultValue = "false") final boolean waitUntilFinished)
+            throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException {
 
-        csvUploadHandler.ingest(List.of(files), csvReaderSettings);
+        csvUploadHandler.ingest(List.of(files), csvReaderSettings, waitUntilFinished);
         return ""; // return value might become a job id that can be used to cancel, or observe
                    // status
     }
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java
index 989160e..489645e 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java
@@ -10,8 +10,10 @@ import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.lucares.collections.LongList;
@@ -19,7 +21,6 @@ import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.Entries;
 import org.lucares.pdb.api.Query;
 import org.lucares.performance.db.PerformanceDb;
-import org.junit.jupiter.api.Assertions;
 import org.lucares.utils.file.FileUtils;
 
 public class CsvToEntryTransformerTest {
@@ -37,7 +38,7 @@ public class CsvToEntryTransformerTest {
     }
 
     @Test
-    public void testIngest() throws IOException, InterruptedException {
+    public void testIngest() throws IOException, InterruptedException, TimeoutException {
 
         final OffsetDateTime dateA = OffsetDateTime.now();
         final OffsetDateTime dateB = OffsetDateTime.now();
@@ -75,9 +76,10 @@ public class CsvToEntryTransformerTest {
      *
      * @throws IOException
      * @throws InterruptedException
+     * @throws TimeoutException
      */
     @Test
-    public void testIgnoreColumns() throws IOException, InterruptedException {
+    public void testIgnoreColumns() throws IOException, InterruptedException, TimeoutException {
 
         try (final PerformanceDb db = new PerformanceDb(dataDirectory)) {
diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java
index eedb2ae..53af5ef 100644
--- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java
+++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java
@@ -4,10 +4,10 @@ import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.lucares.collections.LongList;
@@ -15,7 +15,6 @@ import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.GroupResult;
 import org.lucares.pdb.api.Query;
 import org.lucares.performance.db.PerformanceDb;
-import org.junit.jupiter.api.Assertions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -60,7 +59,6 @@ public class PdbControllerTest {
         final CsvReaderSettings settings = CsvReaderSettings.create(timeColumn, ',', ignoredColumn);
 
         uploadCsv(settings, csv);
-        TimeUnit.SECONDS.sleep(1);
 
         {
             final GroupResult groupResult = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA)))
                     .singleGroup();
@@ -96,7 +94,8 @@ public class PdbControllerTest {
         final HttpEntity<MultiValueMap<String, Object>> entity = new HttpEntity<MultiValueMap<String, Object>>(
                 parameters, headers);
 
-        final ResponseEntity<String> response = rest.exchange("/data", HttpMethod.POST, entity, String.class);
+        final ResponseEntity<String> response = rest.exchange("/data?waitUntilFinished=true", HttpMethod.POST, entity,
+                String.class);
 
         Assertions.assertEquals(response.getStatusCode(), HttpStatus.CREATED, "response status");
     }
diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
index 20f7cf8..20a7b2b 100644
--- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
+++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java
@@ -134,6 +134,12 @@ public class PerformanceDb implements AutoCloseable {
                         LOGGER.info("", e);
                     }
                 }
+
+                if (entries.isForceFlush()) {
+                    LOGGER.info("flush triggered via entries.isForceFlush()");
+                    dataStore.flush();
+                    entries.notifyFlushed();
+                }
             }
         } catch (final RuntimeException e) {
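The hunk above lands inside the loop in PerformanceDb that drains the queue returned by getQueue(). The following is a rough, hypothetical reconstruction of that loop for orientation only; the actual method structure and error handling differ, and only dataStore, Entries.POISON, and the queue itself appear in the patch and its context.

    // Hypothetical simplification of PerformanceDb's consumer loop, not the real implementation.
    private void drainQueue(final ArrayBlockingQueue<Entries> queue) throws InterruptedException {
        while (true) {
            final Entries entries = queue.take();
            if (entries == Entries.POISON) {
                break;                     // poison object stops the consumer
            }
            // ... persist the entries via dataStore ...
            if (entries.isForceFlush()) {
                dataStore.flush();         // make the chunk durable first
                entries.notifyFlushed();   // then release a producer blocked in waitUntilFlushed()
            }
        }
    }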