From 85679ca0c82788689ba55ef2ca506a6c94751486 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 8 Dec 2019 18:39:34 +0100 Subject: [PATCH] send CSV file via REST --- .../lucares/pdb/diskstorage/DiskStorage.java | 12 +-- .../org/lucares/utils/file/FileUtils.java | 10 +++ pdb-ui/build.gradle | 1 + .../org/lucares/pdbui/CsvUploadHandler.java | 87 +++++++++++++++++++ .../java/org/lucares/pdbui/PdbController.java | 20 ++++- .../java/org/lucares/pdbui/TcpIngestor.java | 39 +++------ .../java/org/lucares/pdbui/CsvResource.java | 20 +++++ .../pdbui/CsvToEntryTransformerTest.java | 9 +- .../pdbui/MySpringTestConfiguration.java | 29 ++++++- .../org/lucares/pdbui/PdbControllerTest.java | 85 ++++++++++++++++++ .../java/org/lucares/pdbui/PdbTestUtil.java | 28 +++++- .../lucares/performance/db/PerformanceDb.java | 42 ++++++++- 12 files changed, 334 insertions(+), 48 deletions(-) create mode 100644 pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java create mode 100644 pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java create mode 100644 pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java diff --git a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java index 2b6fee9..e70d5a5 100644 --- a/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java +++ b/block-storage/src/main/java/org/lucares/pdb/diskstorage/DiskStorage.java @@ -25,9 +25,9 @@ public class DiskStorage implements AutoCloseable { private final FileChannel fileChannel; - private Path relativeDatabaseFileForLogging; + private final Path relativeDatabaseFileForLogging; - public DiskStorage(final Path databaseFile, Path storageBasePath) { + public DiskStorage(final Path databaseFile, final Path storageBasePath) { this.relativeDatabaseFileForLogging = storageBasePath != null ? storageBasePath.relativize(databaseFile) : databaseFile; try { @@ -66,10 +66,12 @@ public class DiskStorage implements AutoCloseable { } @Override - public void close() { + public synchronized void close() { try { - fileChannel.force(true); - fileChannel.close(); + if (fileChannel.isOpen()) { + fileChannel.force(true); + fileChannel.close(); + } } catch (final IOException e) { throw new DiskStorageException(e); } diff --git a/file-utils/src/main/java/org/lucares/utils/file/FileUtils.java b/file-utils/src/main/java/org/lucares/utils/file/FileUtils.java index a73ca6e..68709e6 100644 --- a/file-utils/src/main/java/org/lucares/utils/file/FileUtils.java +++ b/file-utils/src/main/java/org/lucares/utils/file/FileUtils.java @@ -38,6 +38,16 @@ public class FileUtils { } } + public static void deleteSilently(final Iterable paths) { + for (final Path path : paths) { + try { + delete(path); + } catch (final Exception e) { + LOGGER.info("failed to delete {}", path, e); + } + } + } + public static void delete(final Path path) { final int maxAttempts = 10; diff --git a/pdb-ui/build.gradle b/pdb-ui/build.gradle index c255a41..9a1d04c 100644 --- a/pdb-ui/build.gradle +++ b/pdb-ui/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation project(':pdb-plotting') implementation project(':pdb-js') implementation project(':pdb-utils') + implementation project(':file-utils') implementation lib_commons_lang3 implementation lib_primitive_collections diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java new file mode 100644 index 0000000..423d4c9 --- /dev/null +++ b/pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java @@ -0,0 +1,87 @@ +package org.lucares.pdbui; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.lucares.pdb.api.Entries; +import org.lucares.performance.db.PerformanceDb; +import org.lucares.utils.file.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.web.multipart.MultipartFile; + +@Component +public class CsvUploadHandler implements PropertyKeys, DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(CsvUploadHandler.class); + + private final Path tmpDir; + + private final ExecutorService threadPool = Executors.newFixedThreadPool(2); + + private final PerformanceDb performanceDb; + + public CsvUploadHandler(@Value("${" + TMP_DIR + "}") final String tmpDir, final PerformanceDb performanceDb) + throws IOException { + this.tmpDir = Paths.get(tmpDir).resolve("uploads"); + Files.createDirectories(this.tmpDir); + this.performanceDb = performanceDb; + } + + public void ingest(final List files, final CsvReaderSettings settings) + throws IllegalStateException, IOException { + + final List tmpFiles = new ArrayList(); + + try { + for (final MultipartFile file : files) { + final Path tmpFile = tmpDir.resolve(UUID.randomUUID().toString()); + tmpFiles.add(tmpFile); + LOGGER.info("writing uploaded file to {}", tmpFile); + file.transferTo(tmpFile); + } + } catch (RuntimeException | IOException e) { + FileUtils.deleteSilently(tmpFiles); + throw e; + } + + threadPool.submit(() -> { + final ArrayBlockingQueue queue = performanceDb.getQueue(); + for (final Path tmpFile : tmpFiles) { + try { + final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings); + try (FileInputStream in = new FileInputStream(tmpFile.toFile())) { + csvToEntryTransformer.readCSV(in); + } + + LOGGER.info("delete uploaded file {}", tmpFile); + Files.delete(tmpFile); + + } catch (final Exception e) { + LOGGER.error("csv ingestion failed", e); + } + } + queue.add(Entries.POISON); + }); + } + + @Override + public void destroy() throws Exception { + threadPool.shutdown(); + LOGGER.info("awaiting termination ..."); + threadPool.awaitTermination(10, TimeUnit.MINUTES); + } +} diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java index b5fda27..9fee162 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/PdbController.java @@ -42,17 +42,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.util.StreamUtils; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.multipart.MultipartFile; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import com.fasterxml.jackson.core.JsonParseException; @@ -83,9 +87,12 @@ public class PdbController implements HardcodedValues, PropertyKeys { @Value("${" + DEFAULTS_SPLIT_BY + ":}") private String defaultsSplitBy; - public PdbController(final PerformanceDb db, final Plotter plotter) { + private final CsvUploadHandler csvUploadHandler; + + public PdbController(final PerformanceDb db, final Plotter plotter, final CsvUploadHandler csvUploadHandler) { this.db = db; this.plotter = plotter; + this.csvUploadHandler = csvUploadHandler; } @RequestMapping(path = "/plots", // @@ -315,4 +322,15 @@ public class PdbController implements HardcodedValues, PropertyKeys { return result; } + @PostMapping(path = "/data") + @ResponseBody + @ResponseStatus(code = HttpStatus.CREATED) + public String handleCsvFileUpload(@RequestParam("file") final MultipartFile[] files) + throws IllegalStateException, IOException { + + csvUploadHandler.ingest(List.of(files), new CsvReaderSettings(',')); + return ""; // return value might become a job id that can be used to cancel, or observe + // status + } + } diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index b69ef20..5cbe3aa 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -14,7 +14,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PreDestroy; import org.lucares.pdb.api.Entries; -import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; import org.lucares.recommind.logs.Config; import org.slf4j.Logger; @@ -32,7 +31,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true); - private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(2); + private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1); private final ExecutorService workerThreadPool = Executors.newCachedThreadPool(); @@ -57,27 +56,10 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { @Override public void start() throws Exception { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); - - serverThreadPool.submit(() -> { - Thread.currentThread().setName("db-ingestion"); - - boolean finished = false; - while (!finished) { - try { - db.putEntries(new BlockingQueueIterator<>(queue, Entries.POISON)); - finished = true; - } catch (final Exception e) { - LOGGER.warn("Write to database failed. Will retry with the next element.", e); - } - } - return null; - }); - - serverThreadPool.submit(() -> listen(queue)); + serverThreadPool.submit(() -> listen()); } - private Void listen(final ArrayBlockingQueue queue) throws IOException { + private Void listen() throws IOException { Thread.currentThread().setName("socket-listener"); try (ServerSocket serverSocket = new ServerSocket(PORT);) { LOGGER.info("listening on port " + PORT); @@ -89,6 +71,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { final Socket clientSocket = serverSocket.accept(); LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress()); + final ArrayBlockingQueue queue = db.getQueue(); workerThreadPool.submit(new IngestionHandler(clientSocket, queue)); LOGGER.debug("handler submitted"); } catch (final SocketTimeoutException e) { @@ -109,12 +92,13 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { } catch (final InterruptedException e) { Thread.interrupted(); } - LOGGER.debug("adding poison"); - queue.put(Entries.POISON); - } catch (final InterruptedException e) { - LOGGER.info("Listener thread interrupted. Likely while adding the poison. " - + "That would mean that the db-ingestion thread will not terminate. "); - Thread.interrupted(); +// LOGGER.debug("adding poison"); +// final ArrayBlockingQueue queue = db.getQueue(); +// queue.put(Entries.POISON); +// } catch (final InterruptedException e) { +// LOGGER.info("Listener thread interrupted. Likely while adding the poison. " +// + "That would mean that the db-ingestion thread will not terminate. "); +// Thread.interrupted(); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -138,7 +122,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { } catch (final InterruptedException e) { Thread.interrupted(); } - LOGGER.debug("closing database"); db.close(); LOGGER.info("destroyed"); } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java new file mode 100644 index 0000000..350f42a --- /dev/null +++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java @@ -0,0 +1,20 @@ +package org.lucares.pdbui; + +import java.nio.charset.StandardCharsets; + +import org.springframework.core.io.ByteArrayResource; + +public class CsvResource extends ByteArrayResource { + + private final String filename; + + public CsvResource(final String csv, final String filename) { + super(csv.getBytes(StandardCharsets.UTF_8)); + this.filename = filename; + } + + @Override + public String getFilename() { + return filename; + } +} diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java index 75b0114..b5d9cae 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/CsvToEntryTransformerTest.java @@ -15,7 +15,6 @@ import org.lucares.collections.LongList; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.Entries; import org.lucares.pdb.api.Query; -import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; import org.lucares.utils.file.FileUtils; import org.testng.Assert; @@ -48,13 +47,11 @@ public class CsvToEntryTransformerTest { + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"// + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n"; - final ArrayBlockingQueue queue = new ArrayBlockingQueue(10); + final ArrayBlockingQueue queue = db.getQueue(); final CsvReaderSettings settings = new CsvReaderSettings(','); final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings); csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8))); queue.put(Entries.POISON); - - db.putEntries(new BlockingQueueIterator(queue, Entries.POISON)); } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { @@ -87,13 +84,11 @@ public class CsvToEntryTransformerTest { + "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"// + "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n"; - final ArrayBlockingQueue queue = new ArrayBlockingQueue(10); + final ArrayBlockingQueue queue = db.getQueue(); final CsvReaderSettings settings = new CsvReaderSettings(',', "ignoredColumn"); final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings); csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8))); queue.put(Entries.POISON); - - db.putEntries(new BlockingQueueIterator(queue, Entries.POISON)); } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/MySpringTestConfiguration.java b/pdb-ui/src/test/java/org/lucares/pdbui/MySpringTestConfiguration.java index 17e0f14..80e1242 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/MySpringTestConfiguration.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/MySpringTestConfiguration.java @@ -1,11 +1,32 @@ package org.lucares.pdbui; -import org.lucares.pdbui.MySpringConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; +import java.nio.file.Path; +import java.util.Properties; -@SpringBootTest +import org.lucares.utils.file.FileUtils; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.core.env.PropertiesPropertySource; + +@Configuration @Import(MySpringConfiguration.class) public class MySpringTestConfiguration { } + +class TestOverrides implements ApplicationContextInitializer { + @Override + public void initialize(final ConfigurableApplicationContext applicationContext) { + final Properties props = new Properties(); + + final Path tmpdir = Path.of(System.getProperty("java.io.tmpdir")).resolve("pdb-test"); + + FileUtils.delete(tmpdir); + + props.put("base.dir", tmpdir.toFile().getAbsolutePath()); + final PropertiesPropertySource testOverrides = new PropertiesPropertySource("testOverrides", props); + applicationContext.getEnvironment().getPropertySources().addFirst(testOverrides); + } +} \ No newline at end of file diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java new file mode 100644 index 0000000..b84c93f --- /dev/null +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java @@ -0,0 +1,85 @@ +package org.lucares.pdbui; + +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.lucares.collections.LongList; +import org.lucares.pdb.api.DateTimeRange; +import org.lucares.pdb.api.GroupResult; +import org.lucares.pdb.api.Query; +import org.lucares.performance.db.PerformanceDb; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +@RunWith(SpringRunner.class) +@ContextConfiguration(initializers = TestOverrides.class) +@SpringBootTest(classes = MySpringTestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT) +public class PdbControllerTest { + + @Autowired + private PerformanceDb performanceDb; + + @Autowired + private TestRestTemplate rest; + + @Test + public void testUploadCsv() { + + final OffsetDateTime dateA = OffsetDateTime.now(); + final OffsetDateTime dateB = OffsetDateTime.now(); + + final String csv = "@timestamp,duration,tag\n"// + + dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"// + + dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n"; + + uploadCsv(csv); + + final GroupResult groupResult = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA))) + .singleGroup(); + final LongList result = groupResult.flatMap(); + System.out.println(PdbTestUtil.timeValueLongListToString(result)); + Assert.assertEquals(result.size(), 4); + + Assert.assertEquals(result.get(0), dateA.toInstant().toEpochMilli()); + Assert.assertEquals(result.get(1), 1); + + Assert.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli()); + Assert.assertEquals(result.get(3), 2); + } + + private void uploadCsv(final String... csvs) { + // final TestRestTemplate rest = new TestRestTemplate(); + final LinkedMultiValueMap parameters = new LinkedMultiValueMap(); + int count = 0; + for (final String csv : csvs) { + parameters.add("file", new CsvResource(csv, count++ + ".csv")); + } + + final HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + + final HttpEntity> entity = new HttpEntity>( + parameters, headers); + + final ResponseEntity response = rest.exchange("/data", HttpMethod.POST, entity, String.class); + + Assert.assertEquals("response status", HttpStatus.CREATED, response.getStatusCode()); + } + +} diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java index f80457f..7b34d40 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java @@ -9,6 +9,10 @@ import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -21,7 +25,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.lucares.pdbui.TcpIngestor; +import org.lucares.collections.LongList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +63,7 @@ public class PdbTestUtil { sendAsCsv(keys, entries); } - public static final void sendAsCsv(Collection keys, final Collection> entries) + public static final void sendAsCsv(final Collection keys, final Collection> entries) throws IOException, InterruptedException { final StringBuilder csv = new StringBuilder(); @@ -177,4 +181,24 @@ public class PdbTestUtil { return result; } + public static String timeValueLongListToString(final LongList timeValueLongList) { + final StringBuilder result = new StringBuilder(); + + int i = 0; + while (i < timeValueLongList.size()) { + final OffsetDateTime time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(timeValueLongList.get(i)), + ZoneOffset.UTC); + i++; + final long value = timeValueLongList.get(i); + i++; + + result.append(time.format(DateTimeFormatter.ISO_DATE_TIME)); + result.append("="); + result.append(value); + result.append("\n"); + } + + return result.toString(); + } + } diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 6aa9f4e..20f7cf8 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -8,6 +8,10 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.lucares.collections.LongList; @@ -34,10 +38,36 @@ public class PerformanceDb implements AutoCloseable { private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block"); private final DataStore dataStore; + private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1); + private final ArrayBlockingQueue queue; public PerformanceDb(final Path dataDirectory) throws IOException { + queue = new ArrayBlockingQueue<>(10); dataStore = new DataStore(dataDirectory); + startThread(); + } + + public ArrayBlockingQueue getQueue() { + return queue; + } + + private void startThread() { + serverThreadPool.submit(() -> { + Thread.currentThread().setName("db-ingestion"); + + // TODO move error handling to putEntries + boolean finished = false; + while (!finished) { + try { + putEntries(new BlockingQueueIterator<>(queue, Entries.POISON)); + finished = true; + } catch (final Exception e) { + LOGGER.warn("Write to database failed. Will retry with the next element.", e); + } + } + return null; + }); } @@ -56,7 +86,7 @@ public class PerformanceDb implements AutoCloseable { putEntries(iterator); } - public void putEntries(final BlockingIterator entriesIterator) throws WriteException { + void putEntries(final BlockingIterator entriesIterator) throws WriteException { final Duration timeBetweenSyncs = Duration.ofSeconds(1); long count = 0; @@ -162,6 +192,16 @@ public class PerformanceDb implements AutoCloseable { @Override public void close() { try { + LOGGER.debug("adding poison"); + queue.put(Entries.POISON); + + serverThreadPool.shutdown(); + try { + serverThreadPool.awaitTermination(10, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + Thread.interrupted(); + } + dataStore.close(); } catch (final Exception e) { LOGGER.error("failed to close PerformanceDB", e);