diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java
index fb7cd07..0fb090e 100644
--- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java
+++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/GnuplotSettings.java
@@ -12,6 +12,7 @@ public class GnuplotSettings {
     public final static int GNUPLOT_TOP_MARGIN = 57; // The top margin configured for gnuplot
     public final static int GNUPLOT_BOTTOM_MARGIN = 76; // The bottom margin configured for gnuplot
     public final static int GNUPLOT_TOP_BOTTOM_MARGIN = GNUPLOT_TOP_MARGIN + GNUPLOT_BOTTOM_MARGIN;
+    public final static int GNUPLOT_LEFT_RIGHT_MARGIN = GNUPLOT_LEFT_MARGIN + GNUPLOT_RIGHT_MARGIN;
 
     private String terminal = "png";
     private int height = 1200;
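For context on the new constant: ScatterPlot#toCsvDeduplicated (later in this diff) subtracts it from the image width to get the drawable plot area, which in turn fixes how many milliseconds one pixel column covers. A minimal sketch of that arithmetic, with made-up margin and range values (the real numbers come from GnuplotSettings and PlotSettings, not from this snippet):

public class PixelBucketSketch {
    public static void main(String[] args) {
        // Illustrative values only; stand-ins for GNUPLOT_LEFT_RIGHT_MARGIN and the image width.
        final int imageWidthPx = 1600;
        final int leftRightMarginPx = 30 + 40;
        final long plotAreaWidthInPx = imageWidthPx - leftRightMarginPx; // 1530 px of drawable x-axis

        // Milliseconds covered by one pixel column for one hour of data.
        final long fromEpochMilli = 0L;
        final long toEpochMilli = 3_600_000L;
        final long epochMillisPerPixel = Math.max(1, (toEpochMilli - fromEpochMilli) / plotAreaWidthInPx); // 2352

        // Snap a timestamp to the start of its pixel bucket, as toCsvDeduplicated does;
        // every sample that falls into the same bucket collapses into a single CSV row.
        final long epochMilli = 1_234_567L;
        final long roundedEpochMilli = epochMilli - epochMilli % epochMillisPerPixel;
        System.out.println(roundedEpochMilli); // prints 1232448
    }
}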
diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java
new file mode 100644
index 0000000..0ef0e9b
--- /dev/null
+++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/LambdaFriendlyWriter.java
@@ -0,0 +1,79 @@
+package org.lucares.recommind.logs;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.lucares.pdb.api.RuntimeIOException;
+
+public class LambdaFriendlyWriter extends Writer {
+
+    private final Writer writer;
+
+    public LambdaFriendlyWriter(Writer writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void write(char[] cbuf, int off, int len) {
+        try {
+            writer.write(cbuf, off, len);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public void write(int c) {
+        try {
+            writer.write(c);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public void write(String str) {
+        try {
+            writer.write(str);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public Writer append(CharSequence csq) {
+        try {
+            return writer.append(csq);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public Writer append(char c) {
+        try {
+            return writer.append(c);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public void flush() {
+        try {
+            writer.flush();
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            writer.close();
+        } catch (IOException e) {
+            throw new RuntimeIOException(e);
+        }
+    }
+
+}
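LambdaFriendlyWriter exists so a Writer can be used inside lambdas: every override rethrows the checked IOException as the unchecked RuntimeIOException, which is what lets ScatterPlot call output.write(...) from inside matrix2d.forEach further down. A minimal usage sketch (the demo class, the file name, and the Java 11+ FileWriter(String, Charset) constructor are assumptions, not part of this change):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.lucares.recommind.logs.LambdaFriendlyWriter;

public class LambdaFriendlyWriterDemo {
    public static void main(String[] args) throws IOException {
        try (LambdaFriendlyWriter out = new LambdaFriendlyWriter(
                new BufferedWriter(new FileWriter("example.csv", StandardCharsets.ISO_8859_1)))) {
            // The forEach callback needs no try/catch: write() only throws unchecked exceptions.
            List.of("1,2", "3,4").forEach(line -> {
                out.write(line);
                out.write('\n');
            });
        }
    }
}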
diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java
index 7a677d8..a59b371 100644
--- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java
+++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/ScatterPlot.java
@@ -23,12 +23,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
+import org.apache.commons.lang3.math.NumberUtils;
 import org.lucares.collections.LongList;
+import org.lucares.collections.Sparse2DLongArray;
 import org.lucares.pdb.api.DateTimeRange;
 import org.lucares.pdb.api.GroupResult;
 import org.lucares.pdb.api.Query;
 import org.lucares.pdb.api.Result;
 import org.lucares.pdb.api.Tags;
+import org.lucares.pdb.plot.api.AxisScale;
 import org.lucares.pdb.plot.api.CustomAggregator;
 import org.lucares.pdb.plot.api.Limit;
 import org.lucares.pdb.plot.api.PlotSettings;
@@ -90,7 +93,9 @@ public class ScatterPlot {
         final AtomicInteger idCounter = new AtomicInteger(0);
         result.getGroups().stream().parallel().forEach(groupResult -> {
             try {
-                final CsvSummary csvSummary = toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings);
+                final CsvSummary csvSummary = true
+                        ? toCsvDeduplicated(groupResult, tmpDir, dateFrom, dateTo, plotSettings)
+                        : toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings);
 
                 final int id = idCounter.incrementAndGet();
                 final String title = title(groupResult.getGroupedBy(), csvSummary);
@@ -286,6 +291,112 @@ public class ScatterPlot {
                 aggregator.getAggregatedData());
     }
+
+    private static CsvSummary toCsvDeduplicated(final GroupResult groupResult, final Path tmpDir, final OffsetDateTime dateFrom,
+            final OffsetDateTime dateTo, final PlotSettings plotSettings) throws IOException {
+
+        final File dataFile = File.createTempFile("data", ".dat", tmpDir.toFile());
+        final long start = System.nanoTime();
+        final Stream<LongList> timeValueStream = groupResult.asStream();
+        final long fromEpochMilli = dateFrom.toInstant().toEpochMilli();
+        final long toEpochMilli = dateTo.toInstant().toEpochMilli();
+        final boolean useMillis = (toEpochMilli - fromEpochMilli) < TimeUnit.MINUTES.toMillis(5);
+        final long plotAreaWidthInPx = plotSettings.getWidth() - GnuplotSettings.GNUPLOT_LEFT_RIGHT_MARGIN;
+        final long plotAreaHeightInPx = plotSettings.getHeight() - GnuplotSettings.GNUPLOT_TOP_BOTTOM_MARGIN;
+        final long epochMillisPerPixel = Math.max(1, (toEpochMilli - fromEpochMilli) / plotAreaWidthInPx);
+
+        final long minValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? 0
+                : plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMin());
+        final long maxValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? Long.MAX_VALUE
+                : plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMax());
+        final long durationMillisPerPixel = plotSettings.getYAxisScale() == AxisScale.LINEAR
+                ? Math.max(1, (maxValue - minValue) / plotAreaHeightInPx)
+                : 1;
+
+        final CustomAggregator aggregator = plotSettings.getAggregate().createCustomAggregator(tmpDir, fromEpochMilli,
+                toEpochMilli);
+
+        final Sparse2DLongArray matrix2d = new Sparse2DLongArray();
+        int count = 0; // number of values in the x-axis range (used to compute stats)
+        int plottedValues = 0;
+        long statsMaxValue = 0;
+        double statsCurrentAverage = 0.0;
+        long ignoredValues = 0;
+        final int separator = ',';
+        final int newline = '\n';
+
+        final Iterator<LongList> it = timeValueStream.iterator();
+        while (it.hasNext()) {
+            final LongList entry = it.next();
+
+            for (int i = 0; i < entry.size(); i += 2) {
+
+                final long epochMilli = entry.get(i);
+                if (fromEpochMilli > epochMilli || epochMilli > toEpochMilli) {
+                    ignoredValues++;
+                    continue;
+                }
+
+                final long value = entry.get(i + 1);
+
+                aggregator.addValue(epochMilli, value);
+
+                // compute stats
+                count++;
+                statsMaxValue = Math.max(statsMaxValue, value);
+
+                // compute average (important to do this after 'count' has been incremented)
+                statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count;
+
+                // check if value is in the selected y-range
+                if (value < minValue || value > maxValue) {
+                    ignoredValues++;
+                    continue;
+                }
+
+                final long roundedEpochMilli = epochMilli - epochMilli % epochMillisPerPixel;
+                final long roundedValue = value - value % durationMillisPerPixel;
+                matrix2d.put(roundedEpochMilli, roundedValue, 1);
+
+                plottedValues++;
+            }
+        }
+
+
+        long[] actualValuesWritten = new long[1];
+        final StringBuilder formattedDateBuilder = new StringBuilder();
+        try (final LambdaFriendlyWriter output = new LambdaFriendlyWriter(new BufferedWriter(
+                new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.ISO_8859_1)));
+                final Formatter formatter = new Formatter(formattedDateBuilder)) {
+
+            matrix2d.forEach((epochMilli, value, __) -> {
+
+                final String stringValue = LongUtils.longToString(value);
+                final String formattedDate;
+
+                if (useMillis) {
+                    formattedDateBuilder.delete(0, formattedDateBuilder.length());
+                    formatter.format("%.3f", epochMilli / 1000.0);
+                    formattedDate = formattedDateBuilder.toString();
+                } else {
+                    formattedDate = String.valueOf(epochMilli / 1000);
+                }
+
+                output.write(formattedDate);
+                output.write(separator);
+                output.write(stringValue);
+                output.write(newline);
+                actualValuesWritten[0]++;
+            });
+        }
+
+        METRICS_LOGGER.debug("wrote {} (actual: {} factor: {}%) values to csv in: {}ms (ignored {} values) use millis: {}, grouping={}, file={}",
+                actualValuesWritten[0], count, (double) count / actualValuesWritten[0], (System.nanoTime() - start) / 1_000_000.0, ignoredValues, Boolean.toString(useMillis),
+                groupResult.getGroupedBy().asString(), dataFile);
+        return new CsvSummary(dataFile, count, plottedValues, statsMaxValue, statsCurrentAverage,
+                aggregator.getAggregatedData());
+
+    }
 
     static String uniqueDirectoryName() {
         return OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")) + "_"