reduce memory usage

Reduce memory usage by storing the filename as string instead of
individual tags.
This commit is contained in:
2018-03-19 19:21:57 +01:00
parent 181fce805d
commit 5343c0d427
20 changed files with 315 additions and 454 deletions

View File

@@ -42,7 +42,6 @@ import com.fasterxml.jackson.databind.ObjectReader;
@Component
public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class);
private static final Logger METRICS_LOGGER = LoggerFactory.getLogger("org.lucares.metrics.tcpIngestor");
public static final int PORT = 17347;
@@ -73,8 +72,7 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
Thread.currentThread().setName("worker-" + clientAddress);
LOGGER.debug("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
) {
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
final ObjectMapper objectMapper = new ObjectMapper();
final ObjectReader objectReader = objectMapper.readerFor(typeReferenceForMap);
@@ -91,12 +89,12 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
LOGGER.debug("adding entry to queue: {}", entry);
queue.put(entry.get());
}
} catch (JsonParseException e) {
} catch (final JsonParseException e) {
LOGGER.info("json parse error in line '" + line + "'", e);
}
}
LOGGER.debug("connection closed: " + clientAddress);
} catch (Throwable e) {
} catch (final Throwable e) {
LOGGER.warn("Stream handling failed", e);
throw e;
}

View File

@@ -7,6 +7,7 @@ import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -75,10 +76,12 @@ public class TcpIngestorTest {
Assert.assertEquals(result.size(), 2);
Assert.assertEquals(result.get(0).getValue(), 1);
Assert.assertEquals(result.get(0).getDate().toInstant(), dateA.toInstant());
Assert.assertEquals(result.get(0).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS),
dateA.toInstant().truncatedTo(ChronoUnit.MILLIS));
Assert.assertEquals(result.get(1).getValue(), 2);
Assert.assertEquals(result.get(1).getDate().toInstant(), dateB.toInstant());
Assert.assertEquals(result.get(1).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS),
dateB.toInstant().truncatedTo(ChronoUnit.MILLIS));
}
}
@@ -99,8 +102,8 @@ public class TcpIngestorTest {
entryA.put("tags", Collections.emptyList());
// skipped, because it is not valid json
String corrupEntry = "{\"corrupt...";
final String corrupEntry = "{\"corrupt...";
// valid entry
final Map<String, Object> entryB = new HashMap<>();
entryB.put("duration", 2);
@@ -109,9 +112,9 @@ public class TcpIngestorTest {
entryB.put("tags", Collections.emptyList());
final ObjectMapper objectMapper = new ObjectMapper();
final String data = objectMapper.writeValueAsString(entryA)+"\n"+corrupEntry+"\n"+objectMapper.writeValueAsString(entryB)+"\n";
final String data = objectMapper.writeValueAsString(entryA) + "\n" + corrupEntry + "\n"
+ objectMapper.writeValueAsString(entryB) + "\n";
PdbTestUtil.send(data);
}
@@ -120,7 +123,8 @@ public class TcpIngestorTest {
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).getValue(), 2);
Assert.assertEquals(result.get(0).getDate().toInstant(), dateB.toInstant());
Assert.assertEquals(result.get(0).getDate().toInstant().truncatedTo(ChronoUnit.MILLIS),
dateB.toInstant().truncatedTo(ChronoUnit.MILLIS));
}
}
}