add test for sending entries with negative values to the ingestor
This commit is contained in:
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
public class PdbTestUtil {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class);
|
||||
|
||||
private static final Map<String, Object> POISON = new HashMap<>();
|
||||
static final Map<String, Object> POISON = new HashMap<>();
|
||||
|
||||
@SafeVarargs
|
||||
public static final void send(final Map<String, Object>... entries) throws IOException, InterruptedException {
|
||||
@@ -58,17 +58,17 @@ public class PdbTestUtil {
|
||||
channel.close();
|
||||
LOGGER.trace("closed sender connection");
|
||||
}
|
||||
|
||||
|
||||
public static final void send(final String data) throws IOException {
|
||||
|
||||
final SocketChannel channel = connect();
|
||||
|
||||
final StringBuilder streamData = new StringBuilder();
|
||||
streamData.append(data);
|
||||
|
||||
|
||||
final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8));
|
||||
channel.write(src);
|
||||
|
||||
|
||||
try {
|
||||
// ugly workaround: the channel was closed too early and not all
|
||||
// data was received
|
||||
|
||||
@@ -8,9 +8,15 @@ import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.lucares.collections.LongList;
|
||||
import org.lucares.pdbui.TcpIngestor;
|
||||
@@ -30,6 +36,70 @@ public class TcpIngestorTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestorTest.class);
|
||||
|
||||
private static final class LongPair implements Comparable<LongPair> {
|
||||
private final long a, b;
|
||||
|
||||
public LongPair(final long a, final long b) {
|
||||
super();
|
||||
this.a = a;
|
||||
this.b = b;
|
||||
}
|
||||
|
||||
public static List<LongPair> fromLongList(final LongList longList) {
|
||||
final List<LongPair> result = new ArrayList<>();
|
||||
for (int i = 0; i < longList.size(); i += 2) {
|
||||
|
||||
result.add(new LongPair(longList.get(i), longList.get(i + 1)));
|
||||
|
||||
}
|
||||
Collections.sort(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
public long getA() {
|
||||
return a;
|
||||
}
|
||||
|
||||
public long getB() {
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return a + "," + b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(final LongPair o) {
|
||||
return Comparator.comparing(LongPair::getA).thenComparing(LongPair::getB).compare(this, o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (int) (a ^ (a >>> 32));
|
||||
result = prime * result + (int) (b ^ (b >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final LongPair other = (LongPair) obj;
|
||||
if (a != other.a)
|
||||
return false;
|
||||
if (b != other.b)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private Path dataDirectory;
|
||||
|
||||
@BeforeMethod
|
||||
@@ -130,4 +200,47 @@ public class TcpIngestorTest {
|
||||
Assert.assertEquals(result.get(3), 2);
|
||||
}
|
||||
}
|
||||
|
||||
public void testRandomOrder() throws Exception {
|
||||
|
||||
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
|
||||
final String host = "someHost";
|
||||
final List<String> additionalTagValues = Arrays.asList("foo", "bar", "baz");
|
||||
|
||||
final LongList expected = new LongList();
|
||||
|
||||
try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) {
|
||||
|
||||
ingestor.start();
|
||||
|
||||
final LinkedBlockingDeque<Map<String, Object>> queue = new LinkedBlockingDeque<>();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
final long duration = rnd.nextLong(-100000L, 100000L);
|
||||
final long timestamp = rnd.nextLong(-100000L, 100000L);
|
||||
|
||||
final Map<String, Object> entry = new HashMap<>();
|
||||
entry.put("@timestamp", Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC)
|
||||
.format(DateTimeFormatter.ISO_ZONED_DATE_TIME));
|
||||
entry.put("duration", duration);
|
||||
entry.put("host", host);
|
||||
entry.put("additionalKey", additionalTagValues.get(rnd.nextInt(additionalTagValues.size())));
|
||||
|
||||
queue.put(entry);
|
||||
expected.addAll(timestamp, duration);
|
||||
}
|
||||
|
||||
queue.put(PdbTestUtil.POISON);
|
||||
PdbTestUtil.send(queue);
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
try (PerformanceDb db = new PerformanceDb(dataDirectory)) {
|
||||
final LongList result = db.get("host=" + host).singleGroup().flatMap();
|
||||
Assert.assertEquals(LongPair.fromLongList(result), LongPair.fromLongList(expected));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user