package org.lucares.pdbui;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.lucares.pdb.datastore.Entries;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.utils.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

/**
 * Spring component that stores uploaded CSV files in a temporary directory and
 * feeds them to the {@link PerformanceDb} queue on a background thread pool.
 */
@Component
public class CsvUploadHandler implements PropertyKeys, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(CsvUploadHandler.class);

    /** Directory that receives the uploaded files before they are ingested. */
    private final Path tmpDir;

    /** Runs the ingestion tasks; at most two run at the same time. */
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);

    private final PerformanceDb performanceDb;

    /**
     * Creates the handler and makes sure the upload directory exists.
     *
     * @param tmpDir        base temporary directory, injected from the property
     *                      named by {@code TMP_DIR}; uploads are stored in its
     *                      {@code uploads} sub directory
     * @param performanceDb database whose queue receives the parsed entries
     * @throws IOException if the upload directory cannot be created
     */
    public CsvUploadHandler(@Value("${" + TMP_DIR + "}") final String tmpDir, final PerformanceDb performanceDb)
            throws IOException {
        this.tmpDir = Paths.get(tmpDir).resolve("uploads");
        Files.createDirectories(this.tmpDir);
        this.performanceDb = performanceDb;
    }

    /**
     * Copies the uploaded files into the upload directory and schedules their
     * ingestion on the background thread pool.
     *
     * <p>
     * Failures while ingesting an individual file are logged by the background
     * task and are not reported to the caller, even when
     * {@code waitUntilFinished} is {@code true}.
     *
     * @param files             the uploaded files
     * @param settings          settings passed to the CSV transformer
     * @param waitUntilFinished if {@code true}, block for up to one hour until
     *                          the background task has completed
     * @throws IOException          if an uploaded file cannot be written to the
     *                              upload directory
     * @throws InterruptedException if interrupted while waiting for the task
     * @throws ExecutionException   if the background task itself failed
     * @throws TimeoutException     if the task did not finish within one hour
     */
    public void ingest(final List<MultipartFile> files, final CsvReaderSettings settings,
            final boolean waitUntilFinished)
            throws IllegalStateException, IOException, InterruptedException, ExecutionException, TimeoutException {

        final List<Path> tmpFiles = storeUploads(files);

        final Future<?> future;
        try {
            future = threadPool.submit(() -> ingestFiles(tmpFiles, settings));
        } catch (final RejectedExecutionException e) {
            // The pool no longer accepts tasks (e.g. after destroy()); nobody
            // would ever clean up the files we just wrote.
            FileUtils.deleteSilently(tmpFiles);
            throw e;
        }

        if (waitUntilFinished) {
            future.get(1, TimeUnit.HOURS);
        }
    }

    /**
     * Writes every uploaded file to a uniquely named file in the upload
     * directory. If any write fails, all files written so far are deleted
     * before the exception is rethrown.
     */
    private List<Path> storeUploads(final List<MultipartFile> files) throws IOException {
        final List<Path> tmpFiles = new ArrayList<>();
        try {
            for (final MultipartFile file : files) {
                final Path tmpFile = tmpDir.resolve(UUID.randomUUID().toString());
                // Register before writing so a partially written file is cleaned up too.
                tmpFiles.add(tmpFile);
                LOGGER.debug("writing uploaded file to {}", tmpFile);
                file.transferTo(tmpFile);
            }
        } catch (RuntimeException | IOException e) {
            FileUtils.deleteSilently(tmpFiles);
            throw e;
        }
        return tmpFiles;
    }

    /**
     * Ingests the given files one after another. A failure in one file is
     * logged and does not stop the remaining files from being processed.
     */
    private void ingestFiles(final List<Path> tmpFiles, final CsvReaderSettings settings) {
        final ArrayBlockingQueue<Entries> queue = performanceDb.getQueue();
        for (final Path tmpFile : tmpFiles) {
            ingestFile(queue, tmpFile, settings);
        }
    }

    /**
     * Reads one stored upload into the queue and removes the file afterwards,
     * whether or not the ingestion succeeded.
     */
    private void ingestFile(final ArrayBlockingQueue<Entries> queue, final Path tmpFile,
            final CsvReaderSettings settings) {
        try {
            final CsvToEntryTransformer csvToEntryTransformer = new CsvToEntryTransformer(queue, settings);
            try (FileInputStream in = new FileInputStream(tmpFile.toFile())) {
                csvToEntryTransformer.readCSV(in);
            }
        } catch (final Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the pool so a shutdown request is not lost.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("csv ingestion failed", e);
        } finally {
            // Delete after the stream is closed, also on failure, so that
            // failed uploads do not pile up in the upload directory.
            deleteUploadedFile(tmpFile);
        }
    }

    /** Deletes a stored upload; a failure to delete is logged, not propagated. */
    private void deleteUploadedFile(final Path tmpFile) {
        LOGGER.debug("delete uploaded file {}", tmpFile);
        try {
            Files.deleteIfExists(tmpFile);
        } catch (final IOException | RuntimeException e) {
            LOGGER.warn("failed to delete uploaded file {}", tmpFile, e);
        }
    }

    /**
     * Stops accepting new ingestion tasks and waits up to ten minutes for the
     * running ones to finish. Tasks still running after that, or when the wait
     * is interrupted, are cancelled via {@link ExecutorService#shutdownNow()}.
     *
     * @throws Exception if interrupted while waiting for termination
     */
    @Override
    public void destroy() throws Exception {
        threadPool.shutdown();
        LOGGER.info("awaiting termination ...");
        try {
            if (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
                LOGGER.warn("csv ingestion did not terminate in time, cancelling remaining tasks");
                threadPool.shutdownNow();
            }
        } catch (final InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}