remove duplicate values when rendering
Rendering plots with millions of values is expensive. Before this fix we wrote all values into CSV files. The CSV files were then read by Gnuplot that did the rendering. But in an image with n×m pixes there can only be nm different values. In most realistic scenarios we will have many values that will be drawn to the same pixels. So we are wasting time yb first generation the CSV for too many values and then by parsing that CSV again. Fixed by using a sparse 2D array to de-duplicate many values before they get written to the CSV. The additional time we spend de-duplicating is often smaller than the time saved when writing the CSV, so that the total CSV writing is about as 'fast' as before (sometimes a little faster, sometimes a little slower). But the time Gnuplot needs for rendering drastically reduces. The factor depends on the data, of course. We have seen factor 50 for realistic examples. Making a 15s job run in 300ms.
This commit is contained in:
@@ -12,6 +12,7 @@ public class GnuplotSettings {
|
|||||||
public final static int GNUPLOT_TOP_MARGIN = 57; // The top margin configured for gnuplot
|
public final static int GNUPLOT_TOP_MARGIN = 57; // The top margin configured for gnuplot
|
||||||
public final static int GNUPLOT_BOTTOM_MARGIN = 76; // The bottom margin configured for gnuplot
|
public final static int GNUPLOT_BOTTOM_MARGIN = 76; // The bottom margin configured for gnuplot
|
||||||
public final static int GNUPLOT_TOP_BOTTOM_MARGIN = GNUPLOT_TOP_MARGIN + GNUPLOT_BOTTOM_MARGIN;
|
public final static int GNUPLOT_TOP_BOTTOM_MARGIN = GNUPLOT_TOP_MARGIN + GNUPLOT_BOTTOM_MARGIN;
|
||||||
|
public final static int GNUPLOT_LEFT_RIGHT_MARGIN = GNUPLOT_LEFT_MARGIN+GNUPLOT_RIGHT_MARGIN;
|
||||||
|
|
||||||
private String terminal = "png";
|
private String terminal = "png";
|
||||||
private int height = 1200;
|
private int height = 1200;
|
||||||
|
|||||||
@@ -0,0 +1,79 @@
|
|||||||
|
package org.lucares.recommind.logs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Writer;
|
||||||
|
|
||||||
|
import org.lucares.pdb.api.RuntimeIOException;
|
||||||
|
|
||||||
|
public class LambdaFriendlyWriter extends Writer{
|
||||||
|
|
||||||
|
private final Writer writer;
|
||||||
|
|
||||||
|
public LambdaFriendlyWriter(Writer writer) {
|
||||||
|
this.writer = writer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(char[] cbuf, int off, int len) {
|
||||||
|
try {
|
||||||
|
writer.write(cbuf, off, len);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(int c) {
|
||||||
|
try {
|
||||||
|
writer.write(c);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(String str) {
|
||||||
|
try {
|
||||||
|
writer.write(str);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Writer append(CharSequence csq) {
|
||||||
|
try {
|
||||||
|
return writer.append(csq);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Writer append(char c) {
|
||||||
|
try {
|
||||||
|
return writer.append(c);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() {
|
||||||
|
try {
|
||||||
|
writer.flush();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
try {
|
||||||
|
writer.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -23,12 +23,15 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.lucares.collections.LongList;
|
import org.lucares.collections.LongList;
|
||||||
|
import org.lucares.collections.Sparse2DLongArray;
|
||||||
import org.lucares.pdb.api.DateTimeRange;
|
import org.lucares.pdb.api.DateTimeRange;
|
||||||
import org.lucares.pdb.api.GroupResult;
|
import org.lucares.pdb.api.GroupResult;
|
||||||
import org.lucares.pdb.api.Query;
|
import org.lucares.pdb.api.Query;
|
||||||
import org.lucares.pdb.api.Result;
|
import org.lucares.pdb.api.Result;
|
||||||
import org.lucares.pdb.api.Tags;
|
import org.lucares.pdb.api.Tags;
|
||||||
|
import org.lucares.pdb.plot.api.AxisScale;
|
||||||
import org.lucares.pdb.plot.api.CustomAggregator;
|
import org.lucares.pdb.plot.api.CustomAggregator;
|
||||||
import org.lucares.pdb.plot.api.Limit;
|
import org.lucares.pdb.plot.api.Limit;
|
||||||
import org.lucares.pdb.plot.api.PlotSettings;
|
import org.lucares.pdb.plot.api.PlotSettings;
|
||||||
@@ -90,7 +93,9 @@ public class ScatterPlot {
|
|||||||
final AtomicInteger idCounter = new AtomicInteger(0);
|
final AtomicInteger idCounter = new AtomicInteger(0);
|
||||||
result.getGroups().stream().parallel().forEach(groupResult -> {
|
result.getGroups().stream().parallel().forEach(groupResult -> {
|
||||||
try {
|
try {
|
||||||
final CsvSummary csvSummary = toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings);
|
final CsvSummary csvSummary = true
|
||||||
|
? toCsvDeduplicated(groupResult, tmpDir, dateFrom, dateTo, plotSettings)
|
||||||
|
:toCsv(groupResult, tmpDir, dateFrom, dateTo, plotSettings);
|
||||||
|
|
||||||
final int id = idCounter.incrementAndGet();
|
final int id = idCounter.incrementAndGet();
|
||||||
final String title = title(groupResult.getGroupedBy(), csvSummary);
|
final String title = title(groupResult.getGroupedBy(), csvSummary);
|
||||||
@@ -287,6 +292,112 @@ public class ScatterPlot {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static CsvSummary toCsvDeduplicated(final GroupResult groupResult, final Path tmpDir, final OffsetDateTime dateFrom,
|
||||||
|
final OffsetDateTime dateTo, final PlotSettings plotSettings) throws IOException {
|
||||||
|
|
||||||
|
final File dataFile = File.createTempFile("data", ".dat", tmpDir.toFile());
|
||||||
|
final long start = System.nanoTime();
|
||||||
|
final Stream<LongList> timeValueStream = groupResult.asStream();
|
||||||
|
final long fromEpochMilli = dateFrom.toInstant().toEpochMilli();
|
||||||
|
final long toEpochMilli = dateTo.toInstant().toEpochMilli();
|
||||||
|
final boolean useMillis = (toEpochMilli - fromEpochMilli) < TimeUnit.MINUTES.toMillis(5);
|
||||||
|
final long plotAreaWidthInPx = plotSettings.getWidth() - GnuplotSettings.GNUPLOT_LEFT_RIGHT_MARGIN;
|
||||||
|
final long plotAreaHeightInPx = plotSettings.getHeight() - GnuplotSettings.GNUPLOT_TOP_BOTTOM_MARGIN;
|
||||||
|
final long epochMillisPerPixel = Math.max(1, (toEpochMilli - fromEpochMilli) / plotAreaWidthInPx);
|
||||||
|
|
||||||
|
final long minValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? 0
|
||||||
|
: plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMin());
|
||||||
|
final long maxValue = plotSettings.getYRangeUnit() == TimeRangeUnitInternal.AUTOMATIC ? Long.MAX_VALUE
|
||||||
|
: plotSettings.getYRangeUnit().toMilliSeconds(plotSettings.getYRangeMax());
|
||||||
|
final long durationMillisPerPixel = plotSettings.getYAxisScale() == AxisScale.LINEAR
|
||||||
|
? Math.max(1, (maxValue -minValue) / plotAreaHeightInPx)
|
||||||
|
: 1;
|
||||||
|
|
||||||
|
final CustomAggregator aggregator = plotSettings.getAggregate().createCustomAggregator(tmpDir, fromEpochMilli,
|
||||||
|
toEpochMilli);
|
||||||
|
|
||||||
|
final Sparse2DLongArray matrix2d = new Sparse2DLongArray();
|
||||||
|
int count = 0; // number of values in the x-axis range (used to compute stats)
|
||||||
|
int plottedValues = 0;
|
||||||
|
long statsMaxValue = 0;
|
||||||
|
double statsCurrentAverage = 0.0;
|
||||||
|
long ignoredValues = 0;
|
||||||
|
final int separator = ',';
|
||||||
|
final int newline = '\n';
|
||||||
|
|
||||||
|
final Iterator<LongList> it = timeValueStream.iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
final LongList entry = it.next();
|
||||||
|
|
||||||
|
for (int i = 0; i < entry.size(); i += 2) {
|
||||||
|
|
||||||
|
final long epochMilli = entry.get(i);
|
||||||
|
if (fromEpochMilli > epochMilli || epochMilli > toEpochMilli) {
|
||||||
|
ignoredValues++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long value = entry.get(i + 1);
|
||||||
|
|
||||||
|
aggregator.addValue(epochMilli, value);
|
||||||
|
|
||||||
|
// compute stats
|
||||||
|
count++;
|
||||||
|
statsMaxValue = Math.max(statsMaxValue, value);
|
||||||
|
|
||||||
|
// compute average (important to do this after 'count' has been incremented)
|
||||||
|
statsCurrentAverage = statsCurrentAverage + (value - statsCurrentAverage) / count;
|
||||||
|
|
||||||
|
// check if value is in the selected y-range
|
||||||
|
if (value < minValue || value > maxValue) {
|
||||||
|
ignoredValues++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long roundedEpochMilli = epochMilli - epochMilli % epochMillisPerPixel;
|
||||||
|
final long roundedValue = value - value % durationMillisPerPixel;
|
||||||
|
matrix2d.put(roundedEpochMilli, roundedValue, 1);
|
||||||
|
|
||||||
|
plottedValues++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
long[] actualValuesWritten = new long[1];
|
||||||
|
final StringBuilder formattedDateBuilder = new StringBuilder();
|
||||||
|
try (final LambdaFriendlyWriter output = new LambdaFriendlyWriter(new BufferedWriter(
|
||||||
|
new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.ISO_8859_1)));
|
||||||
|
final Formatter formatter = new Formatter(formattedDateBuilder);) {
|
||||||
|
|
||||||
|
matrix2d.forEach((epochMilli, value, __)-> {
|
||||||
|
|
||||||
|
final String stringValue = LongUtils.longToString(value);
|
||||||
|
final String formattedDate;
|
||||||
|
|
||||||
|
if (useMillis) {
|
||||||
|
formattedDateBuilder.delete(0, formattedDateBuilder.length());
|
||||||
|
formatter.format("%.3f", epochMilli / 1000.0);
|
||||||
|
formattedDate = formattedDateBuilder.toString();
|
||||||
|
} else {
|
||||||
|
formattedDate = String.valueOf(epochMilli / 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
output.write(formattedDate);
|
||||||
|
output.write(separator);
|
||||||
|
output.write(stringValue);
|
||||||
|
output.write(newline);
|
||||||
|
actualValuesWritten[0]++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
METRICS_LOGGER.debug("wrote {} (actual: {} factor: {}%) values to csv in: {}ms (ignored {} values) use millis: {}, grouping={}, file={}",
|
||||||
|
actualValuesWritten[0], count, (double)count/(actualValuesWritten[0]), (System.nanoTime() - start) / 1_000_000.0, ignoredValues, Boolean.toString(useMillis),
|
||||||
|
groupResult.getGroupedBy().asString(), dataFile);
|
||||||
|
return new CsvSummary(dataFile, count, plottedValues, statsMaxValue, statsCurrentAverage,
|
||||||
|
aggregator.getAggregatedData());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
static String uniqueDirectoryName() {
|
static String uniqueDirectoryName() {
|
||||||
return OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")) + "_"
|
return OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")) + "_"
|
||||||
+ UUID.randomUUID().toString();
|
+ UUID.randomUUID().toString();
|
||||||
|
|||||||
Reference in New Issue
Block a user