insert entries for different tags in one stream

This commit is contained in:
2016-12-10 14:10:41 +01:00
parent a409c4c5d0
commit 34ee64fff1
20 changed files with 648 additions and 144 deletions

View File

@@ -0,0 +1,54 @@
package org.lucares.recommind.logs;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.lucares.performance.db.DateUtils;
import org.lucares.performance.db.Entry;
import org.lucares.performance.db.Tags;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.Match;
/**
 * Turns single lines of a performance log into {@link Entry} objects by
 * applying a compiled {@link Grok} pattern.
 *
 * <p>
 * The pattern is expected to provide the captures {@code timestamp},
 * {@code method} and {@code duration}; {@code project} is optional.
 */
public class GrokToEntryFilter {

    /** Format of the {@code timestamp} capture, e.g. {@code 2016-12-10 14:10:41,123}. */
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    private final Grok grok;

    /**
     * @param grok
     *            the grok instance used to match every line
     */
    public GrokToEntryFilter(final Grok grok) {
        this.grok = grok;
    }

    /**
     * Parses one log line.
     *
     * @param singleLineOfLog
     *            the line to match against the grok pattern
     * @param tags
     *            base tags; the captured {@code method} and (lower-cased)
     *            {@code project} are added to a copy of them
     * @return the parsed entry, or {@code null} if the line does not provide
     *         {@code timestamp}, {@code method} and {@code duration}
     */
    public Entry parse(final String singleLineOfLog, final Tags tags) {
        final Match gm = grok.match(singleLineOfLog);
        // NOTE(review): captures() is called for its side effect before
        // toMap() - presumably it populates the capture map; confirm against
        // the grok API.
        gm.captures();
        final Map<String, Object> map = gm.toMap();

        final String timestamp = (String) map.get("timestamp");
        final String method = (String) map.get("method");
        final String project = (String) map.get("project");
        final Long value = (Long) map.get("duration");

        if (timestamp == null || method == null || value == null) {
            // not a line we are interested in
            return null;
        }

        final OffsetDateTime date = DateUtils.parseAtZoneOffset(timestamp, FORMAT, ZoneOffset.UTC);
        final Tags entryTags = tags //
                .copyAddIfNotNull("method", method) //
                .copyAddIfNotNull("project", lowerCase(project));
        return new Entry(date, value, entryTags);
    }

    /**
     * Lower-cases and interns {@code s}, passing {@code null} through.
     *
     * <p>
     * {@code Locale.ROOT} keeps the result independent of the JVM's default
     * locale; with the default locale a Turkish JVM would map "I" to a dotless
     * "i" and produce different project tags than every other machine.
     */
    private String lowerCase(final String s) {
        return s != null ? s.toLowerCase(java.util.Locale.ROOT).intern() : null;
    }
}

View File

@@ -0,0 +1,74 @@
package org.lucares.recommind.logs;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.performance.db.Tags;
import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.exception.GrokException;
import liquibase.exception.LiquibaseException;
/**
 * Command line entry point that ingests a single performance log file into a
 * {@link PerformanceDb} located in a fresh temporary directory.
 */
public class Ingestor {

    /** Classpath location of the grok pattern definitions. */
    private static final String PATTERNS_RESOURCE = "org/lucares/grok/patterns";

    /** Log file that is ingested when no path is given on the command line. */
    private static final String DEFAULT_LOG_FILE = "/home/andi/ws/performanceDb/data/production/ondem/ondem01/ap001/1_performance.log";

    /**
     * Builds the grok expression for one line of the performance log.
     *
     * <p>
     * The expression captures {@code timestamp}, {@code logger},
     * {@code method}, {@code duration} (converted to long), {@code status} and
     * optionally {@code project}.
     *
     * @return the grok expression
     */
    public static String createPattern() {
        final String time = "%{TIMESTAMP_ISO8601:timestamp}";
        final String stuff = "\\s+\\[.*\\]+\\s+%{LOGLEVEL}\\s+(?<logger>(?:[a-zA-Z0-9-]+\\.)*[A-Za-z0-9$]+)\\s+null - Executed ";
        final String method = "%{NOTSPACE:method}";
        final String duration = "%{NUMBER:duration:long}";
        final String status = "%{WORD:status}";
        final String project = "(?:project=%{WORD:project})?";

        return time + stuff + method + " in " + duration + " ms " + status + ". \\[.*?" + project;
    }

    /**
     * Ingests a performance log and prints how long it took.
     *
     * @param args
     *            optional: {@code args[0]} is the log file to ingest; without
     *            arguments the built-in default file is used
     */
    public static void main(final String[] args) throws LiquibaseException, Exception {
        final Path dataDirectory = Files.createTempDirectory("ingestor");
        final File logFile = new File(args.length > 0 ? args[0] : DEFAULT_LOG_FILE);

        final Grok grok = createGrok(dataDirectory);
        grok.compile(createPattern());

        try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
            final PerformanceLogs performanceLogs = new PerformanceLogs();
            final Tags tags = Tags.create("pod", "ondem01");

            final long start = System.nanoTime();
            performanceLogs.ingest(db, logFile, tags, grok);
            System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
        }
    }

    /**
     * Creates a grok instance from the bundled pattern definitions. The
     * definitions are first copied to a file because
     * {@code Grok.create} is given a file path.
     */
    private static Grok createGrok(final Path dataDirectory) throws GrokException, IOException {
        final File patternsFile = storePatterns(dataDirectory);
        return Grok.create(patternsFile.getAbsolutePath());
    }

    /**
     * Copies the bundled pattern definitions into {@code dataDirectory}.
     *
     * @return the file the definitions were written to
     * @throws IOException
     *             if the resource is missing from the classpath or cannot be
     *             copied
     */
    private static File storePatterns(final Path dataDirectory) throws IOException {
        final File file = new File(dataDirectory.toFile(), "patterns");
        try (InputStream stream = Ingestor.class.getClassLoader().getResourceAsStream(PATTERNS_RESOURCE)) {
            // getResourceAsStream signals a missing resource with null;
            // fail with a meaningful message instead of an NPE in Files.copy
            if (stream == null) {
                throw new IOException("grok patterns not found on classpath: " + PATTERNS_RESOURCE);
            }
            Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return file;
    }
}

View File

@@ -1,19 +1,70 @@
package org.lucares.recommind.logs;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.lucares.performance.db.BlockingIterator;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.Entry;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.performance.db.Tags;
import io.thekraken.grok.api.Grok;
public class PerformanceLogs {
public void ingest(final PerformanceDb db, final File performanceLog, final Tags tags) {
private final ExecutorService executor = Executors.newCachedThreadPool();
public void ingest(final PerformanceDb db, final File performanceLog, final Tags tags, final Grok grok)
throws InterruptedException {
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(10);
db.put(queue, tags);
final BlockingIterator<Entry> iterator = new BlockingQueueIterator<>(queue, Entry.POISON);
final Future<Boolean> future = executor.submit(() -> {
final GrokToEntryFilter filter = new GrokToEntryFilter(grok);
boolean result = false;
try (final BufferedReader reader = new BufferedReader(new FileReader(performanceLog))) {
String line;
while ((line = reader.readLine()) != null) {
final Entry entry = filter.parse(line, tags);
if (entry != null) {
System.out.println(entry);
queue.put(entry);
}
}
result = true;
} finally {
queue.put(Entry.POISON);
}
return result;
});
try {
db.put(iterator);
try {
future.get(10, TimeUnit.MINUTES);
} catch (ExecutionException | TimeoutException e) {
e.printStackTrace(); // TODO @ahr handle this mess
}
} catch (final RuntimeException e) {
future.cancel(true);
} finally {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
}
}
}

View File

@@ -0,0 +1,94 @@
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
# The leading guard is a negative lookbehind: a time must not start in the middle of a number.
TIME (?<![0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)