create web application
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
public class Config {
|
||||
public static final Path DATA_DIR = Paths.get("/home/andi/ws/performanceDb/db");
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
public class DataSeries {
|
||||
private final File dataFile;
|
||||
|
||||
private final String title;
|
||||
|
||||
private final GnuplotColor color;
|
||||
|
||||
private final Integer pointType;
|
||||
|
||||
public DataSeries(final File dataFile, final String title) {
|
||||
super();
|
||||
this.dataFile = dataFile;
|
||||
this.title = title;
|
||||
this.color = null;
|
||||
this.pointType = null;
|
||||
}
|
||||
|
||||
public DataSeries(final File dataFile, final String title, final GnuplotColor color, final Integer pointType) {
|
||||
super();
|
||||
this.dataFile = dataFile;
|
||||
this.title = title;
|
||||
this.color = color;
|
||||
this.pointType = pointType;
|
||||
}
|
||||
|
||||
public GnuplotColor getColor() {
|
||||
return color;
|
||||
}
|
||||
|
||||
public Integer getPointType() {
|
||||
return pointType;
|
||||
}
|
||||
|
||||
public File getDataFile() {
|
||||
return dataFile;
|
||||
}
|
||||
|
||||
public String getTitle() {
|
||||
return title;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
|
||||
public class Gnuplot {
|
||||
|
||||
private final Path tmpDirectory;
|
||||
|
||||
public Gnuplot(final Path tmpDirectory) {
|
||||
this.tmpDirectory = tmpDirectory;
|
||||
}
|
||||
|
||||
public void plot(final GnuplotSettings settings, final Collection<DataSeries> dataSeries)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
final GnuplotFileGenerator generator = new GnuplotFileGenerator();
|
||||
|
||||
final String gnuplotFileContent = generator.generate(settings, dataSeries);
|
||||
System.out.println(gnuplotFileContent);
|
||||
|
||||
final File gnuplotFile = File.createTempFile("gnuplot", ".dem", tmpDirectory.toFile());
|
||||
Files.write(gnuplotFileContent, gnuplotFile, StandardCharsets.UTF_8);
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
||||
final ProcessBuilder processBuilder = new ProcessBuilder("gnuplot", gnuplotFile.getAbsolutePath());
|
||||
processBuilder.inheritIO();
|
||||
final Process process = processBuilder.start();
|
||||
process.waitFor();
|
||||
|
||||
System.out.println("gnuplot: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,296 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
public enum GnuplotColor {
|
||||
|
||||
ALICEBLUE("aliceblue"),
|
||||
|
||||
ANTIQUEWHITE("antiquewhite"),
|
||||
|
||||
AQUA("aqua"),
|
||||
|
||||
AQUAMARINE("aquamarine"),
|
||||
|
||||
AZURE("azure"),
|
||||
|
||||
BEIGE("beige"),
|
||||
|
||||
BISQUE("bisque"),
|
||||
|
||||
BLACK("black"),
|
||||
|
||||
BLANCHEDALMOND("blanchedalmond"),
|
||||
|
||||
BLUE("blue"),
|
||||
|
||||
BLUEVIOLET("blueviolet"),
|
||||
|
||||
BROWN("brown"),
|
||||
|
||||
BURLYWOOD("burlywood"),
|
||||
|
||||
CADETBLUE("cadetblue"),
|
||||
|
||||
CHARTREUSE("chartreuse"),
|
||||
|
||||
CHOCOLATE("chocolate"),
|
||||
|
||||
CORAL("coral"),
|
||||
|
||||
CORNFLOWERBLUE("cornflowerblue"),
|
||||
|
||||
CORNSILK("cornsilk"),
|
||||
|
||||
CRIMSON("crimson"),
|
||||
|
||||
CYAN("cyan"),
|
||||
|
||||
DARKBLUE("darkblue"),
|
||||
|
||||
DARKCYAN("darkcyan"),
|
||||
|
||||
DARKGOLDENROD("darkgoldenrod"),
|
||||
|
||||
DARKGRAY("darkgray"),
|
||||
|
||||
DARKGREEN("darkgreen"),
|
||||
|
||||
DARKKHAKI("darkkhaki"),
|
||||
|
||||
DARKMAGENTA("darkmagenta"),
|
||||
|
||||
DARKOLIVEGREEN("darkolivegreen"),
|
||||
|
||||
DARKORANGE("darkorange"),
|
||||
|
||||
DARKORCHID("darkorchid"),
|
||||
|
||||
DARKRED("darkred"),
|
||||
|
||||
DARKSALMON("darksalmon"),
|
||||
|
||||
DARKSEAGREEN("darkseagreen"),
|
||||
|
||||
DARKSLATEBLUE("darkslateblue"),
|
||||
|
||||
DARKSLATEGRAY("darkslategray"),
|
||||
|
||||
DARKTURQUOISE("darkturquoise"),
|
||||
|
||||
DARKVIOLET("darkviolet"),
|
||||
|
||||
DEEPPINK("deeppink"),
|
||||
|
||||
DEEPSKYBLUE("deepskyblue"),
|
||||
|
||||
DIMGRAY("dimgray"),
|
||||
|
||||
DODGERBLUE("dodgerblue"),
|
||||
|
||||
FIREBRICK("firebrick"),
|
||||
|
||||
FLORALWHITE("floralwhite"),
|
||||
|
||||
FORESTGREEN("forestgreen"),
|
||||
|
||||
FUCHSIA("fuchsia"),
|
||||
|
||||
GAINSBORO("gainsboro"),
|
||||
|
||||
GHOSTWHITE("ghostwhite"),
|
||||
|
||||
GOLD("gold"),
|
||||
|
||||
GOLDENROD("goldenrod"),
|
||||
|
||||
GRAY("gray"),
|
||||
|
||||
GREEN("green"),
|
||||
|
||||
GREENYELLOW("greenyellow"),
|
||||
|
||||
HONEYDEW("honeydew"),
|
||||
|
||||
HOTPINK("hotpink"),
|
||||
|
||||
INDIANRED("indianred"),
|
||||
|
||||
INDIGO("indigo"),
|
||||
|
||||
IVORY("ivory"),
|
||||
|
||||
KHAKI("khaki"),
|
||||
|
||||
LAVENDER("lavender"),
|
||||
|
||||
LAVENDERBLUSH("lavenderblush"),
|
||||
|
||||
LAWNGREEN("lawngreen"),
|
||||
|
||||
LEMONCHIFFON("lemonchiffon"),
|
||||
|
||||
LIGHTBLUE("lightblue"),
|
||||
|
||||
LIGHTCORAL("lightcoral"),
|
||||
|
||||
LIGHTCYAN("lightcyan"),
|
||||
|
||||
LIGHTGOLDENRODYE("lightgoldenrodye"),
|
||||
|
||||
LIGHTGREEN("lightgreen"),
|
||||
|
||||
LIGHTGREY("lightgrey"),
|
||||
|
||||
LIGHTPINK("lightpink"),
|
||||
|
||||
LIGHTSALMON("lightsalmon"),
|
||||
|
||||
LIGHTSEAGREEN("lightseagreen"),
|
||||
|
||||
LIGHTSKYBLUE("lightskyblue"),
|
||||
|
||||
LIGHTSLATEGRAY("lightslategray"),
|
||||
|
||||
LIGHTSTEELBLUE("lightsteelblue"),
|
||||
|
||||
LIGHTYELLOW("lightyellow"),
|
||||
|
||||
LIME("lime"),
|
||||
|
||||
LIMEGREEN("limegreen"),
|
||||
|
||||
LINEN("linen"),
|
||||
|
||||
MAGENTA("magenta"),
|
||||
|
||||
MAROON("maroon"),
|
||||
|
||||
MEDIUMAQUAMARINE("mediumaquamarine"),
|
||||
|
||||
MEDIUMBLUE("mediumblue"),
|
||||
|
||||
MEDIUMORCHID("mediumorchid"),
|
||||
|
||||
MEDIUMPURPLE("mediumpurple"),
|
||||
|
||||
MEDIUMSEAGREEN("mediumseagreen"),
|
||||
|
||||
MEDIUMSLATEBLUE("mediumslateblue"),
|
||||
|
||||
MEDIUMSPRINGGREE("mediumspringgree"),
|
||||
|
||||
MEDIUMTURQUOISE("mediumturquoise"),
|
||||
|
||||
MEDIUMVIOLETRED("mediumvioletred"),
|
||||
|
||||
MIDNIGHTBLUE("midnightblue"),
|
||||
|
||||
MINTCREAM("mintcream"),
|
||||
|
||||
MISTYROSE("mistyrose"),
|
||||
|
||||
MOCCASIN("moccasin"),
|
||||
|
||||
NAVAJOWHITE("navajowhite"),
|
||||
|
||||
NAVY("navy"),
|
||||
|
||||
NAVYBLUE("navyblue"),
|
||||
|
||||
OLDLACE("oldlace"),
|
||||
|
||||
OLIVE("olive"),
|
||||
|
||||
OLIVEDRAB("olivedrab"),
|
||||
|
||||
ORANGE("orange"),
|
||||
|
||||
ORANGERED("orangered"),
|
||||
|
||||
ORCHID("orchid"),
|
||||
|
||||
PALEGOLDENROD("palegoldenrod"),
|
||||
|
||||
PALEGREEN("palegreen"),
|
||||
|
||||
PALETURQUOISE("paleturquoise"),
|
||||
|
||||
PALEVIOLETRED("palevioletred"),
|
||||
|
||||
PAPAYAWHIP("papayawhip"),
|
||||
|
||||
PEACHPUFF("peachpuff"),
|
||||
|
||||
PERU("peru"),
|
||||
|
||||
PINK("pink"),
|
||||
|
||||
PLUM("plum"),
|
||||
|
||||
POWDERBLUE("powderblue"),
|
||||
|
||||
PURPLE("purple"),
|
||||
|
||||
RED("red"),
|
||||
|
||||
ROSYBROWN("rosybrown"),
|
||||
|
||||
ROYALBLUE("royalblue"),
|
||||
|
||||
SADDLEBROWN("saddlebrown"),
|
||||
|
||||
SALMON("salmon"),
|
||||
|
||||
SANDYBROWN("sandybrown"),
|
||||
|
||||
SEAGREEN("seagreen"),
|
||||
|
||||
SEASHELL("seashell"),
|
||||
|
||||
SIENNA("sienna"),
|
||||
|
||||
SILVER("silver"),
|
||||
|
||||
SKYBLUE("skyblue"),
|
||||
|
||||
SLATEBLUE("slateblue"),
|
||||
|
||||
SLATEGRAY("slategray"),
|
||||
|
||||
SNOW("snow"),
|
||||
|
||||
SPRINGGREEN("springgreen"),
|
||||
|
||||
STEELBLUE("steelblue"),
|
||||
|
||||
TAN("tan"),
|
||||
|
||||
TEAL("teal"),
|
||||
|
||||
THISTLE("thistle"),
|
||||
|
||||
TOMATO("tomato"),
|
||||
|
||||
TURQUOISE("turquoise"),
|
||||
|
||||
VIOLET("violet"),
|
||||
|
||||
WHEAT("wheat"),
|
||||
|
||||
WHITE("white"),
|
||||
|
||||
WHITESMOKE("whitesmoke"),
|
||||
|
||||
YELLOW("yellow"),
|
||||
|
||||
YELLOWGREEN("yellowgreen");
|
||||
|
||||
private final String color;
|
||||
|
||||
private GnuplotColor(final String color) {
|
||||
this.color = color;
|
||||
}
|
||||
|
||||
public String getColor() {
|
||||
return color;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public class GnuplotFileGenerator {
|
||||
|
||||
public String generate(final GnuplotSettings settings, final Collection<DataSeries> dataSeries) {
|
||||
|
||||
final StringBuilder result = new StringBuilder();
|
||||
|
||||
appendfln(result, "set terminal %s size %d,%d", settings.getTerminal(), settings.getWidth(),
|
||||
settings.getHeight());
|
||||
|
||||
appendfln(result, "set datafile separator \"%s\"", settings.getDatafileSeparator());
|
||||
appendfln(result, "set timefmt '%s'", settings.getTimefmt());
|
||||
|
||||
appendfln(result, "set xdata time");
|
||||
appendfln(result, "set format x \"%s\"", settings.getFormatX());
|
||||
appendfln(result, "set xlabel \"%s\"", settings.getXlabel());
|
||||
appendfln(result, "set xtics rotate by %d", settings.getRotateXAxisLabel());
|
||||
|
||||
appendfln(result, "set ylabel \"%s\"", settings.getYlabel());
|
||||
|
||||
appendfln(result, "set output \"%s\"", settings.getOutput().getAbsolutePath());
|
||||
appendf(result, "plot ");
|
||||
|
||||
for (final DataSeries dataSerie : dataSeries) {
|
||||
appendfln(result, "'%s' using 1:2 title '%s' with points, \\", dataSerie.getDataFile(),
|
||||
dataSerie.getTitle());
|
||||
}
|
||||
|
||||
return result.toString();
|
||||
}
|
||||
|
||||
private void appendfln(final StringBuilder builder, final String format, final Object... args) {
|
||||
builder.append(String.format(format + "\n", args));
|
||||
}
|
||||
|
||||
private void appendf(final StringBuilder builder, final String format, final Object... args) {
|
||||
builder.append(String.format(format, args));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,115 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
public class GnuplotSettings {
|
||||
private String terminal = "png";
|
||||
private int height = 1200;
|
||||
private int width = 1600;
|
||||
private String timefmt = "%Y-%m-%dT%H:%M:%S";
|
||||
|
||||
// set format x "%m-%d\n%H:%M"
|
||||
private String formatX = "%Y-%m-%d %H:%M:%S";
|
||||
|
||||
// set datafile separator ","
|
||||
private String datafileSeparator = ",";
|
||||
|
||||
// set xlabel "Time"
|
||||
private String xlabel = "Time";
|
||||
|
||||
// set ylabel "Traffic"
|
||||
private String ylabel = "Duration in ms";
|
||||
|
||||
// set output "datausage.png"
|
||||
private File output = new File("/tmp/out.png");
|
||||
|
||||
// set xtics rotate by 80
|
||||
private int rotateXAxisLabel = -80;
|
||||
|
||||
public GnuplotSettings(final File output) {
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public int getRotateXAxisLabel() {
|
||||
return rotateXAxisLabel;
|
||||
}
|
||||
|
||||
public void setRotateXAxisLabel(final int rotateXAxisLabel) {
|
||||
this.rotateXAxisLabel = rotateXAxisLabel;
|
||||
}
|
||||
|
||||
public String getTerminal() {
|
||||
return terminal;
|
||||
}
|
||||
|
||||
public void setTerminal(final String terminal) {
|
||||
this.terminal = terminal;
|
||||
}
|
||||
|
||||
public int getHeight() {
|
||||
return height;
|
||||
}
|
||||
|
||||
public void setHeight(final int height) {
|
||||
this.height = height;
|
||||
}
|
||||
|
||||
public int getWidth() {
|
||||
return width;
|
||||
}
|
||||
|
||||
public void setWidth(final int width) {
|
||||
this.width = width;
|
||||
}
|
||||
|
||||
public String getTimefmt() {
|
||||
return timefmt;
|
||||
}
|
||||
|
||||
public void setTimefmt(final String timefmt) {
|
||||
this.timefmt = timefmt;
|
||||
}
|
||||
|
||||
public String getFormatX() {
|
||||
return formatX;
|
||||
}
|
||||
|
||||
public void setFormatX(final String formatX) {
|
||||
this.formatX = formatX;
|
||||
}
|
||||
|
||||
public String getDatafileSeparator() {
|
||||
return datafileSeparator;
|
||||
}
|
||||
|
||||
public void setDatafileSeparator(final String datafileSeparator) {
|
||||
this.datafileSeparator = datafileSeparator;
|
||||
}
|
||||
|
||||
public String getXlabel() {
|
||||
return xlabel;
|
||||
}
|
||||
|
||||
public void setXlabel(final String xlabel) {
|
||||
this.xlabel = xlabel;
|
||||
}
|
||||
|
||||
public String getYlabel() {
|
||||
return ylabel;
|
||||
}
|
||||
|
||||
public void setYlabel(final String ylabel) {
|
||||
this.ylabel = ylabel;
|
||||
}
|
||||
|
||||
public File getOutput() {
|
||||
return output;
|
||||
}
|
||||
|
||||
public void setOutput(final File output) {
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
// plot 'sample.txt' using 1:2 title 'Bytes' with linespoints 2
|
||||
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.performance.db.FileUtils;
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
|
||||
public class Plotter {
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final Path dataDirectory = Paths.get(args[0]);
|
||||
final Path outputDirectory = Paths.get(args[1]);
|
||||
final String query = args[2];
|
||||
final Path tmpBaseDir = Paths.get(args[0], "tmp");
|
||||
Files.createDirectories(tmpBaseDir);
|
||||
Files.createDirectories(outputDirectory);
|
||||
final Path tmpDirectory = Files.createTempDirectory(tmpBaseDir, "gnuplot");
|
||||
try {
|
||||
|
||||
final Collection<DataSeries> dataSeries = new ArrayList<>();
|
||||
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final Stream<Entry> entries = db.get(query).singleGroup().asStream();
|
||||
|
||||
final File dataFile = File.createTempFile("data", ".dat", tmpDirectory.toFile());
|
||||
final DataSeries dataSerie = new DataSeries(dataFile, query);
|
||||
toCsv(entries, dataFile);
|
||||
|
||||
dataSeries.add(dataSerie);
|
||||
}
|
||||
|
||||
final File outputFile = File.createTempFile("out", ".png", outputDirectory.toFile());
|
||||
final Gnuplot gnuplot = new Gnuplot(tmpDirectory);
|
||||
final GnuplotSettings settings = new GnuplotSettings(outputFile);
|
||||
gnuplot.plot(settings, dataSeries);
|
||||
} finally {
|
||||
FileUtils.delete(tmpDirectory);
|
||||
}
|
||||
}
|
||||
|
||||
private static void toCsv(final Stream<Entry> entries, final File dataFile) throws IOException {
|
||||
|
||||
final long start = System.nanoTime();
|
||||
int count = 0;
|
||||
final int separator = ',';
|
||||
final int newline = '\n';
|
||||
try (final Writer output = new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.US_ASCII);) {
|
||||
|
||||
final Iterator<Entry> it = entries.iterator();
|
||||
while (it.hasNext()) {
|
||||
final Entry entry = it.next();
|
||||
|
||||
final String value = String.valueOf(entry.getValue());
|
||||
final String date = entry.getDate().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
||||
output.write(date);
|
||||
output.write(separator);
|
||||
output.write(value);
|
||||
output.write(newline);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
System.out.println("wrote " + count + " values to csv in: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,241 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.api.Tags;
|
||||
import org.lucares.performance.db.BlockingQueueIterator;
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.MappingIterator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectReader;
|
||||
|
||||
public class TcpIngestor implements AutoCloseable {
|
||||
|
||||
public static final int PORT = 17347;
|
||||
|
||||
private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true);
|
||||
|
||||
private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(2);
|
||||
|
||||
private final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
|
||||
|
||||
private final PerformanceDb db;
|
||||
|
||||
public final static class Handler implements Callable<Void> {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
private final TypeReference<Map<String, Object>> typeReferenceForMap = new TypeReference<Map<String, Object>>() {
|
||||
};
|
||||
|
||||
final Socket clientSocket;
|
||||
private final ArrayBlockingQueue<Entry> queue;
|
||||
|
||||
public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entry> queue) {
|
||||
this.clientSocket = clientSocket;
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Thread.currentThread().setName("worker-" + clientSocket.getInetAddress());
|
||||
System.out.println("opening streams to client");
|
||||
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
|
||||
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
|
||||
|
||||
) {
|
||||
final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap);
|
||||
final MappingIterator<Object> iterator = objectReader.readValues(in);
|
||||
|
||||
double duration = 0.0;
|
||||
int count = 0;
|
||||
System.out.println("reading from stream");
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
// System.out.println("read: " + line);
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, Object> object = (Map<String, Object>) iterator.next();
|
||||
|
||||
final long start = System.nanoTime();
|
||||
final Optional<Entry> entry = createEntry(object);
|
||||
final long end = System.nanoTime();
|
||||
duration += (end - start) / 1_000_000.0;
|
||||
|
||||
count++;
|
||||
if (count == 100000) {
|
||||
System.out.println("reading " + count + " took " + duration + "ms");
|
||||
duration = 0.0;
|
||||
count = 0;
|
||||
}
|
||||
|
||||
if (entry.isPresent()) {
|
||||
queue.put(entry.get());
|
||||
}
|
||||
|
||||
}
|
||||
System.out.println("connection closed");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public Optional<Entry> createEntry(final Map<String, Object> map) {
|
||||
try {
|
||||
|
||||
final OffsetDateTime date = getDate(map);
|
||||
final long duration = (int) map.get("duration");
|
||||
|
||||
final Tags tags = createTags(map);
|
||||
|
||||
final Entry entry = new Entry(date, duration, tags);
|
||||
return Optional.of(entry);
|
||||
} catch (final Exception e) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Tags createTags(final Map<String, Object> map) {
|
||||
Tags tags = Tags.create();
|
||||
for (final java.util.Map.Entry<String, Object> e : map.entrySet()) {
|
||||
|
||||
final String key = e.getKey();
|
||||
final Object value = e.getValue();
|
||||
|
||||
switch (key) {
|
||||
case "@timestamp":
|
||||
case "duration":
|
||||
// these fields are not tags
|
||||
break;
|
||||
case "tags":
|
||||
// TODO @ahr add support for simple tags, currently we
|
||||
// only support key/value tags
|
||||
break;
|
||||
default:
|
||||
tags = tags.copyAddIfNotNull(key, String.valueOf(value));
|
||||
break;
|
||||
}
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
private OffsetDateTime getDate(final Map<String, Object> map) {
|
||||
final String timestamp = (String) map.get("@timestamp");
|
||||
|
||||
final OffsetDateTime date = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_ZONED_DATE_TIME);
|
||||
return date;
|
||||
}
|
||||
}
|
||||
|
||||
public TcpIngestor(final Path dataDirectory) {
|
||||
System.out.println("opening performance db: " + dataDirectory);
|
||||
db = new PerformanceDb(dataDirectory);
|
||||
System.out.println("performance db open");
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
|
||||
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(100000);
|
||||
|
||||
serverThreadPool.submit(() -> {
|
||||
Thread.currentThread().setName("db-ingestion");
|
||||
try {
|
||||
db.put(new BlockingQueueIterator<>(queue, Entry.POISON));
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
serverThreadPool.submit(() -> listen(queue));
|
||||
}
|
||||
|
||||
private Void listen(final ArrayBlockingQueue<Entry> queue) throws IOException {
|
||||
Thread.currentThread().setName("socket-listener");
|
||||
try (ServerSocket serverSocket = new ServerSocket(
|
||||
PORT/* , 10, InetAddress.getLocalHost() */);) {
|
||||
System.out.println("listening on port " + PORT);
|
||||
|
||||
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100));
|
||||
|
||||
while (acceptNewConnections.get()) {
|
||||
try {
|
||||
final Socket clientSocket = serverSocket.accept();
|
||||
System.out.println("accepted connection: " + clientSocket.getRemoteSocketAddress());
|
||||
|
||||
workerThreadPool.submit(new Handler(clientSocket, queue));
|
||||
System.out.println("handler submitted");
|
||||
} catch (final SocketTimeoutException e) {
|
||||
// expected every 100ms
|
||||
// needed to be able to stop the server
|
||||
}
|
||||
}
|
||||
System.out.println("not accepting new connections. ");
|
||||
|
||||
System.out.println("stopping worker pool");
|
||||
workerThreadPool.shutdown();
|
||||
try {
|
||||
workerThreadPool.awaitTermination(10, TimeUnit.MINUTES);
|
||||
System.out.println("workers stopped");
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.out.println("adding poison");
|
||||
queue.offer(Entry.POISON);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
acceptNewConnections.set(false);
|
||||
System.out.println("stopped accept thread");
|
||||
serverThreadPool.shutdown();
|
||||
try {
|
||||
serverThreadPool.awaitTermination(10, TimeUnit.MINUTES);
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.out.println("closing database");
|
||||
db.close();
|
||||
System.out.println("close done");
|
||||
}
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println("shutdown hook");
|
||||
}
|
||||
});
|
||||
|
||||
try (final TcpIngestor ingestor = new TcpIngestor(Config.DATA_DIR)) {
|
||||
ingestor.start();
|
||||
TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user