add first most simple result object
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class GroupResult {
|
||||
|
||||
private final Tags groupedBy;
|
||||
|
||||
private final Stream<Entry> entries;
|
||||
|
||||
public GroupResult(final Stream<Entry> entries, final Tags groupedBy) {
|
||||
this.entries = entries;
|
||||
this.groupedBy = groupedBy;
|
||||
}
|
||||
|
||||
public Stream<Entry> asStream() {
|
||||
return entries;
|
||||
}
|
||||
|
||||
public List<Entry> asList() {
|
||||
return entries.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Tags getGroupedBy() {
|
||||
return groupedBy;
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,6 @@ import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
@@ -113,32 +112,17 @@ public class PerformanceDb implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
public List<Entry> getAsList(final String query) {
|
||||
return get(query).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the entries as an unbound, ordered and non-parallel stream.
|
||||
*
|
||||
* @param tags
|
||||
* @return {@link Stream} unbound, ordered and non-parallel
|
||||
*/
|
||||
public Stream<Entry> get(final Tags tags) {
|
||||
|
||||
final List<PdbFile> pdbFiles = tagsToFile.getFilesMatchingTags(tags);
|
||||
return toStream(pdbFiles);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the entries as an unbound, ordered and non-parallel stream.
|
||||
*
|
||||
* @param query
|
||||
* @return {@link Stream} unbound, ordered and non-parallel
|
||||
*/
|
||||
public Stream<Entry> get(final String query) {
|
||||
public Result get(final String query) {
|
||||
|
||||
final List<PdbFile> pdbFiles = tagsToFile.getFilesForQuery(query);
|
||||
return toStream(pdbFiles);
|
||||
final Stream<Entry> stream = toStream(pdbFiles);
|
||||
return new Result(new GroupResult(stream, Tags.EMPTY));
|
||||
}
|
||||
|
||||
private Stream<Entry> toStream(final List<PdbFile> pdbFiles) {
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class Result {
|
||||
|
||||
private final List<GroupResult> groupResults;
|
||||
|
||||
public Result(final GroupResult... groupResults) {
|
||||
this.groupResults = Arrays.asList(groupResults);
|
||||
}
|
||||
|
||||
public GroupResult singleGroup() {
|
||||
if (groupResults.size() != 1) {
|
||||
throw new IllegalStateException("the result does not contain exactly one group");
|
||||
}
|
||||
return groupResults.get(0);
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package org.lucares.performance.db;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@@ -10,17 +11,17 @@ import java.util.TreeSet;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class Tags {
|
||||
static final Tags EMPTY = new Tags();
|
||||
|
||||
private static final Tags EMPTY = new Tags();
|
||||
|
||||
private final Map<String, String> tags = new HashMap<>();
|
||||
private final Map<String, String> tags;
|
||||
|
||||
private Tags() {
|
||||
super();
|
||||
tags = Collections.emptyMap();
|
||||
}
|
||||
|
||||
private Tags(final Map<String, String> tags) {
|
||||
this.tags.putAll(tags);
|
||||
this.tags = tags;
|
||||
ensureNoInternalFields();
|
||||
}
|
||||
|
||||
@@ -40,7 +41,7 @@ public class Tags {
|
||||
final Map<String, String> tags = new HashMap<>(2);
|
||||
tags.put(key1, value1);
|
||||
tags.put(key2, value2);
|
||||
return new Tags(tags); // TODO @ahr cache them
|
||||
return new Tags(tags);
|
||||
}
|
||||
|
||||
public static Tags create(final String key1, final String value1) {
|
||||
|
||||
@@ -10,7 +10,6 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.testng.Assert;
|
||||
@@ -35,13 +34,14 @@ public class PerformanceDbTest {
|
||||
|
||||
public void testInsertRead() throws Exception {
|
||||
|
||||
try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) {
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final OffsetDateTime date = DateUtils.nowInUtc();
|
||||
final long value = 1;
|
||||
final Tags tags = Tags.create("myKey", "myValue");
|
||||
performanceDb.put(new Entry(date, value, tags));
|
||||
db.put(new Entry(date, value, tags));
|
||||
|
||||
final List<Entry> stream = performanceDb.get(tags).collect(Collectors.toList());
|
||||
final Result result = db.get(Query.createQuery(tags));
|
||||
final List<Entry> stream = result.singleGroup().asList();
|
||||
|
||||
Assert.assertEquals(stream.size(), 1);
|
||||
|
||||
@@ -51,17 +51,17 @@ public class PerformanceDbTest {
|
||||
|
||||
public void testInsertIntoMultipleFilesRead() throws Exception {
|
||||
|
||||
try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) {
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final OffsetDateTime dayOne = DateUtils.getDate(2016, 11, 1, 10, 0, 0);
|
||||
final OffsetDateTime dayTwo = DateUtils.getDate(2016, 11, 2, 12, 34, 56);
|
||||
final long valueOne = 1;
|
||||
final long valueTwo = 2;
|
||||
final Tags tags = Tags.create("myKey", "myValue");
|
||||
|
||||
performanceDb.put(new Entry(dayOne, valueOne, tags));
|
||||
performanceDb.put(new Entry(dayTwo, valueTwo, tags));
|
||||
db.put(new Entry(dayOne, valueOne, tags));
|
||||
db.put(new Entry(dayTwo, valueTwo, tags));
|
||||
|
||||
final List<Entry> stream = performanceDb.get(tags).collect(Collectors.toList());
|
||||
final List<Entry> stream = db.get(Query.createQuery(tags)).singleGroup().asList();
|
||||
|
||||
Assert.assertEquals(stream.size(), 2);
|
||||
|
||||
@@ -86,7 +86,7 @@ public class PerformanceDbTest {
|
||||
|
||||
public void testAppendToExistingFile() throws Exception {
|
||||
|
||||
try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) {
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
|
||||
final TimeRange timeRange = TimeRange.ofDay(OffsetDateTime.now(ZoneOffset.UTC));
|
||||
final long numberOfEntries = 2;
|
||||
@@ -97,10 +97,10 @@ public class PerformanceDbTest {
|
||||
printEntries(entries, "");
|
||||
|
||||
for (final Entry entry : entries) {
|
||||
performanceDb.put(entry);
|
||||
db.put(entry);
|
||||
}
|
||||
|
||||
final List<Entry> actualEntries = performanceDb.getAsList(Query.createQuery(tags));
|
||||
final List<Entry> actualEntries = db.get(Query.createQuery(tags)).singleGroup().asList();
|
||||
Assert.assertEquals(actualEntries, entries);
|
||||
|
||||
final File storageFileForToday = StorageUtils.createStorageFile(dataDirectory, new Day(timeRange.getFrom()),
|
||||
@@ -114,7 +114,7 @@ public class PerformanceDbTest {
|
||||
|
||||
public void testInsertIntoMultipleFilesWithDifferentTags() throws Exception {
|
||||
|
||||
try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) {
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
|
||||
final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
|
||||
|
||||
@@ -123,30 +123,29 @@ public class PerformanceDbTest {
|
||||
|
||||
final Tags tagsCommon = Tags.create("commonKey", "commonValue");
|
||||
final Tags tagsOne = Tags.create("myKey", "one", "commonKey", "commonValue");
|
||||
final List<Entry> entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne);
|
||||
final List<Entry> entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1);
|
||||
printEntries(entriesOne, "one");
|
||||
performanceDb.put(entriesOne);
|
||||
|
||||
final Tags tagsTwo = Tags.create("myKey", "two", "commonKey", "commonValue");
|
||||
final List<Entry> entriesTwo = generateEntries(timeRange, numberOfEntries, 2, tagsTwo);
|
||||
printEntries(entriesTwo, "two");
|
||||
performanceDb.put(entriesTwo);
|
||||
db.put(entriesTwo);
|
||||
|
||||
final Tags tagsThree = Tags.create("myKey", "three", "commonKey", "commonValue");
|
||||
final List<Entry> entriesThree = generateEntries(timeRange, numberOfEntries, 3, tagsThree);
|
||||
printEntries(entriesThree, "three");
|
||||
performanceDb.put(entriesThree);
|
||||
db.put(entriesThree);
|
||||
|
||||
final List<Entry> actualEntriesOne = performanceDb.getAsList(Query.createQuery(tagsOne));
|
||||
final List<Entry> actualEntriesOne = db.get(Query.createQuery(tagsOne)).singleGroup().asList();
|
||||
Assert.assertEquals(actualEntriesOne, entriesOne);
|
||||
|
||||
final List<Entry> actualEntriesTwo = performanceDb.getAsList(Query.createQuery(tagsTwo));
|
||||
final List<Entry> actualEntriesTwo = db.get(Query.createQuery(tagsTwo)).singleGroup().asList();
|
||||
Assert.assertEquals(actualEntriesTwo, entriesTwo);
|
||||
|
||||
final List<Entry> actualEntriesThree = performanceDb.getAsList(Query.createQuery(tagsThree));
|
||||
final List<Entry> actualEntriesThree = db.get(Query.createQuery(tagsThree)).singleGroup().asList();
|
||||
Assert.assertEquals(actualEntriesThree, entriesThree);
|
||||
|
||||
final List<Entry> actualEntriesAll = performanceDb.getAsList(Query.createQuery(tagsCommon));
|
||||
final List<Entry> actualEntriesAll = db.get(Query.createQuery(tagsCommon)).singleGroup().asList();
|
||||
final List<Entry> expectedAll = CollectionUtils.collate(entriesOne,
|
||||
CollectionUtils.collate(entriesTwo, entriesThree, Entry.BY_DATE), Entry.BY_DATE);
|
||||
|
||||
@@ -156,6 +155,32 @@ public class PerformanceDbTest {
|
||||
}
|
||||
}
|
||||
|
||||
public void testGroupBy() throws Exception {
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00);
|
||||
final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50);
|
||||
|
||||
final TimeRange timeRange = new TimeRange(from, to);
|
||||
final long numberOfEntries = timeRange.duration().toHours();
|
||||
|
||||
final Tags tagsOne = Tags.create("myKey", "one", "commonKey", "commonValue");
|
||||
final Tags tagsTwo = Tags.create("myKey", "two", "commonKey", "commonValue");
|
||||
final Tags tagsThree = Tags.create("myKey", "three", "commonKey", "commonValue");
|
||||
storeEntries(db, timeRange, numberOfEntries, tagsOne, 1);
|
||||
storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2);
|
||||
storeEntries(db, timeRange, numberOfEntries, tagsThree, 3);
|
||||
|
||||
final Result result = db.get("commonKey=commonValue | groupBy myKey");
|
||||
}
|
||||
}
|
||||
|
||||
private List<Entry> storeEntries(final PerformanceDb performanceDb, final TimeRange timeRange,
|
||||
final long numberOfEntries, final Tags tags, final int addToDate) {
|
||||
final List<Entry> entries = generateEntries(timeRange, numberOfEntries, addToDate, tags);
|
||||
performanceDb.put(entries);
|
||||
return entries;
|
||||
}
|
||||
|
||||
private void printEntries(final List<Entry> entriesOne, final String label) {
|
||||
|
||||
int index = 0;
|
||||
|
||||
@@ -33,7 +33,7 @@ public class Plotter {
|
||||
final Collection<DataSeries> dataSeries = new ArrayList<>();
|
||||
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final Stream<Entry> entries = db.get(query);
|
||||
final Stream<Entry> entries = db.get(query).singleGroup().asStream();
|
||||
|
||||
final File dataFile = File.createTempFile("data", ".dat", tmpDirectory.toFile());
|
||||
final DataSeries dataSerie = new DataSeries(dataFile, query);
|
||||
|
||||
@@ -44,7 +44,6 @@ public class TcpIngestorTest {
|
||||
|
||||
public void testIngestDataViaTcpStream() throws LiquibaseException, Exception {
|
||||
|
||||
final int value = 1;
|
||||
final OffsetDateTime dateA = OffsetDateTime.now();
|
||||
final OffsetDateTime dateB = OffsetDateTime.now();
|
||||
final String host = "someHost";
|
||||
@@ -83,7 +82,7 @@ public class TcpIngestorTest {
|
||||
}
|
||||
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final List<Entry> result = db.getAsList("host=" + host);
|
||||
final List<Entry> result = db.get("host=" + host).singleGroup().asList();
|
||||
Assert.assertEquals(result.size(), 2);
|
||||
|
||||
Assert.assertEquals(result.get(0).getValue(), 1);
|
||||
|
||||
Reference in New Issue
Block a user