remove obsolete classes
This commit is contained in:
@@ -1,54 +0,0 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
|
||||
import org.lucares.performance.db.DateUtils;
|
||||
import org.lucares.performance.db.Entry;
|
||||
import org.lucares.performance.db.Tags;
|
||||
|
||||
import io.thekraken.grok.api.Grok;
|
||||
import io.thekraken.grok.api.Match;
|
||||
|
||||
public class GrokToEntryFilter {
|
||||
|
||||
private final Grok grok;
|
||||
|
||||
private final static DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
|
||||
|
||||
public GrokToEntryFilter(final Grok grok) {
|
||||
this.grok = grok;
|
||||
}
|
||||
|
||||
public Entry parse(final String singleLineOfLog, final Tags tags) {
|
||||
final Match gm = grok.match(singleLineOfLog);
|
||||
gm.captures();
|
||||
final Map<String, Object> map = gm.toMap();
|
||||
|
||||
final String timestamp = (String) map.get("timestamp");
|
||||
final String method = (String) map.get("method");
|
||||
final String project = (String) map.get("project");
|
||||
final Long value = (Long) map.get("duration");
|
||||
|
||||
final Entry result;
|
||||
if (timestamp == null || method == null || value == null) {
|
||||
result = null;
|
||||
} else {
|
||||
|
||||
final OffsetDateTime date = DateUtils.parseAtZoneOffset(timestamp, FORMAT, ZoneOffset.UTC);
|
||||
|
||||
Tags entryTags = tags;
|
||||
entryTags = entryTags.copyAddIfNotNull("method", method);
|
||||
entryTags = entryTags.copyAddIfNotNull("project", lowerCase(project));
|
||||
|
||||
result = new Entry(date, value, entryTags);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private String lowerCase(final String s) {
|
||||
return s != null ? s.toLowerCase().intern() : s;
|
||||
}
|
||||
}
|
||||
@@ -1,75 +0,0 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
import org.lucares.performance.db.Tags;
|
||||
|
||||
import io.thekraken.grok.api.Grok;
|
||||
import io.thekraken.grok.api.exception.GrokException;
|
||||
import liquibase.exception.LiquibaseException;
|
||||
|
||||
public class Ingestor {
|
||||
|
||||
public static String createPattern() {
|
||||
// "%{TIMESTAMP_ISO8601:timestamp}\\s+\\[.*\\]+\\s+%{LOGLEVEL}\\s+(?<logger>(?:[a-zA-Z0-9-]+\\.)*[A-Za-z0-9$]+)\\s+null
|
||||
// - Executed
|
||||
// %{NOTSPACE:method} in %{NUMBER:duration} ms %{WORD:status}.
|
||||
// \\[.*?(?:project=%{WORD:project})?"
|
||||
final String time = "%{TIMESTAMP_ISO8601:timestamp}";
|
||||
final String stuff = "\\s+\\[.*\\]+\\s+%{LOGLEVEL}\\s+(?<logger>(?:[a-zA-Z0-9-]+\\.)*[A-Za-z0-9$]+)\\s+null - Executed ";
|
||||
final String method = "%{NOTSPACE:method}";
|
||||
final String duration = "%{NUMBER:duration:long}";
|
||||
final String status = "%{WORD:status}";
|
||||
final String project = "(?:project=%{WORD:project})?";
|
||||
return time + stuff + method + " in " + duration + " ms " + status + ". \\[.*?" + project;
|
||||
// return time;
|
||||
|
||||
}
|
||||
|
||||
public static void main(final String[] args) throws LiquibaseException, Exception {
|
||||
final Path dataDirectory = Paths.get("/tmp/ingestor");
|
||||
Files.createDirectories(dataDirectory);
|
||||
final File logFile = new File("/home/andi/ws/performanceDb/data/vapondem02ap002.log");
|
||||
|
||||
final Grok grok = createGrok(dataDirectory);
|
||||
|
||||
grok.compile(createPattern());
|
||||
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
|
||||
final PerformanceLogs performanceLogs = new PerformanceLogs();
|
||||
final Tags tags = Tags.create("pod", "ondem01");
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
||||
performanceLogs.ingest(db, logFile, tags, grok);
|
||||
|
||||
System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
||||
}
|
||||
}
|
||||
|
||||
private static Grok createGrok(final Path dataDirectory) throws GrokException, IOException {
|
||||
|
||||
final File patternsFile = storePatterns(dataDirectory);
|
||||
|
||||
return Grok.create(patternsFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
private static File storePatterns(final Path dataDirectory) throws IOException {
|
||||
|
||||
final File file = new File(dataDirectory.toFile(), "patterns");
|
||||
try (InputStream stream = Ingestor.class.getClassLoader().getResourceAsStream("org/lucares/grok/patterns")) {
|
||||
|
||||
Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
return file;
|
||||
}
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.lucares.performance.db.Entry;
|
||||
|
||||
import io.thekraken.grok.api.Grok;
|
||||
import io.thekraken.grok.api.Match;
|
||||
|
||||
public class LogReader implements Iterable<Entry> {
|
||||
|
||||
private final Grok grok;
|
||||
|
||||
public LogReader(final Grok grok) {
|
||||
super();
|
||||
this.grok = grok;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Entry> iterator() {
|
||||
|
||||
// Grok grok = Grok.create("patterns/patterns");
|
||||
|
||||
/** Grok pattern to compile, here httpd logs */
|
||||
// grok.compile("%{COMBINEDAPACHELOG}");
|
||||
|
||||
/** Line of log to match */
|
||||
final String log = "112.169.19.192 - - [06/Mar/2013:01:36:30 +0900] \"GET / HTTP/1.1\" 200 44346 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.22 (KHTML, like Gecko) Chrome/25.0.1364.152 Safari/537.22\"";
|
||||
|
||||
final Match gm = grok.match(log);
|
||||
gm.captures();
|
||||
gm.toMap();
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,81 +0,0 @@
|
||||
package org.lucares.recommind.logs;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.lucares.performance.db.BlockingIterator;
|
||||
import org.lucares.performance.db.BlockingQueueIterator;
|
||||
import org.lucares.performance.db.Entry;
|
||||
import org.lucares.performance.db.PerformanceDb;
|
||||
import org.lucares.performance.db.Tags;
|
||||
|
||||
import io.thekraken.grok.api.Grok;
|
||||
|
||||
public class PerformanceLogs {
|
||||
|
||||
private final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
|
||||
public void ingest(final PerformanceDb db, final File performanceLog, final Tags tags, final Grok grok)
|
||||
throws InterruptedException {
|
||||
|
||||
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(10);
|
||||
|
||||
final BlockingIterator<Entry> iterator = new BlockingQueueIterator<>(queue, Entry.POISON);
|
||||
|
||||
final Future<Boolean> future = executor.submit(() -> {
|
||||
|
||||
final GrokToEntryFilter filter = new GrokToEntryFilter(grok);
|
||||
|
||||
boolean result = false;
|
||||
|
||||
try (final BufferedReader reader = new BufferedReader(new FileReader(performanceLog))) {
|
||||
String line;
|
||||
int count = 0;
|
||||
long start = System.nanoTime();
|
||||
while ((line = reader.readLine()) != null) {
|
||||
|
||||
final Entry entry = filter.parse(line, tags);
|
||||
if (entry != null) {
|
||||
// queue.put(entry);
|
||||
System.out.println(entry);
|
||||
}
|
||||
count++;
|
||||
|
||||
if (count == 10000) {
|
||||
System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
|
||||
start = System.nanoTime();
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
result = true;
|
||||
} finally {
|
||||
System.out.println("added poison");
|
||||
queue.put(Entry.POISON);
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
try {
|
||||
db.put(iterator);
|
||||
try {
|
||||
future.get(10, TimeUnit.MINUTES);
|
||||
} catch (ExecutionException | TimeoutException e) {
|
||||
e.printStackTrace(); // TODO @ahr handle this mess
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
future.cancel(true);
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -73,6 +73,7 @@ public class TcpIngestor implements AutoCloseable {
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
// System.out.println("read: " + line);
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, Object> object = (Map<String, Object>) iterator.next();
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
||||
Reference in New Issue
Block a user