send CSV file via REST
This commit is contained in:
@@ -25,9 +25,9 @@ public class DiskStorage implements AutoCloseable {
|
|||||||
|
|
||||||
private final FileChannel fileChannel;
|
private final FileChannel fileChannel;
|
||||||
|
|
||||||
private Path relativeDatabaseFileForLogging;
|
private final Path relativeDatabaseFileForLogging;
|
||||||
|
|
||||||
public DiskStorage(final Path databaseFile, Path storageBasePath) {
|
public DiskStorage(final Path databaseFile, final Path storageBasePath) {
|
||||||
this.relativeDatabaseFileForLogging = storageBasePath != null ? storageBasePath.relativize(databaseFile)
|
this.relativeDatabaseFileForLogging = storageBasePath != null ? storageBasePath.relativize(databaseFile)
|
||||||
: databaseFile;
|
: databaseFile;
|
||||||
try {
|
try {
|
||||||
@@ -66,10 +66,12 @@ public class DiskStorage implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public synchronized void close() {
|
||||||
try {
|
try {
|
||||||
fileChannel.force(true);
|
if (fileChannel.isOpen()) {
|
||||||
fileChannel.close();
|
fileChannel.force(true);
|
||||||
|
fileChannel.close();
|
||||||
|
}
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new DiskStorageException(e);
|
throw new DiskStorageException(e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,6 +38,16 @@ public class FileUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void deleteSilently(final Iterable<Path> paths) {
|
||||||
|
for (final Path path : paths) {
|
||||||
|
try {
|
||||||
|
delete(path);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOGGER.info("failed to delete {}", path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void delete(final Path path) {
|
public static void delete(final Path path) {
|
||||||
|
|
||||||
final int maxAttempts = 10;
|
final int maxAttempts = 10;
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ dependencies {
|
|||||||
implementation project(':pdb-plotting')
|
implementation project(':pdb-plotting')
|
||||||
implementation project(':pdb-js')
|
implementation project(':pdb-js')
|
||||||
implementation project(':pdb-utils')
|
implementation project(':pdb-utils')
|
||||||
|
implementation project(':file-utils')
|
||||||
|
|
||||||
implementation lib_commons_lang3
|
implementation lib_commons_lang3
|
||||||
implementation lib_primitive_collections
|
implementation lib_primitive_collections
|
||||||
|
|||||||
87
pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
Normal file
87
pdb-ui/src/main/java/org/lucares/pdbui/CsvUploadHandler.java
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.lucares.pdb.api.Entries;
|
||||||
|
import org.lucares.performance.db.PerformanceDb;
|
||||||
|
import org.lucares.utils.file.FileUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.multipart.MultipartFile;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(CsvUploadHandler.class);
|
||||||
|
|
||||||
|
private final Path tmpDir;
|
||||||
|
|
||||||
|
private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
|
private final PerformanceDb performanceDb;
|
||||||
|
|
||||||
|
public CsvUploadHandler(@Value("${" + TMP_DIR + "}") final String tmpDir, final PerformanceDb performanceDb)
|
||||||
|
throws IOException {
|
||||||
|
this.tmpDir = Paths.get(tmpDir).resolve("uploads");
|
||||||
|
Files.createDirectories(this.tmpDir);
|
||||||
|
this.performanceDb = performanceDb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings)
|
||||||
|
throws IllegalStateException, IOException {
|
||||||
|
|
||||||
|
final List<Path> tmpFiles = new ArrayList<Path>();
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (final MultipartFile file : files) {
|
||||||
|
final Path tmpFile = tmpDir.resolve(UUID.randomUUID().toString());
|
||||||
|
tmpFiles.add(tmpFile);
|
||||||
|
LOGGER.info("writing uploaded file to {}", tmpFile);
|
||||||
|
file.transferTo(tmpFile);
|
||||||
|
}
|
||||||
|
} catch (RuntimeException | IOException e) {
|
||||||
|
FileUtils.deleteSilently(tmpFiles);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
threadPool.submit(() -> {
|
||||||
|
final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
|
||||||
|
for (final Path tmpFile : tmpFiles) {
|
||||||
|
try {
|
||||||
|
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
|
||||||
|
try (FileInputStream in = new FileInputStream(tmpFile.toFile())) {
|
||||||
|
csvToEntryTransformer.readCSV(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.info("delete uploaded file {}", tmpFile);
|
||||||
|
Files.delete(tmpFile);
|
||||||
|
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOGGER.error("csv ingestion failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
queue.add(Entries.POISON);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
threadPool.shutdown();
|
||||||
|
LOGGER.info("awaiting termination ...");
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -42,17 +42,21 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.stereotype.Controller;
|
import org.springframework.stereotype.Controller;
|
||||||
import org.springframework.util.StreamUtils;
|
import org.springframework.util.StreamUtils;
|
||||||
import org.springframework.web.bind.annotation.CrossOrigin;
|
import org.springframework.web.bind.annotation.CrossOrigin;
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMethod;
|
import org.springframework.web.bind.annotation.RequestMethod;
|
||||||
import org.springframework.web.bind.annotation.RequestParam;
|
import org.springframework.web.bind.annotation.RequestParam;
|
||||||
import org.springframework.web.bind.annotation.ResponseBody;
|
import org.springframework.web.bind.annotation.ResponseBody;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||||
|
import org.springframework.web.multipart.MultipartFile;
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonParseException;
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
@@ -83,9 +87,12 @@ public class PdbController implements HardcodedValues, PropertyKeys {
|
|||||||
@Value("${" + DEFAULTS_SPLIT_BY + ":}")
|
@Value("${" + DEFAULTS_SPLIT_BY + ":}")
|
||||||
private String defaultsSplitBy;
|
private String defaultsSplitBy;
|
||||||
|
|
||||||
public PdbController(final PerformanceDb db, final Plotter plotter) {
|
private final CsvUploadHandler csvUploadHandler;
|
||||||
|
|
||||||
|
public PdbController(final PerformanceDb db, final Plotter plotter, final CsvUploadHandler csvUploadHandler) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
this.plotter = plotter;
|
this.plotter = plotter;
|
||||||
|
this.csvUploadHandler = csvUploadHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(path = "/plots", //
|
@RequestMapping(path = "/plots", //
|
||||||
@@ -315,4 +322,15 @@ public class PdbController implements HardcodedValues, PropertyKeys {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping(path = "/data")
|
||||||
|
@ResponseBody
|
||||||
|
@ResponseStatus(code = HttpStatus.CREATED)
|
||||||
|
public String handleCsvFileUpload(@RequestParam("file") final MultipartFile[] files)
|
||||||
|
throws IllegalStateException, IOException {
|
||||||
|
|
||||||
|
csvUploadHandler.ingest(List.of(files), new CsvReaderSettings(','));
|
||||||
|
return ""; // return value might become a job id that can be used to cancel, or observe
|
||||||
|
// status
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
|
|
||||||
import org.lucares.pdb.api.Entries;
|
import org.lucares.pdb.api.Entries;
|
||||||
import org.lucares.performance.db.BlockingQueueIterator;
|
|
||||||
import org.lucares.performance.db.PerformanceDb;
|
import org.lucares.performance.db.PerformanceDb;
|
||||||
import org.lucares.recommind.logs.Config;
|
import org.lucares.recommind.logs.Config;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -32,7 +31,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
|
|
||||||
private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true);
|
private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true);
|
||||||
|
|
||||||
private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(2);
|
private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1);
|
||||||
|
|
||||||
private final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
|
private final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
@@ -57,27 +56,10 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
@Override
|
@Override
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
|
||||||
final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<>(10);
|
serverThreadPool.submit(() -> listen());
|
||||||
|
|
||||||
serverThreadPool.submit(() -> {
|
|
||||||
Thread.currentThread().setName("db-ingestion");
|
|
||||||
|
|
||||||
boolean finished = false;
|
|
||||||
while (!finished) {
|
|
||||||
try {
|
|
||||||
db.putEntries(new BlockingQueueIterator<>(queue, Entries.POISON));
|
|
||||||
finished = true;
|
|
||||||
} catch (final Exception e) {
|
|
||||||
LOGGER.warn("Write to database failed. Will retry with the next element.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
serverThreadPool.submit(() -> listen(queue));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Void listen(final ArrayBlockingQueue<Entries> queue) throws IOException {
|
private Void listen() throws IOException {
|
||||||
Thread.currentThread().setName("socket-listener");
|
Thread.currentThread().setName("socket-listener");
|
||||||
try (ServerSocket serverSocket = new ServerSocket(PORT);) {
|
try (ServerSocket serverSocket = new ServerSocket(PORT);) {
|
||||||
LOGGER.info("listening on port " + PORT);
|
LOGGER.info("listening on port " + PORT);
|
||||||
@@ -89,6 +71,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
final Socket clientSocket = serverSocket.accept();
|
final Socket clientSocket = serverSocket.accept();
|
||||||
LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
|
LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress());
|
||||||
|
|
||||||
|
final ArrayBlockingQueue<Entries> queue = db.getQueue();
|
||||||
workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
|
workerThreadPool.submit(new IngestionHandler(clientSocket, queue));
|
||||||
LOGGER.debug("handler submitted");
|
LOGGER.debug("handler submitted");
|
||||||
} catch (final SocketTimeoutException e) {
|
} catch (final SocketTimeoutException e) {
|
||||||
@@ -109,12 +92,13 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
Thread.interrupted();
|
Thread.interrupted();
|
||||||
}
|
}
|
||||||
LOGGER.debug("adding poison");
|
// LOGGER.debug("adding poison");
|
||||||
queue.put(Entries.POISON);
|
// final ArrayBlockingQueue<Entries> queue = db.getQueue();
|
||||||
} catch (final InterruptedException e) {
|
// queue.put(Entries.POISON);
|
||||||
LOGGER.info("Listener thread interrupted. Likely while adding the poison. "
|
// } catch (final InterruptedException e) {
|
||||||
+ "That would mean that the db-ingestion thread will not terminate. ");
|
// LOGGER.info("Listener thread interrupted. Likely while adding the poison. "
|
||||||
Thread.interrupted();
|
// + "That would mean that the db-ingestion thread will not terminate. ");
|
||||||
|
// Thread.interrupted();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
LOGGER.error("", e);
|
LOGGER.error("", e);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -138,7 +122,6 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
|||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
Thread.interrupted();
|
Thread.interrupted();
|
||||||
}
|
}
|
||||||
LOGGER.debug("closing database");
|
|
||||||
db.close();
|
db.close();
|
||||||
LOGGER.info("destroyed");
|
LOGGER.info("destroyed");
|
||||||
}
|
}
|
||||||
|
|||||||
20
pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java
Normal file
20
pdb-ui/src/test/java/org/lucares/pdbui/CsvResource.java
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.springframework.core.io.ByteArrayResource;
|
||||||
|
|
||||||
|
public class CsvResource extends ByteArrayResource {
|
||||||
|
|
||||||
|
private final String filename;
|
||||||
|
|
||||||
|
public CsvResource(final String csv, final String filename) {
|
||||||
|
super(csv.getBytes(StandardCharsets.UTF_8));
|
||||||
|
this.filename = filename;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getFilename() {
|
||||||
|
return filename;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -15,7 +15,6 @@ import org.lucares.collections.LongList;
|
|||||||
import org.lucares.pdb.api.DateTimeRange;
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
import org.lucares.pdb.api.Entries;
|
import org.lucares.pdb.api.Entries;
|
||||||
import org.lucares.pdb.api.Query;
|
import org.lucares.pdb.api.Query;
|
||||||
import org.lucares.performance.db.BlockingQueueIterator;
|
|
||||||
import org.lucares.performance.db.PerformanceDb;
|
import org.lucares.performance.db.PerformanceDb;
|
||||||
import org.lucares.utils.file.FileUtils;
|
import org.lucares.utils.file.FileUtils;
|
||||||
import org.testng.Assert;
|
import org.testng.Assert;
|
||||||
@@ -48,13 +47,11 @@ public class CsvToEntryTransformerTest {
|
|||||||
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
|
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
|
||||||
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n";
|
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n";
|
||||||
|
|
||||||
final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<Entries>(10);
|
final ArrayBlockingQueue<Entries> queue = db.getQueue();
|
||||||
final CsvReaderSettings settings = new CsvReaderSettings(',');
|
final CsvReaderSettings settings = new CsvReaderSettings(',');
|
||||||
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
|
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
|
||||||
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
|
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
|
||||||
queue.put(Entries.POISON);
|
queue.put(Entries.POISON);
|
||||||
|
|
||||||
db.putEntries(new BlockingQueueIterator<Entries>(queue, Entries.POISON));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||||
@@ -87,13 +84,11 @@ public class CsvToEntryTransformerTest {
|
|||||||
+ "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"//
|
+ "2000-01-01T00:00:00.000Z,1,ignoreValue,ignoreValue,tagValue\n"//
|
||||||
+ "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n";
|
+ "2000-01-01T00:00:00.001Z,2,ignoreValue,ignoreValue,tagValue\n";
|
||||||
|
|
||||||
final ArrayBlockingQueue<Entries> queue = new ArrayBlockingQueue<Entries>(10);
|
final ArrayBlockingQueue<Entries> queue = db.getQueue();
|
||||||
final CsvReaderSettings settings = new CsvReaderSettings(',', "ignoredColumn");
|
final CsvReaderSettings settings = new CsvReaderSettings(',', "ignoredColumn");
|
||||||
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
|
final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
|
||||||
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
|
csvToEntryTransformer.readCSV(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
|
||||||
queue.put(Entries.POISON);
|
queue.put(Entries.POISON);
|
||||||
|
|
||||||
db.putEntries(new BlockingQueueIterator<Entries>(queue, Entries.POISON));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||||
|
|||||||
@@ -1,11 +1,32 @@
|
|||||||
package org.lucares.pdbui;
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
import org.lucares.pdbui.MySpringConfiguration;
|
import java.nio.file.Path;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import java.util.Properties;
|
||||||
import org.springframework.context.annotation.Import;
|
|
||||||
|
|
||||||
@SpringBootTest
|
import org.lucares.utils.file.FileUtils;
|
||||||
|
import org.springframework.context.ApplicationContextInitializer;
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
import org.springframework.core.env.PropertiesPropertySource;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
@Import(MySpringConfiguration.class)
|
@Import(MySpringConfiguration.class)
|
||||||
public class MySpringTestConfiguration {
|
public class MySpringTestConfiguration {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TestOverrides implements ApplicationContextInitializer<ConfigurableApplicationContext> {
|
||||||
|
@Override
|
||||||
|
public void initialize(final ConfigurableApplicationContext applicationContext) {
|
||||||
|
final Properties props = new Properties();
|
||||||
|
|
||||||
|
final Path tmpdir = Path.of(System.getProperty("java.io.tmpdir")).resolve("pdb-test");
|
||||||
|
|
||||||
|
FileUtils.delete(tmpdir);
|
||||||
|
|
||||||
|
props.put("base.dir", tmpdir.toFile().getAbsolutePath());
|
||||||
|
final PropertiesPropertySource testOverrides = new PropertiesPropertySource("testOverrides", props);
|
||||||
|
applicationContext.getEnvironment().getPropertySources().addFirst(testOverrides);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,85 @@
|
|||||||
|
package org.lucares.pdbui;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.lucares.collections.LongList;
|
||||||
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
|
import org.lucares.pdb.api.GroupResult;
|
||||||
|
import org.lucares.pdb.api.Query;
|
||||||
|
import org.lucares.performance.db.PerformanceDb;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpMethod;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
import org.springframework.util.LinkedMultiValueMap;
|
||||||
|
import org.springframework.util.MultiValueMap;
|
||||||
|
|
||||||
|
@RunWith(SpringRunner.class)
|
||||||
|
@ContextConfiguration(initializers = TestOverrides.class)
|
||||||
|
@SpringBootTest(classes = MySpringTestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT)
|
||||||
|
public class PdbControllerTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PerformanceDb performanceDb;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate rest;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUploadCsv() {
|
||||||
|
|
||||||
|
final OffsetDateTime dateA = OffsetDateTime.now();
|
||||||
|
final OffsetDateTime dateB = OffsetDateTime.now();
|
||||||
|
|
||||||
|
final String csv = "@timestamp,duration,tag\n"//
|
||||||
|
+ dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",1,tagValue\n"//
|
||||||
|
+ dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + ",2,tagValue\n";
|
||||||
|
|
||||||
|
uploadCsv(csv);
|
||||||
|
|
||||||
|
final GroupResult groupResult = performanceDb.get(new Query("tag=tagValue", DateTimeRange.ofDay(dateA)))
|
||||||
|
.singleGroup();
|
||||||
|
final LongList result = groupResult.flatMap();
|
||||||
|
System.out.println(PdbTestUtil.timeValueLongListToString(result));
|
||||||
|
Assert.assertEquals(result.size(), 4);
|
||||||
|
|
||||||
|
Assert.assertEquals(result.get(0), dateA.toInstant().toEpochMilli());
|
||||||
|
Assert.assertEquals(result.get(1), 1);
|
||||||
|
|
||||||
|
Assert.assertEquals(result.get(2), dateB.toInstant().truncatedTo(ChronoUnit.MILLIS).toEpochMilli());
|
||||||
|
Assert.assertEquals(result.get(3), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void uploadCsv(final String... csvs) {
|
||||||
|
// final TestRestTemplate rest = new TestRestTemplate();
|
||||||
|
final LinkedMultiValueMap<String, Object> parameters = new LinkedMultiValueMap<String, Object>();
|
||||||
|
int count = 0;
|
||||||
|
for (final String csv : csvs) {
|
||||||
|
parameters.add("file", new CsvResource(csv, count++ + ".csv"));
|
||||||
|
}
|
||||||
|
|
||||||
|
final HttpHeaders headers = new HttpHeaders();
|
||||||
|
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
|
||||||
|
|
||||||
|
final HttpEntity<MultiValueMap<String, Object>> entity = new HttpEntity<MultiValueMap<String, Object>>(
|
||||||
|
parameters, headers);
|
||||||
|
|
||||||
|
final ResponseEntity<String> response = rest.exchange("/data", HttpMethod.POST, entity, String.class);
|
||||||
|
|
||||||
|
Assert.assertEquals("response status", HttpStatus.CREATED, response.getStatusCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -9,6 +9,10 @@ import java.nio.channels.SocketChannel;
|
|||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@@ -21,7 +25,7 @@ import java.util.concurrent.LinkedBlockingDeque;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.lucares.pdbui.TcpIngestor;
|
import org.lucares.collections.LongList;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -59,7 +63,7 @@ public class PdbTestUtil {
|
|||||||
sendAsCsv(keys, entries);
|
sendAsCsv(keys, entries);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final void sendAsCsv(Collection<String> keys, final Collection<Map<String, Object>> entries)
|
public static final void sendAsCsv(final Collection<String> keys, final Collection<Map<String, Object>> entries)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
final StringBuilder csv = new StringBuilder();
|
final StringBuilder csv = new StringBuilder();
|
||||||
@@ -177,4 +181,24 @@ public class PdbTestUtil {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String timeValueLongListToString(final LongList timeValueLongList) {
|
||||||
|
final StringBuilder result = new StringBuilder();
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (i < timeValueLongList.size()) {
|
||||||
|
final OffsetDateTime time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(timeValueLongList.get(i)),
|
||||||
|
ZoneOffset.UTC);
|
||||||
|
i++;
|
||||||
|
final long value = timeValueLongList.get(i);
|
||||||
|
i++;
|
||||||
|
|
||||||
|
result.append(time.format(DateTimeFormatter.ISO_DATE_TIME));
|
||||||
|
result.append("=");
|
||||||
|
result.append(value);
|
||||||
|
result.append("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.toString();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,10 @@ import java.util.Arrays;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.lucares.collections.LongList;
|
import org.lucares.collections.LongList;
|
||||||
@@ -34,10 +38,36 @@ public class PerformanceDb implements AutoCloseable {
|
|||||||
private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block");
|
private final static Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.ingestion.block");
|
||||||
|
|
||||||
private final DataStore dataStore;
|
private final DataStore dataStore;
|
||||||
|
private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(1);
|
||||||
|
private final ArrayBlockingQueue<Entries> queue;
|
||||||
|
|
||||||
public PerformanceDb(final Path dataDirectory) throws IOException {
|
public PerformanceDb(final Path dataDirectory) throws IOException {
|
||||||
|
|
||||||
|
queue = new ArrayBlockingQueue<>(10);
|
||||||
dataStore = new DataStore(dataDirectory);
|
dataStore = new DataStore(dataDirectory);
|
||||||
|
startThread();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayBlockingQueue<Entries> getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startThread() {
|
||||||
|
serverThreadPool.submit(() -> {
|
||||||
|
Thread.currentThread().setName("db-ingestion");
|
||||||
|
|
||||||
|
// TODO move error handling to putEntries
|
||||||
|
boolean finished = false;
|
||||||
|
while (!finished) {
|
||||||
|
try {
|
||||||
|
putEntries(new BlockingQueueIterator<>(queue, Entries.POISON));
|
||||||
|
finished = true;
|
||||||
|
} catch (final Exception e) {
|
||||||
|
LOGGER.warn("Write to database failed. Will retry with the next element.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,7 +86,7 @@ public class PerformanceDb implements AutoCloseable {
|
|||||||
putEntries(iterator);
|
putEntries(iterator);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void putEntries(final BlockingIterator<Entries> entriesIterator) throws WriteException {
|
void putEntries(final BlockingIterator<Entries> entriesIterator) throws WriteException {
|
||||||
|
|
||||||
final Duration timeBetweenSyncs = Duration.ofSeconds(1);
|
final Duration timeBetweenSyncs = Duration.ofSeconds(1);
|
||||||
long count = 0;
|
long count = 0;
|
||||||
@@ -162,6 +192,16 @@ public class PerformanceDb implements AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
|
LOGGER.debug("adding poison");
|
||||||
|
queue.put(Entries.POISON);
|
||||||
|
|
||||||
|
serverThreadPool.shutdown();
|
||||||
|
try {
|
||||||
|
serverThreadPool.awaitTermination(10, TimeUnit.MINUTES);
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
Thread.interrupted();
|
||||||
|
}
|
||||||
|
|
||||||
dataStore.close();
|
dataStore.close();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
LOGGER.error("failed to close PerformanceDB", e);
|
LOGGER.error("failed to close PerformanceDB", e);
|
||||||
|
|||||||
Reference in New Issue
Block a user