From d895eba47c969c98a55c05633ffa756917e7f8a5 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 29 Sep 2019 18:57:57 +0200 Subject: [PATCH] remove duplicate values when rendering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rendering plots with millions of values is expensive. Before this fix we wrote all values into CSV files. The CSV files were then read by Gnuplot that did the rendering. But in an image with n×m pixels there can only be n×m different values. In most realistic scenarios we will have many values that will be drawn to the same pixels. So we are wasting time by first generating the CSV for too many values and then by parsing that CSV again. Fixed by using a sparse 2D array to de-duplicate many values before they get written to the CSV. The additional time we spend de-duplicating is often smaller than the time saved when writing the CSV, so that the total CSV writing is about as 'fast' as before (sometimes a little faster, sometimes a little slower). But the time Gnuplot needs for rendering is drastically reduced. The factor depends on the data, of course. We have seen factor 50 for realistic examples. Making a 15s job run in 300ms. 
--- .../recommind/logs/GnuplotSettings.java | 1 + .../recommind/logs/LambdaFriendlyWriter.java | 79 ++++++++++++ .../lucares/recommind/logs/ScatterPlot.java | 113 +++++++++++++++++- 3 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java index fb7cd07..0fb090e 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java @@ -12,6 +12,7 @@ public class GnuplotSettings { public final static int GNUPLOT_TOP_MARGIN = 57; // The top margin configured for gnuplot public final static int GNUPLOT_BOTTOM_MARGIN = 76; // The bottom margin configured for gnuplot public final static int GNUPLOT_TOP_BOTTOM_MARGIN = GNUPLOT_TOP_MARGIN + GNUPLOT_BOTTOM_MARGIN; + public final static int GNUPLOT_LEFT_RIGHT_MARGIN = GNUPLOT_LEFT_MARGIN+GNUPLOT_RIGHT_MARGIN; private String terminal = "png"; private int height = 1200; diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java new file mode 100644 index 0000000..0ef0e9b --- /dev/null +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java @@ -0,0 +1,79 @@ +package org.lucares.recommind.logs; + +import java.io.IOException; +import java.io.Writer; + +import org.lucares.pdb.api.RuntimeIOException; + +public class LambdaFriendlyWriter extends Writer{ + + private final Writer writer; + + public LambdaFriendlyWriter(Writer writer) { + this.writer = writer; + } + + @Override + public void write(char[] cbuf, int off, int len) { + try { + writer.write(cbuf, off, len); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + 
@Override + public void write(int c) { + try { + writer.write(c); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public void write(String str) { + try { + writer.write(str); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public Writer append(CharSequence csq) { + try { + return writer.append(csq); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public Writer append(char c) { + try { + return writer.append(c); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public void flush() { + try { + writer.flush(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public void close() { + try { + writer.close(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + +} diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java index 7a677d8..a59b371 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java @@ -23,12 +23,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.apache.commons.lang3.math.NumberUtils; import org.lucares.collections.LongList; +import org.lucares.collections.Sparse2DLongArray; import org.lucares.pdb.api.DateTimeRange; import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.Query; import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Tags; +import org.lucares.pdb.plot.api.AxisScale; import org.lucares.pdb.plot.api.CustomAggregator; import org.lucares.pdb.plot.api.Limit; import org.lucares.pdb.plot.api.PlotSettings; @@ -90,7 +93,9 @@ public class ScatterPlot { final AtomicInteger idCounter = new AtomicInteger(0); 
result.getGroups().stream().parallel().forEach(groupResult -> { try { - final CsvSummary csvSummary = toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings); + final CsvSummary csvSummary = true + ? toCsvDeduplicated(groupResult, tmpDir, dateFrom, dateTo, plotSettings) + :toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings); final int id = idCounter.incrementAndGet(); final String title = title(groupResult.getGroupedBy(), csvSummary); @@ -286,6 +291,112 @@ public class ScatterPlot { aggregator.getAggregatedData()); } + + private static CsvSummary toCsvDeduplicated(final GroupResult groupResult, final Path tmpDir, final OffsetDateTime dateFrom, + final OffsetDateTime dateTo, final PlotSettings plotSettings) throws IOException { + + final File dataFile = File.createTempFile("data", ".dat", tmpDir.toFile()); + final long start = System.nanoTime(); + final Stream timeValueStream = groupResult.asStream(); + final long fromEpochMilli = dateFrom.toInstant().toEpochMilli(); + final long toEpochMilli = dateTo.toInstant().toEpochMilli(); + final boolean useMillis = (toEpochMilli - fromEpochMilli) < TimeUnit.MINUTES.toMillis(5); + final long plotAreaWidthInPx = plotSettings.getWidth() - GnuplotSettings.GNUPLOT_LEFT_RIGHT_MARGIN; + final long plotAreaHeightInPx = plotSettings.getHeight() - GnuplotSettings.GNUPLOT_TOP_BOTTOM_MARGIN; + final long epochMillisPerPixel = Math.max(1, (toEpochMilli - fromEpochMilli) / plotAreaWidthInPx); + + final long minValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? 0 + : plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMin()); + final long maxValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? Long.MAX_VALUE + : plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMax()); + final long durationMillisPerPixel = plotSettings.getYAxisScale() == AxisScale.LINEAR + ? 
Math.max(1, (maxValue -minValue) / plotAreaHeightInPx) + : 1; + + final CustomAggregator aggregator = plotSettings.getAggregate().createCustomAggregator(tmpDir, fromEpochMilli, + toEpochMilli); + + final Sparse2DLongArray matrix2d = new Sparse2DLongArray(); + int count = 0; // number of values in the x-axis range (used to compute stats) + int plottedValues = 0; + long statsMaxValue = 0; + double statsCurrentAverage = 0.0; + long ignoredValues = 0; + final int separator = ','; + final int newline = '\n'; + + final Iterator it = timeValueStream.iterator(); + while (it.hasNext()) { + final LongList entry = it.next(); + + for (int i = 0; i < entry.size(); i += 2) { + + final long epochMilli = entry.get(i); + if (fromEpochMilli > epochMilli || epochMilli > toEpochMilli) { + ignoredValues++; + continue; + } + + final long value = entry.get(i + 1); + + aggregator.addValue(epochMilli, value); + + // compute stats + count++; + statsMaxValue = Math.max(statsMaxValue, value); + + // compute average (important to do this after 'count' has been incremented) + statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count; + + // check if value is in the selected y-range + if (value < minValue || value > maxValue) { + ignoredValues++; + continue; + } + + final long roundedEpochMilli = epochMilli - epochMilli % epochMillisPerPixel; + final long roundedValue = value - value % durationMillisPerPixel; + matrix2d.put(roundedEpochMilli, roundedValue, 1); + + plottedValues++; + } + } + + + long[] actualValuesWritten = new long[1]; + final StringBuilder formattedDateBuilder = new StringBuilder(); + try (final LambdaFriendlyWriter output = new LambdaFriendlyWriter(new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.ISO_8859_1))); + final Formatter formatter = new Formatter(formattedDateBuilder);) { + + matrix2d.forEach((epochMilli, value, __)-> { + + final String stringValue = LongUtils.longToString(value); + final String 
formattedDate; + + if (useMillis) { + formattedDateBuilder.delete(0, formattedDateBuilder.length()); + formatter.format("%.3f", epochMilli / 1000.0); + formattedDate = formattedDateBuilder.toString(); + } else { + formattedDate = String.valueOf(epochMilli / 1000); + } + + output.write(formattedDate); + output.write(separator); + output.write(stringValue); + output.write(newline); + actualValuesWritten[0]++; + }); + } + + METRICS_LOGGER.debug("wrote {} (actual: {} factor: {}%) values to csv in: {}ms (ignored {} values) use millis: {}, grouping={}, file={}", + actualValuesWritten[0], count, (double)count/(actualValuesWritten[0]), (System.nanoTime() - start) / 1_000_000.0, ignoredValues, Boolean.toString(useMillis), + groupResult.getGroupedBy().asString(), dataFile); + return new CsvSummary(dataFile, count, plottedValues, statsMaxValue, statsCurrentAverage, + aggregator.getAggregatedData()); + + } static String uniqueDirectoryName() { return OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")) + "_"