diff --git a/performanceDb/src/main/java/org/lucares/performance/db/GroupResult.java b/performanceDb/src/main/java/org/lucares/performance/db/GroupResult.java new file mode 100644 index 0000000..ead49d1 --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/GroupResult.java @@ -0,0 +1,29 @@ +package org.lucares.performance.db; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class GroupResult { + + private final Tags groupedBy; + + private final Stream entries; + + public GroupResult(final Stream entries, final Tags groupedBy) { + this.entries = entries; + this.groupedBy = groupedBy; + } + + public Stream asStream() { + return entries; + } + + public List asList() { + return entries.collect(Collectors.toList()); + } + + public Tags getGroupedBy() { + return groupedBy; + } +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java index 91047c7..eae78c3 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/PerformanceDb.java @@ -11,7 +11,6 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -113,32 +112,17 @@ public class PerformanceDb implements AutoCloseable { } } - public List getAsList(final String query) { - return get(query).collect(Collectors.toList()); - } - - /** - * Return the entries as an unbound, ordered and non-parallel stream. - * - * @param tags - * @return {@link Stream} unbound, ordered and non-parallel - */ - public Stream get(final Tags tags) { - - final List pdbFiles = tagsToFile.getFilesMatchingTags(tags); - return toStream(pdbFiles); - } - /** * Return the entries as an unbound, ordered and non-parallel stream. * * @param query * @return {@link Stream} unbound, ordered and non-parallel */ - public Stream get(final String query) { + public Result get(final String query) { final List pdbFiles = tagsToFile.getFilesForQuery(query); - return toStream(pdbFiles); + final Stream stream = toStream(pdbFiles); + return new Result(new GroupResult(stream, Tags.EMPTY)); } private Stream toStream(final List pdbFiles) { diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Result.java b/performanceDb/src/main/java/org/lucares/performance/db/Result.java new file mode 100644 index 0000000..a61e95c --- /dev/null +++ b/performanceDb/src/main/java/org/lucares/performance/db/Result.java @@ -0,0 +1,20 @@ +package org.lucares.performance.db; + +import java.util.Arrays; +import java.util.List; + +public class Result { + + private final List groupResults; + + public Result(final GroupResult... groupResults) { + this.groupResults = Arrays.asList(groupResults); + } + + public GroupResult singleGroup() { + if (groupResults.size() != 1) { + throw new IllegalStateException("the result does not contain exactly one group"); + } + return groupResults.get(0); + } +} diff --git a/performanceDb/src/main/java/org/lucares/performance/db/Tags.java b/performanceDb/src/main/java/org/lucares/performance/db/Tags.java index 357b765..40b733a 100644 --- a/performanceDb/src/main/java/org/lucares/performance/db/Tags.java +++ b/performanceDb/src/main/java/org/lucares/performance/db/Tags.java @@ -1,5 +1,6 @@ package org.lucares.performance.db; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -10,17 +11,17 @@ import java.util.TreeSet; import java.util.function.BiConsumer; public class Tags { + static final Tags EMPTY = new Tags(); - private static final Tags EMPTY = new Tags(); - - private final Map tags = new HashMap<>(); + private final Map tags; private Tags() { super(); + tags = Collections.emptyMap(); } private Tags(final Map tags) { - this.tags.putAll(tags); + this.tags = tags; ensureNoInternalFields(); } @@ -40,7 +41,7 @@ public class Tags { final Map tags = new HashMap<>(2); tags.put(key1, value1); tags.put(key2, value2); - return new Tags(tags); // TODO @ahr cache them + return new Tags(tags); } public static Tags create(final String key1, final String value1) { diff --git a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java index c0e744f..e89db1e 100644 --- a/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java +++ b/performanceDb/src/test/java/org/lucares/performance/db/PerformanceDbTest.java @@ -10,7 +10,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; import org.testng.Assert; @@ -35,13 +34,14 @@ public class PerformanceDbTest { public void testInsertRead() throws Exception { - try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) { + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final OffsetDateTime date = DateUtils.nowInUtc(); final long value = 1; final Tags tags = Tags.create("myKey", "myValue"); - performanceDb.put(new Entry(date, value, tags)); + db.put(new Entry(date, value, tags)); - final List stream = performanceDb.get(tags).collect(Collectors.toList()); + final Result result = db.get(Query.createQuery(tags)); + final List stream = result.singleGroup().asList(); Assert.assertEquals(stream.size(), 1); @@ -51,17 +51,17 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesRead() throws Exception { - try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) { + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final OffsetDateTime dayOne = DateUtils.getDate(2016, 11, 1, 10, 0, 0); final OffsetDateTime dayTwo = DateUtils.getDate(2016, 11, 2, 12, 34, 56); final long valueOne = 1; final long valueTwo = 2; final Tags tags = Tags.create("myKey", "myValue"); - performanceDb.put(new Entry(dayOne, valueOne, tags)); - performanceDb.put(new Entry(dayTwo, valueTwo, tags)); + db.put(new Entry(dayOne, valueOne, tags)); + db.put(new Entry(dayTwo, valueTwo, tags)); - final List stream = performanceDb.get(tags).collect(Collectors.toList()); + final List stream = db.get(Query.createQuery(tags)).singleGroup().asList(); Assert.assertEquals(stream.size(), 2); @@ -86,7 +86,7 @@ public class PerformanceDbTest { public void testAppendToExistingFile() throws Exception { - try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) { + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final TimeRange timeRange = TimeRange.ofDay(OffsetDateTime.now(ZoneOffset.UTC)); final long numberOfEntries = 2; @@ -97,10 +97,10 @@ public class PerformanceDbTest { printEntries(entries, ""); for (final Entry entry : entries) { - performanceDb.put(entry); + db.put(entry); } - final List actualEntries = performanceDb.getAsList(Query.createQuery(tags)); + final List actualEntries = db.get(Query.createQuery(tags)).singleGroup().asList(); Assert.assertEquals(actualEntries, entries); final File storageFileForToday = StorageUtils.createStorageFile(dataDirectory, new Day(timeRange.getFrom()), @@ -114,7 +114,7 @@ public class PerformanceDbTest { public void testInsertIntoMultipleFilesWithDifferentTags() throws Exception { - try (PerformanceDb performanceDb = new PerformanceDb(dataDirectory)) { + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); @@ -123,30 +123,29 @@ public class PerformanceDbTest { final Tags tagsCommon = Tags.create("commonKey", "commonValue"); final Tags tagsOne = Tags.create("myKey", "one", "commonKey", "commonValue"); - final List entriesOne = generateEntries(timeRange, numberOfEntries, 1, tagsOne); + final List entriesOne = storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); printEntries(entriesOne, "one"); - performanceDb.put(entriesOne); final Tags tagsTwo = Tags.create("myKey", "two", "commonKey", "commonValue"); final List entriesTwo = generateEntries(timeRange, numberOfEntries, 2, tagsTwo); printEntries(entriesTwo, "two"); - performanceDb.put(entriesTwo); + db.put(entriesTwo); final Tags tagsThree = Tags.create("myKey", "three", "commonKey", "commonValue"); final List entriesThree = generateEntries(timeRange, numberOfEntries, 3, tagsThree); printEntries(entriesThree, "three"); - performanceDb.put(entriesThree); + db.put(entriesThree); - final List actualEntriesOne = performanceDb.getAsList(Query.createQuery(tagsOne)); + final List actualEntriesOne = db.get(Query.createQuery(tagsOne)).singleGroup().asList(); Assert.assertEquals(actualEntriesOne, entriesOne); - final List actualEntriesTwo = performanceDb.getAsList(Query.createQuery(tagsTwo)); + final List actualEntriesTwo = db.get(Query.createQuery(tagsTwo)).singleGroup().asList(); Assert.assertEquals(actualEntriesTwo, entriesTwo); - final List actualEntriesThree = performanceDb.getAsList(Query.createQuery(tagsThree)); + final List actualEntriesThree = db.get(Query.createQuery(tagsThree)).singleGroup().asList(); Assert.assertEquals(actualEntriesThree, entriesThree); - final List actualEntriesAll = performanceDb.getAsList(Query.createQuery(tagsCommon)); + final List actualEntriesAll = db.get(Query.createQuery(tagsCommon)).singleGroup().asList(); final List expectedAll = CollectionUtils.collate(entriesOne, CollectionUtils.collate(entriesTwo, entriesThree, Entry.BY_DATE), Entry.BY_DATE); @@ -156,6 +155,32 @@ public class PerformanceDbTest { } } + public void testGroupBy() throws Exception { + try (PerformanceDb db = new PerformanceDb(dataDirectory)) { + final OffsetDateTime from = DateUtils.getDate(2016, 1, 1, 00, 00, 00); + final OffsetDateTime to = DateUtils.getDate(2016, 1, 1, 23, 59, 50); + + final TimeRange timeRange = new TimeRange(from, to); + final long numberOfEntries = timeRange.duration().toHours(); + + final Tags tagsOne = Tags.create("myKey", "one", "commonKey", "commonValue"); + final Tags tagsTwo = Tags.create("myKey", "two", "commonKey", "commonValue"); + final Tags tagsThree = Tags.create("myKey", "three", "commonKey", "commonValue"); + storeEntries(db, timeRange, numberOfEntries, tagsOne, 1); + storeEntries(db, timeRange, numberOfEntries, tagsTwo, 2); + storeEntries(db, timeRange, numberOfEntries, tagsThree, 3); + + final Result result = db.get("commonKey=commonValue | groupBy myKey"); + } + } + + private List storeEntries(final PerformanceDb performanceDb, final TimeRange timeRange, + final long numberOfEntries, final Tags tags, final int addToDate) { + final List entries = generateEntries(timeRange, numberOfEntries, addToDate, tags); + performanceDb.put(entries); + return entries; + } + private void printEntries(final List entriesOne, final String label) { int index = 0; diff --git a/recommind-logs/src/main/java/org/lucares/recommind/logs/Plotter.java b/recommind-logs/src/main/java/org/lucares/recommind/logs/Plotter.java index f742adc..58275a1 100644 --- a/recommind-logs/src/main/java/org/lucares/recommind/logs/Plotter.java +++ b/recommind-logs/src/main/java/org/lucares/recommind/logs/Plotter.java @@ -33,7 +33,7 @@ public class Plotter { final Collection dataSeries = new ArrayList<>(); try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final Stream entries = db.get(query); + final Stream entries = db.get(query).singleGroup().asStream(); final File dataFile = File.createTempFile("data", ".dat", tmpDirectory.toFile()); final DataSeries dataSerie = new DataSeries(dataFile, query); diff --git a/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java b/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java index 9687926..306372e 100644 --- a/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java +++ b/recommind-logs/src/test/java/org/lucares/performance/db/ingestor/TcpIngestorTest.java @@ -44,7 +44,6 @@ public class TcpIngestorTest { public void testIngestDataViaTcpStream() throws LiquibaseException, Exception { - final int value = 1; final OffsetDateTime dateA = OffsetDateTime.now(); final OffsetDateTime dateB = OffsetDateTime.now(); final String host = "someHost"; @@ -83,7 +82,7 @@ public class TcpIngestorTest { } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { - final List result = db.getAsList("host=" + host); + final List result = db.get("host=" + host).singleGroup().asList(); Assert.assertEquals(result.size(), 2); Assert.assertEquals(result.get(0).getValue(), 1);