parallelize csv generation

Speedup of 50% and more.
ahr committed 2017-12-10 17:53:53 +01:00
parent 3ee6336125
commit c4dce942a6
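For context, the change below swaps a sequential for-loop over the grouped query results for a parallel stream, using an AtomicInteger for the per-series ids and a nanoTime-based duration log. A minimal standalone sketch of that pattern (hypothetical names, not the project's Plotter/GroupResult/DataSeries types) might look like this:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelCsvSketch {

    public static void main(final String[] args) {
        // Stand-ins for the grouped query results of the real code.
        final List<String> groups = IntStream.range(0, 8)
                .mapToObj(i -> "group-" + i)
                .collect(Collectors.toList());

        // AtomicInteger makes the id counter safe to increment from
        // several worker threads at once.
        final AtomicInteger idCounter = new AtomicInteger(0);

        final long start = System.nanoTime();

        // parallelStream() runs the per-group work on the common
        // fork-join pool; collect() gathers the results without a
        // shared mutable list.
        final List<String> csvRows = groups.parallelStream()
                .map(group -> "id" + idCounter.getAndIncrement() + "," + group)
                .collect(Collectors.toList());

        System.out.println("csv generation took: "
                + (System.nanoTime() - start) / 1_000_000.0 + "ms");
        csvRows.forEach(System.out::println);
    }
}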


@@ -20,10 +20,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import org.lucares.pdb.api.Entry;
 import org.lucares.pdb.api.GroupResult;
 import org.lucares.pdb.api.Result;
 import org.lucares.pdb.api.Tags;
 import org.lucares.pdb.plot.api.CustomAggregator;
@@ -75,6 +75,7 @@ public class Plotter {
     public PlotResult plot(final PlotSettings plotSettings) throws InternalPlottingException {
         LOGGER.trace("start plot: {}", plotSettings);
         final String tmpSubDir = uniqueDirectoryName();
         final Path tmpDir = tmpBaseDir.resolve(tmpSubDir);
@@ -91,20 +92,26 @@ public class Plotter {
         final Result result = db.get(query, groupBy);
-        int idCounter = 0;
-        for (final GroupResult groupResult : result.getGroups()) {
-            final Stream<Entry> entries = groupResult.asStream();
-            final CsvSummary csvSummary = toCsv(entries, tmpDir, dateFrom, dateTo, plotSettings);
-            final String title = title(groupResult.getGroupedBy(), csvSummary.getValues());
-            final DataSeries dataSerie = new DataSeries("id"+idCounter, title, csvSummary);
-            if (dataSerie.getValues() > 0) {
-                dataSeries.add(dataSerie);
+        final long start = System.nanoTime();
+        final AtomicInteger idCounter = new AtomicInteger(0);
+        result.getGroups().stream().parallel().forEach(groupResult -> {
+            try{
+                final Stream<Entry> entries = groupResult.asStream();
+                final CsvSummary csvSummary = toCsv(entries, tmpDir, dateFrom, dateTo, plotSettings);
+                final int id = idCounter.getAndIncrement();
+                final String title = title(groupResult.getGroupedBy(), csvSummary.getValues());
+                final DataSeries dataSerie = new DataSeries("id"+id, title, csvSummary);
+                if (dataSerie.getValues() > 0) {
+                    dataSeries.add(dataSerie);
+                }
+            }catch (Exception e){
+                throw new IllegalStateException( e); // TODO handle
             }
-            idCounter++;
-        }
+        });
+        METRICS_LOGGER.debug("csv generation took: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
         if (dataSeries.isEmpty()) {
             throw new NoDataPointsException();
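One detail worth flagging when reading the hunk above: dataSeries.add(dataSerie) is now executed from the parallel forEach, which is only safe if dataSeries is a concurrent or synchronized collection; its declaration is outside this diff. A small, hypothetical sketch of one way to make such a shared accumulator safe, assuming it would otherwise be a plain ArrayList:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedAccumulatorSketch {

    public static void main(final String[] args) {
        // Wrapping the list makes add() safe to call from the
        // parallel stream's worker threads.
        final List<String> dataSeries =
                Collections.synchronizedList(new ArrayList<>());

        IntStream.range(0, 100).parallel()
                .forEach(i -> dataSeries.add("id" + i));

        // All 100 additions are retained; with a plain ArrayList the
        // count could be wrong or add() could fail under contention.
        System.out.println(dataSeries.size());
    }
}

An alternative is to avoid the shared list entirely and let the stream collect() the resulting series objects, as in the sketch near the top of this page.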