reuse pdb writers

This commit is contained in:
2016-12-28 08:39:20 +01:00
parent db0b3d6d24
commit 68ac1dd631
8 changed files with 157 additions and 150 deletions

View File

@@ -158,11 +158,15 @@ public class TcpIngestor implements AutoCloseable {
serverThreadPool.submit(() -> { serverThreadPool.submit(() -> {
Thread.currentThread().setName("db-ingestion"); Thread.currentThread().setName("db-ingestion");
boolean finished = false;
while (!finished) {
try { try {
db.put(new BlockingQueueIterator<>(queue, Entry.POISON)); db.put(new BlockingQueueIterator<>(queue, Entry.POISON));
finished = true;
} catch (final Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
throw e; }
} }
return null; return null;
}); });
@@ -188,6 +192,8 @@ public class TcpIngestor implements AutoCloseable {
} catch (final SocketTimeoutException e) { } catch (final SocketTimeoutException e) {
// expected every 100ms // expected every 100ms
// needed to be able to stop the server // needed to be able to stop the server
} catch (final Exception e) {
e.printStackTrace();
} }
} }
System.out.println("not accepting new connections. "); System.out.println("not accepting new connections. ");

View File

@@ -9,6 +9,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@@ -52,8 +53,6 @@ public class TcpIngestorTest {
ingestor.start(); ingestor.start();
final SocketChannel channel = connect();
final Map<String, Object> entryA = new HashMap<>(); final Map<String, Object> entryA = new HashMap<>();
entryA.put("duration", 1); entryA.put("duration", 1);
entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME)); entryA.put("@timestamp", dateA.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
@@ -66,16 +65,7 @@ public class TcpIngestorTest {
entryB.put("host", host); entryB.put("host", host);
entryB.put("tags", Collections.emptyList()); entryB.put("tags", Collections.emptyList());
final StringBuilder streamData = new StringBuilder(); send(entryA, entryB);
final ObjectMapper mapper = new ObjectMapper();
streamData.append(mapper.writeValueAsString(entryA));
streamData.append("\n");
streamData.append(mapper.writeValueAsString(entryB));
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
channel.write(src);
channel.close();
} catch (final Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
throw e; throw e;
@@ -93,6 +83,25 @@ public class TcpIngestorTest {
} }
} }
@SafeVarargs
private final void send(final Map<String, Object>... entries) throws IOException {
final StringBuilder streamData = new StringBuilder();
final ObjectMapper mapper = new ObjectMapper();
for (final Map<String, Object> entry : entries) {
streamData.append(mapper.writeValueAsString(entry));
streamData.append("\n");
}
final SocketChannel channel = connect();
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
channel.write(src);
channel.close();
}
private SocketChannel connect() throws IOException { private SocketChannel connect() throws IOException {
SocketChannel result = null; SocketChannel result = null;
@@ -110,4 +119,37 @@ public class TcpIngestorTest {
return result; return result;
} }
public void testIngestionThreadDoesNotDieOnErrors() throws Exception {
final OffsetDateTime invalidDate = OffsetDateTime.of(1969, 12, 31, 23, 59, 59, 999, ZoneOffset.UTC);
final OffsetDateTime dateB = OffsetDateTime.now();
final String host = "someHost";
try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) {
tcpIngestor.start();
// this entry will be skipped, because the date is invalid
final Map<String, Object> entryA = new HashMap<>();
entryA.put("duration", 1);
entryA.put("@timestamp", invalidDate.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entryA.put("host", host);
entryA.put("tags", Collections.emptyList());
final Map<String, Object> entryB = new HashMap<>();
entryB.put("duration", 2);
entryB.put("@timestamp", dateB.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
entryB.put("host", host);
entryB.put("tags", Collections.emptyList());
send(entryA, entryB);
}
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
final List<Entry> result = db.get("host=" + host).singleGroup().asList();
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).getValue(), 2);
Assert.assertEquals(result.get(0).getDate().toInstant(), dateB.toInstant());
}
}
} }

View File

@@ -0,0 +1,27 @@
package org.lucares.performance.db;
import java.io.IOException;
import java.time.OffsetDateTime;
import org.lucares.pdb.api.Tags;
import org.lucares.performance.db.PdbWriterManager.PdbFileSupplier;
class FileSupplier implements PdbFileSupplier {
private final TagsToFile tagsToFile;
public FileSupplier(final TagsToFile tagsToFile) {
super();
this.tagsToFile = tagsToFile;
}
@Override
public PdbFile supply(final Tags tags, final OffsetDateTime date) {
try {
final PdbFile pdbFile = tagsToFile.getFile(date, tags);
return pdbFile;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -69,10 +69,4 @@ class PdbFile {
return false; return false;
return true; return true;
} }
public TimeRange getTimeRange() {
// TODO @ahr should return the minimal date that can be added
return null;
}
} }

View File

@@ -105,6 +105,7 @@ class PdbWriter implements AutoCloseable {
} }
public void write(final Entry entry) throws WriteException { public void write(final Entry entry) throws WriteException {
System.out.println(entry);
final long epochMilli = entry.getEpochMilli(); final long epochMilli = entry.getEpochMilli();
final long value = entry.getValue(); final long value = entry.getValue();
write(epochMilli, value); write(epochMilli, value);
@@ -112,19 +113,9 @@ class PdbWriter implements AutoCloseable {
private void write(final long epochMilli, final long value) throws WriteException { private void write(final long epochMilli, final long value) throws WriteException {
try { try {
if (epochMilli < lastEpochMilli) {
LOGGER.info("epochMilli must not be smaller than " + lastEpochMilli + ", but was " + epochMilli
+ ". We'll accept this for now. "
+ "Currently there is no code that relies on monotonically increasing date values. "
+ "Log4j does not guarantee it either.");
return;
}
final long epochMilliIncrement = epochMilli - lastEpochMilli; final long epochMilliIncrement = epochMilli - lastEpochMilli;
assertValueInRange(epochMilliIncrement); assertValueInRange(epochMilliIncrement);
assertValueInRange(value); assertValueInRange(value);
assertEpochMilliInRange(epochMilli);
writeValue(epochMilliIncrement, ByteType.DATE_INCREMENT, outputStream); writeValue(epochMilliIncrement, ByteType.DATE_INCREMENT, outputStream);
writeValue(value, ByteType.MEASUREMENT, outputStream); writeValue(value, ByteType.MEASUREMENT, outputStream);
@@ -134,15 +125,6 @@ class PdbWriter implements AutoCloseable {
} }
} }
private void assertEpochMilliInRange(final long epochMilli) {
if (epochMilli < lastEpochMilli) {
LOGGER.info("epochMilli must not be smaller than " + lastEpochMilli + ", but was " + epochMilli
+ ". We'll accept this for now. "
+ "Currently there is no code that relies on monotonically increasing date values. "
+ "Log4j does not guarantee it either.");
}
}
private void assertValueInRange(final long value) { private void assertValueInRange(final long value) {
if (value < 0) { if (value < 0) {
throw new IllegalArgumentException("value must not be negative: " + value); throw new IllegalArgumentException("value must not be negative: " + value);

View File

@@ -15,72 +15,31 @@ public class PdbWriterManager implements AutoCloseable {
private final static Logger LOGGER = Logger.getLogger(PdbWriterManager.class.getCanonicalName()); private final static Logger LOGGER = Logger.getLogger(PdbWriterManager.class.getCanonicalName());
private final static class Key { public interface PdbFileSupplier {
private final Tags tags; public PdbFile supply(Tags tags, OffsetDateTime date);
private final Day day;
public Key(final Tags tags, final OffsetDateTime date) {
super();
this.tags = tags;
this.day = new Day(date);
} }
@Override final Map<PdbFile, PdbWriter> map = new HashMap<>();
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((day == null) ? 0 : day.hashCode());
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
return result;
}
@Override private final PdbFileSupplier supplier;
public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final Key other = (Key) obj;
if (day == null) {
if (other.day != null)
return false;
} else if (!day.equals(other.day))
return false;
if (tags == null) {
if (other.tags != null)
return false;
} else if (!tags.equals(other.tags))
return false;
return true;
}
}
public interface PdbWriterSupplier {
public PdbWriter supply(Tags tags, OffsetDateTime date);
}
final Map<Key, PdbWriter> map = new HashMap<>();
private final PdbWriterSupplier supplier;
private Day lastDay = new Day(OffsetDateTime.MIN); private Day lastDay = new Day(OffsetDateTime.MIN);
public PdbWriterManager(final PdbWriterSupplier supplier) { public PdbWriterManager(final PdbFileSupplier supplier) {
this.supplier = supplier; this.supplier = supplier;
} }
public PdbWriter get(final Tags tags, final OffsetDateTime date) { public PdbWriter getWriter(final Tags tags, final OffsetDateTime date) throws IOException {
handleDateChange(date); handleDateChange(date);
final Key key = new Key(tags, date); final PdbFile pdbFile = supplier.supply(tags, date);
if (!map.containsKey(key)) {
final PdbWriter writer = supplier.supply(tags, date); if (!map.containsKey(pdbFile)) {
put(tags, date, writer); final PdbWriter writer = new PdbWriter(pdbFile);
map.put(pdbFile, writer);
} }
return map.get(key); return map.get(pdbFile);
} }
private void handleDateChange(final OffsetDateTime date) { private void handleDateChange(final OffsetDateTime date) {
@@ -94,11 +53,6 @@ public class PdbWriterManager implements AutoCloseable {
} }
} }
public PdbWriter put(final Tags tags, final OffsetDateTime date, final PdbWriter pdbWriter) {
final Key key = new Key(tags, date);
return map.put(key, pdbWriter);
}
public void flush() { public void flush() {
LOGGER.info("flushing all files"); LOGGER.info("flushing all files");
for (final PdbWriter writer : map.values()) { for (final PdbWriter writer : map.values()) {
@@ -116,10 +70,10 @@ public class PdbWriterManager implements AutoCloseable {
} }
private void closeFiles() { private void closeFiles() {
final Iterator<Entry<Key, PdbWriter>> it = map.entrySet().iterator(); final Iterator<Entry<PdbFile, PdbWriter>> it = map.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
final Entry<Key, PdbWriter> entry = it.next(); final Entry<PdbFile, PdbWriter> entry = it.next();
final PdbWriter writer = entry.getValue(); final PdbWriter writer = entry.getValue();
try { try {
writer.close(); writer.close();

View File

@@ -22,7 +22,6 @@ import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.GroupResult; import org.lucares.pdb.api.GroupResult;
import org.lucares.pdb.api.Result; import org.lucares.pdb.api.Result;
import org.lucares.pdb.api.Tags; import org.lucares.pdb.api.Tags;
import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier;
public class PerformanceDb implements AutoCloseable { public class PerformanceDb implements AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(PerformanceDb.class.getCanonicalName()); private static final Logger LOGGER = Logger.getLogger(PerformanceDb.class.getCanonicalName());
@@ -57,34 +56,12 @@ public class PerformanceDb implements AutoCloseable {
put(iterator); put(iterator);
} }
private static class WriterSupplier implements PdbWriterSupplier {
private final TagsToFile tagsToFile;
public WriterSupplier(final TagsToFile tagsToFile) {
super();
this.tagsToFile = tagsToFile;
}
@Override
public PdbWriter supply(final Tags tags, final OffsetDateTime date) {
try {
final PdbFile pdbFile = tagsToFile.getFile(date, tags);
final PdbWriter writer = new PdbWriter(pdbFile);
return writer;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
public void put(final BlockingIterator<Entry> entries) throws WriteException { public void put(final BlockingIterator<Entry> entries) throws WriteException {
long count = 0; long count = 0;
double durationInManager = 0; double durationInManager = 0;
try (final PdbWriterManager manager = new PdbWriterManager(new WriterSupplier(tagsToFile));) { try (final PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) {
long start = System.nanoTime(); long start = System.nanoTime();
while (true) { while (true) {
@@ -98,7 +75,7 @@ public class PerformanceDb implements AutoCloseable {
final Tags tags = entry.getTags(); final Tags tags = entry.getTags();
final OffsetDateTime date = entry.getDate(); final OffsetDateTime date = entry.getDate();
final PdbWriter writer = manager.get(tags, date); final PdbWriter writer = manager.getWriter(tags, date);
writer.write(entry); writer.write(entry);
count++; count++;
@@ -117,6 +94,8 @@ public class PerformanceDb implements AutoCloseable {
} }
} }
} catch (final IOException e) {
throw new WriteException(e);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOGGER.info("Thread was interrupted. Aborting exectution."); LOGGER.info("Thread was interrupted. Aborting exectution.");

View File

@@ -4,9 +4,11 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.lucares.ludb.H2DB;
import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags; import org.lucares.pdb.api.Tags;
import org.lucares.performance.db.PdbWriterManager.PdbWriterSupplier;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
@@ -29,31 +31,52 @@ public class PdbWriterManagerTest {
@Test @Test
public void testManager() throws Exception { public void testManager() throws Exception {
try (H2DB db = new H2DB(dataDirectory.toFile())) {
final TagsToFile tagsToFile = new TagsToFile(dataDirectory, db);
final PdbWriterSupplier supplier = (tags, date) -> {
Path path;
try {
path = Files.createTempFile(dataDirectory, "pdb", ".data");
return new PdbWriter(new PdbFile(path, tags));
} catch (final IOException e) {
throw new AssertionError(e.getMessage(), e);
}
};
final Tags tagsA = Tags.create("key", "A"); final Tags tagsA = Tags.create("key", "A");
final Tags tagsB = Tags.create("key", "B"); final Tags tagsB = Tags.create("key", "B");
try (PdbWriterManager manager = new PdbWriterManager(supplier)) { try (PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) {
final OffsetDateTime date = OffsetDateTime.now(); final OffsetDateTime date = OffsetDateTime.now();
final PdbWriter firstWriterForTagsA = manager.get(tagsA, date); final PdbWriter firstWriterForTagsA = manager.getWriter(tagsA, date);
final PdbWriter secondWriterForTagsA = manager.get(tagsA, date); final PdbWriter secondWriterForTagsA = manager.getWriter(tagsA, date);
final PdbWriter firstWriterForTagsB = manager.get(tagsB, date); final PdbWriter firstWriterForTagsB = manager.getWriter(tagsB, date);
Assert.assertSame(firstWriterForTagsA, secondWriterForTagsA); Assert.assertSame(firstWriterForTagsA, secondWriterForTagsA);
Assert.assertNotSame(firstWriterForTagsA, firstWriterForTagsB); Assert.assertNotSame(firstWriterForTagsA, firstWriterForTagsB);
}
}
} }
@Test
public void testManager2() throws Exception {
try (H2DB db = new H2DB(dataDirectory.toFile())) {
final TagsToFile tagsToFile = new TagsToFile(dataDirectory, db);
final Tags tags = Tags.create("key", "A");
try (PdbWriterManager manager = new PdbWriterManager(new FileSupplier(tagsToFile))) {
final OffsetDateTime morning = OffsetDateTime.of(2016, 1, 1, 8, 0, 0, 0, ZoneOffset.UTC);
final OffsetDateTime noon = OffsetDateTime.of(2016, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC);
final OffsetDateTime afternoon = OffsetDateTime.of(2016, 1, 1, 17, 0, 0, 0, ZoneOffset.UTC);
final PdbWriter writerNoon = manager.getWriter(tags, noon);
writerNoon.write(new Entry(noon, 1, tags));
final PdbWriter writerMorning = manager.getWriter(tags, morning);
writerMorning.write(new Entry(morning, 2, tags));
final PdbWriter writerAfternoon = manager.getWriter(tags, afternoon);
writerAfternoon.write(new Entry(afternoon, 3, tags));
Assert.assertSame(writerNoon, writerAfternoon);
Assert.assertNotSame(writerNoon, writerMorning);
}
}
} }
} }