TcpIngestor that receives a stream of JSON objects and stores them

2016-12-11 18:40:44 +01:00
parent e936df6f7e
commit 89fbaf2d06
12 changed files with 451 additions and 26 deletions
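For context, a minimal sketch of a client feeding the new TcpIngestor: it listens on TCP port 17347 (TcpIngestor.PORT) and reads one JSON object per line. The field names mirror those used in TcpIngestorTest ("@timestamp" as an ISO zoned date-time, "duration" as a number, other keys become tags); the class name and the "host" value are illustrative and not part of this commit.

package org.lucares.recommind.logs;

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical client sketch: sends newline-delimited JSON objects to the TcpIngestor.
public class TcpIngestorClientSketch {
    public static void main(final String[] args) throws Exception {
        try (Socket socket = new Socket("127.0.0.1", TcpIngestor.PORT);
                Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            // "@timestamp" and "duration" are consumed directly; every other key becomes a tag.
            final String timestamp = OffsetDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
            out.write("{\"@timestamp\":\"" + timestamp + "\",\"duration\":42,\"host\":\"someHost\"}\n");
            out.flush();
        }
    }
}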

View File

@@ -50,7 +50,7 @@ class PdbWriter implements AutoCloseable {
private void assertEpochMilliInRange(final long epochMilli) {
if (epochMilli < minimalEpochMilli) {
LOGGER.warning("epochMilli must not be smaller than " + minimalEpochMilli + ", but was " + epochMilli
LOGGER.fine("epochMilli must not be smaller than " + minimalEpochMilli + ", but was " + epochMilli
+ ". We'll accept this for now. "
+ "Currently there is no code that relies on monotonically increasing date values. "
+ "Log4j does not guarantee it either.");
@@ -84,4 +84,8 @@ class PdbWriter implements AutoCloseable {
outputStream.flush();
outputStream.close();
}
public void flush() throws IOException {
outputStream.flush();
}
}

View File

@@ -3,7 +3,9 @@ package org.lucares.performance.db;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,12 +63,16 @@ public class PdbWriterManager implements AutoCloseable {
private final PdbWriterSupplier supplier;
private Day lastDay = new Day(OffsetDateTime.MIN);
public PdbWriterManager(final PdbWriterSupplier supplier) {
this.supplier = supplier;
}
public PdbWriter get(final Tags tags, final OffsetDateTime date) {
handleDateChange(date);
final Key key = new Key(tags, date);
if (!map.containsKey(key)) {
final PdbWriter writer = supplier.supply(tags, date);
@@ -75,19 +81,51 @@ public class PdbWriterManager implements AutoCloseable {
return map.get(key);
}
private void handleDateChange(final OffsetDateTime date) {
final Day day = new Day(date);
if (!day.equals(lastDay)) {
closeFiles();
lastDay = day;
}
}
public PdbWriter put(final Tags tags, final OffsetDateTime date, final PdbWriter pdbWriter) {
final Key key = new Key(tags, date);
return map.put(key, pdbWriter);
}
@Override
public void close() {
public void flush() {
LOGGER.info("flushing all files");
for (final PdbWriter writer : map.values()) {
try {
writer.close();
writer.flush();
} catch (final IOException e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
}
}
}
@Override
public void close() {
closeFiles();
}
private void closeFiles() {
LOGGER.info("closing all files");
final Iterator<Entry<Key, PdbWriter>> it = map.entrySet().iterator();
while (it.hasNext()) {
final Entry<Key, PdbWriter> entry = it.next();
final PdbWriter writer = entry.getValue();
try {
writer.close();
} catch (final IOException e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
}
it.remove();
}
}
}

View File

@@ -17,14 +17,12 @@ import java.util.stream.StreamSupport;
import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier;
import liquibase.exception.LiquibaseException;
public class PerformanceDb implements AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(PerformanceDb.class.getCanonicalName());
private final TagsToFile tagsToFile;
public PerformanceDb(final Path dataDirectory) throws LiquibaseException {
public PerformanceDb(final Path dataDirectory) {
tagsToFile = new TagsToFile(dataDirectory);
}
@@ -71,12 +69,14 @@ public class PerformanceDb implements AutoCloseable {
public void put(final BlockingIterator<Entry> entries) throws WriteException {
final long start = System.nanoTime();
final double timeSpendInWrite = 0.0;
long count = 0;
double durationInManager = 0;
try (final PdbWriterManager manager = new PdbWriterManager(new WriterSupplier(tagsToFile));) {
long start = System.nanoTime();
while (true) {
final Optional<Entry> entryOptional = entries.next();
if (!entryOptional.isPresent()) {
break;
@@ -90,20 +90,31 @@ public class PerformanceDb implements AutoCloseable {
writer.write(entry);
count++;
if (count == 10000) {
final long end = System.nanoTime();
final double duration = (end - start) / 1_000_000.0;
LOGGER.info("inserting the last " + count + " took " + duration + " ms; " + durationInManager
+ "ms in entries.next ");
System.out.println(entry);
start = System.nanoTime();
durationInManager = 0.0;
count = 0;
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("Thread was interrupted. Aborting exectution.");
} finally {
final double duration = (System.nanoTime() - start) / 1_000_000.0;
LOGGER.info("inserting " + count + " took " + duration + " ms of which " + timeSpendInWrite
+ " were spend in write");
//
}
}
public List<Entry> getAsList(final Tags tags) {
return get(tags).collect(Collectors.toList());
public List<Entry> getAsList(final String query) {
return get(query).collect(Collectors.toList());
}
/**
@@ -138,7 +149,7 @@ public class PerformanceDb implements AutoCloseable {
}
@Override
public void close() throws Exception {
public void close() {
tagsToFile.close();
}
}

View File

@@ -117,7 +117,7 @@ public class Tags {
public String abbreviatedRepresentation() {
final StringBuilder result = new StringBuilder();
final int maxLength = 500;
final int maxLength = 200;
final SortedSet<String> keys = new TreeSet<>(tags.keySet());

View File

@@ -172,8 +172,13 @@ public class TagsToFile implements AutoCloseable, CollectionUtils {
}
@Override
public void close() throws Exception {
public void close() {
try {
db.close();
} catch (final Exception e) {
// H2 doesn't actually do anything in close
throw new IllegalStateException(e);
}
}
}

View File

@@ -100,7 +100,7 @@ public class PerformanceDbTest {
performanceDb.put(entry);
}
final List<Entry> actualEntries = performanceDb.getAsList(tags);
final List<Entry> actualEntries = performanceDb.getAsList(Query.createQuery(tags));
Assert.assertEquals(actualEntries, entries);
final File storageFileForToday = StorageUtils.createStorageFile(dataDirectory, new Day(timeRange.getFrom()),
@@ -137,16 +137,16 @@ public class PerformanceDbTest {
printEntries(entriesThree, "three");
performanceDb.put(entriesThree);
final List<Entry> actualEntriesOne = performanceDb.getAsList(tagsOne);
final List<Entry> actualEntriesOne = performanceDb.getAsList(Query.createQuery(tagsOne));
Assert.assertEquals(actualEntriesOne, entriesOne);
final List<Entry> actualEntriesTwo = performanceDb.getAsList(tagsTwo);
final List<Entry> actualEntriesTwo = performanceDb.getAsList(Query.createQuery(tagsTwo));
Assert.assertEquals(actualEntriesTwo, entriesTwo);
final List<Entry> actualEntriesThree = performanceDb.getAsList(tagsThree);
final List<Entry> actualEntriesThree = performanceDb.getAsList(Query.createQuery(tagsThree));
Assert.assertEquals(actualEntriesThree, entriesThree);
final List<Entry> actualEntriesAll = performanceDb.getAsList(tagsCommon);
final List<Entry> actualEntriesAll = performanceDb.getAsList(Query.createQuery(tagsCommon));
final List<Entry> expectedAll = CollectionUtils.collate(entriesOne,
CollectionUtils.collate(entriesTwo, entriesThree, Entry.BY_DATE), Entry.BY_DATE);

View File

@@ -3,5 +3,6 @@ dependencies {
compile project(':performanceDb')
compile "io.thekraken:grok:0.1.5"
compile 'org.lucares:svak:1.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.8.5'
}

View File

@@ -0,0 +1,8 @@
package org.lucares.recommind.logs;
import java.nio.file.Path;
import java.nio.file.Paths;
public class Config {
public static final Path DATA_DIR = Paths.get("/home/andi/ws/performanceDb/db");
}

View File

@@ -36,8 +36,7 @@ public class Ingestor {
public static void main(final String[] args) throws LiquibaseException, Exception {
final Path dataDirectory = Paths.get("/tmp/ingestor");
Files.createDirectories(dataDirectory);
final File logFile = new File(
"/home/andi/ws/performanceDb/data/production/ondem/ondem01/ap001/1_performance.log");
final File logFile = new File("/home/andi/ws/performanceDb/data/vapondem02ap002.log");
final Grok grok = createGrok(dataDirectory);

View File

@@ -38,10 +38,21 @@ public class PerformanceLogs {
try (final BufferedReader reader = new BufferedReader(new FileReader(performanceLog))) {
String line;
int count = 0;
long start = System.nanoTime();
while ((line = reader.readLine()) != null) {
final Entry entry = filter.parse(line, tags);
if (entry != null) {
queue.put(entry);
// queue.put(entry);
System.out.println(entry);
}
count++;
if (count == 10000) {
System.out.println("duration: " + (System.nanoTime() - start) / 1_000_000.0 + "ms");
start = System.nanoTime();
count = 0;
}
}

View File

@@ -0,0 +1,234 @@
package org.lucares.recommind.logs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.Entry;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.performance.db.Tags;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
public class TcpIngestor implements AutoCloseable {
public static final int PORT = 17347;
private final AtomicBoolean acceptNewConnections = new AtomicBoolean(true);
private final ExecutorService serverThreadPool = Executors.newFixedThreadPool(2);
private final ExecutorService workerThreadPool = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper = new ObjectMapper();
private final PerformanceDb db;
private final class Handler implements Callable<Void> {
final Socket clientSocket;
private final ArrayBlockingQueue<Entry> queue;
public Handler(final Socket clientSocket, final ArrayBlockingQueue<Entry> queue) {
this.clientSocket = clientSocket;
this.queue = queue;
}
@Override
public Void call() throws Exception {
Thread.currentThread().setName("worker-" + clientSocket.getInetAddress());
System.out.println("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
double duration = 0.0;
int count = 0;
System.out.println("reading from stream");
String line;
while ((line = in.readLine()) != null) {
// System.out.println("read: " + line);
final long start = System.nanoTime();
final Optional<Entry> entry = createEntry(line);
final long end = System.nanoTime();
duration += (end - start) / 1_000_000.0;
count++;
if (count == 10000) {
System.out.println("reading 10k took " + duration + "ms");
duration = 0.0;
count = 0;
}
if (entry.isPresent()) {
queue.put(entry.get());
}
}
System.out.println("connection closed");
}
return null;
}
private Optional<Entry> createEntry(final String line) {
try {
final Map<String, Object> map = objectMapper.readValue(line, new TypeReference<Map<String, Object>>() {
});
final OffsetDateTime date = getDate(map);
final long duration = (int) map.get("duration");
final Tags tags = createTags(map);
final Entry entry = new Entry(date, duration, tags);
return Optional.of(entry);
} catch (final Exception e) {
return Optional.empty();
}
}
private Tags createTags(final Map<String, Object> map) {
Tags tags = Tags.create();
for (final java.util.Map.Entry<String, Object> e : map.entrySet()) {
final String key = e.getKey();
final Object value = e.getValue();
switch (key) {
case "@timestamp":
case "duration":
// these fields are not tags
break;
case "tags":
// TODO @ahr add support for simple tags, currently we
// only support key/value tags
break;
default:
tags = tags.copyAddIfNotNull(key, String.valueOf(value));
break;
}
}
return tags;
}
private OffsetDateTime getDate(final Map<String, Object> map) {
final String timestamp = (String) map.get("@timestamp");
final OffsetDateTime date = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_ZONED_DATE_TIME);
return date;
}
}
public TcpIngestor(final Path dataDirectory) {
System.out.println("opening performance db: " + dataDirectory);
db = new PerformanceDb(dataDirectory);
System.out.println("performance db open");
}
public void start() throws Exception {
final ArrayBlockingQueue<Entry> queue = new ArrayBlockingQueue<>(1000);
serverThreadPool.submit(() -> {
Thread.currentThread().setName("db-ingestion");
try {
db.put(new BlockingQueueIterator<>(queue, Entry.POISON));
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
return null;
});
serverThreadPool.submit(() -> listen(queue));
}
private Void listen(final ArrayBlockingQueue<Entry> queue) throws IOException {
Thread.currentThread().setName("socket-listener");
try (ServerSocket serverSocket = new ServerSocket(
PORT/* , 10, InetAddress.getLocalHost() */);) {
System.out.println("listening on port " + PORT);
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100));
while (acceptNewConnections.get()) {
try {
final Socket clientSocket = serverSocket.accept();
System.out.println("accepted connection: " + clientSocket.getRemoteSocketAddress());
workerThreadPool.submit(new Handler(clientSocket, queue));
System.out.println("handler submitted");
} catch (final SocketTimeoutException e) {
// expected every 100ms
// needed to be able to stop the server
}
}
System.out.println("not accepting new connections. ");
System.out.println("stopping worker pool");
workerThreadPool.shutdown();
try {
workerThreadPool.awaitTermination(10, TimeUnit.MINUTES);
System.out.println("workers stopped");
} catch (final InterruptedException e) {
e.printStackTrace();
}
System.out.println("adding poison");
queue.offer(Entry.POISON);
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
return null;
}
@Override
public void close() {
acceptNewConnections.set(false);
System.out.println("stopped accept thread");
serverThreadPool.shutdown();
try {
serverThreadPool.awaitTermination(10, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
e.printStackTrace();
}
System.out.println("closing database");
db.close();
System.out.println("close done");
}
public static void main(final String[] args) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("shutdown hook");
}
});
try (final TcpIngestor ingestor = new TcpIngestor(Config.DATA_DIR)) {
ingestor.start();
TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
}
}
}

View File

@@ -0,0 +1,114 @@
package org.lucares.performance.db.ingestor;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.lucares.performance.db.Entry;
import org.lucares.performance.db.FileUtils;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.recommind.logs.TcpIngestor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import liquibase.exception.LiquibaseException;
@Test
public class TcpIngestorTest {
private Path dataDirectory;
@BeforeMethod
public void beforeMethod() throws IOException {
dataDirectory = Files.createTempDirectory("pdb");
}
@AfterMethod
public void afterMethod() throws IOException {
FileUtils.delete(dataDirectory);
}
public void testIngestDataViaTcpStream() throws LiquibaseException, Exception {
final int value = 1;
final OffsetDateTime dateA = OffsetDateTime.now();
final OffsetDateTime dateB = OffsetDateTime.now();
final String host = "someHost";
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
ingestor.start();
final SocketChannel channel = connect();
final Map<String, Object> entryA = new HashMap<>();
entryA.put("duration", 1);
entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entryA.put("host", host);
entryA.put("tags", Collections.emptyList());
final Map<String, Object> entryB = new HashMap<>();
entryB.put("duration", 2);
entryB.put("@timestamp", dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entryB.put("host", host);
entryB.put("tags", Collections.emptyList());
final StringBuilder streamData = new StringBuilder();
final ObjectMapper mapper = new ObjectMapper();
streamData.append(mapper.writeValueAsString(entryA));
streamData.append("\n");
streamData.append(mapper.writeValueAsString(entryB));
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
channel.write(src);
channel.close();
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final List<Entry> result = db.getAsList("host=" + host);
Assert.assertEquals(result.size(), 2);
Assert.assertEquals(result.get(0).getValue(), 1);
Assert.assertEquals(result.get(0).getDate().toInstant(), dateA.toInstant());
Assert.assertEquals(result.get(1).getValue(), 2);
Assert.assertEquals(result.get(1).getDate().toInstant(), dateB.toInstant());
}
}
private SocketChannel connect() throws IOException {
SocketChannel result = null;
while (true) {
try {
result = SocketChannel.open();
result.configureBlocking(true);
result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT));
break;
} catch (final ConnectException e) {
// server socket not yet ready, it should be ready any time soon
}
}
return result;
}
}