add flag to make CSV upload wait until entries are flushed
To make it easier/possible to write stable unit test the CSV upload can optionally wait until all entries have been flushed to disk. This is necessary for tests that ingest data and then read the data.
This commit is contained in:
@@ -4,6 +4,8 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.lucares.collections.IntList;
|
||||
import org.lucares.pdb.api.Entries;
|
||||
@@ -26,7 +28,7 @@ class CsvToEntryTransformer {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
void readCSV(final InputStream in) throws IOException, InterruptedException {
|
||||
void readCSV(final InputStream in) throws IOException, InterruptedException, TimeoutException {
|
||||
final int chunksize = 1000;
|
||||
Entries entries = new Entries(chunksize);
|
||||
|
||||
@@ -93,7 +95,9 @@ class CsvToEntryTransformer {
|
||||
if (entry != null) {
|
||||
entries.add(entry);
|
||||
}
|
||||
entries.forceFlush();
|
||||
queue.put(entries);
|
||||
entries.waitUntilFlushed(5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
private int[] handleCsvHeaderLine(final byte[] line, final int bytesInLine, final IntList separatorPositions) {
|
||||
|
||||
@@ -9,9 +9,12 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.lucares.pdb.api.Entries;
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
@@ -41,8 +44,9 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||
this.performanceDb = performanceDb;
|
||||
}
|
||||
|
||||
public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings)
|
||||
throws IllegalStateException, IOException {
|
||||
public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings,
|
||||
final boolean waitUntilFinished)
|
||||
throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException {
|
||||
|
||||
final List<Path> tmpFiles = new ArrayList<Path>();
|
||||
|
||||
@@ -58,7 +62,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||
throw e;
|
||||
}
|
||||
|
||||
threadPool.submit(() -> {
|
||||
final Future<?> future = threadPool.submit(() -> {
|
||||
final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
|
||||
for (final Path tmpFile : tmpFiles) {
|
||||
try {
|
||||
@@ -74,8 +78,10 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||
LOGGER.error("csv ingestion failed", e);
|
||||
}
|
||||
}
|
||||
queue.add(Entries.POISON);
|
||||
});
|
||||
if (waitUntilFinished) {
|
||||
future.get(1, TimeUnit.HOURS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import org.lucares.pdb.api.Entries;
|
||||
@@ -50,7 +51,7 @@ public final class IngestionHandler implements Callable<Void> {
|
||||
return null;
|
||||
}
|
||||
|
||||
private void handleInputStream(final InputStream in) throws IOException, InterruptedException {
|
||||
private void handleInputStream(final InputStream in) throws IOException, InterruptedException, TimeoutException {
|
||||
in.mark(1);
|
||||
final byte firstByte = (byte) in.read();
|
||||
if (firstByte == '{') {
|
||||
|
||||
@@ -12,7 +12,9 @@ import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@@ -327,10 +329,11 @@ public class PdbController implements HardcodedValues, PropertyKeys {
|
||||
@ResponseBody
|
||||
@ResponseStatus(code = HttpStatus.CREATED)
|
||||
public String handleCsvFileUpload(@RequestParam("file") final MultipartFile[] files,
|
||||
@RequestPart("settings") final CsvReaderSettings csvReaderSettings)
|
||||
throws IllegalStateException, IOException {
|
||||
@RequestPart("settings") final CsvReaderSettings csvReaderSettings,
|
||||
@RequestParam(name = "waitUntilFinished", defaultValue = "false") final boolean waitUntilFinished)
|
||||
throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException {
|
||||
|
||||
csvUploadHandler.ingest(List.of(files), csvReaderSettings);
|
||||
csvUploadHandler.ingest(List.of(files), csvReaderSettings, waitUntilFinished);
|
||||
return ""; // return value might become a job id that can be used to cancel, or observe
|
||||
// status
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user