From c2e42ea5fb9dfad3aacde04662aeff7e2ac3cd75 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Sun, 5 Feb 2017 11:21:09 +0100 Subject: [PATCH] test didn't stop correctly because I used offer() on a blocking queue of length 1 (chances are high, that the element won't be added). --- .../lucares/recommind/logs/TcpIngestor.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java index 9f83061..d0d268a 100644 --- a/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java +++ b/pdb-plotting/src/main/java/org/lucares/recommind/logs/TcpIngestor.java @@ -24,7 +24,6 @@ import org.lucares.pdb.api.Entry; import org.lucares.pdb.api.Tags; import org.lucares.performance.db.BlockingQueueIterator; import org.lucares.performance.db.PerformanceDb; -import org.lucares.performance.db.ingestor.TcpIngestorTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +33,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; public class TcpIngestor implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestorTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestor.class); public static final int PORT = 17347; @@ -65,7 +64,7 @@ public class TcpIngestor implements AutoCloseable { public Void call() throws Exception { final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress(); Thread.currentThread().setName("worker-" + clientAddress); - LOGGER.info("opening streams to client"); + LOGGER.debug("opening streams to client"); try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); @@ -75,11 +74,10 @@ public class TcpIngestor implements AutoCloseable { double duration = 0.0; int count = 0; - LOGGER.info("reading from stream"); + LOGGER.debug("reading from stream"); while (iterator.hasNext()) { final long start = System.nanoTime(); - // LOGGER.info("read: " + line); @SuppressWarnings("unchecked") final Map object = (Map) iterator.next(); @@ -89,8 +87,7 @@ public class TcpIngestor implements AutoCloseable { count++; if (count == 100000) { - // LOGGER.info("reading " + count + " took " + - // duration + "ms"); + LOGGER.debug("reading {} took {} ms", count, duration); duration = 0.0; count = 0; } @@ -100,7 +97,7 @@ public class TcpIngestor implements AutoCloseable { } } - LOGGER.info("connection closed: " + clientAddress); + LOGGER.debug("connection closed: " + clientAddress); } return null; @@ -156,7 +153,7 @@ public class TcpIngestor implements AutoCloseable { public TcpIngestor(final Path dataDirectory) { LOGGER.info("opening performance db: " + dataDirectory); db = new PerformanceDb(dataDirectory); - LOGGER.info("performance db open"); + LOGGER.debug("performance db open"); } public void start() throws Exception { @@ -192,15 +189,16 @@ public class TcpIngestor implements AutoCloseable { while (acceptNewConnections.get()) { try { final Socket clientSocket = serverSocket.accept(); - LOGGER.info("accepted connection: " + clientSocket.getRemoteSocketAddress()); + LOGGER.debug("accepted connection: " + clientSocket.getRemoteSocketAddress()); workerThreadPool.submit(new Handler(clientSocket, queue)); - LOGGER.info("handler submitted"); + LOGGER.debug("handler submitted"); } catch (final SocketTimeoutException e) { // expected every 100ms // needed to be able to stop the server } catch (final Exception e) { - e.printStackTrace(); + LOGGER.warn("Exception caught while waiting for a new connection. " + + "We'll ignore this error and keep going.", e); } } LOGGER.info("not accepting new connections. "); @@ -209,12 +207,16 @@ public class TcpIngestor implements AutoCloseable { workerThreadPool.shutdown(); try { workerThreadPool.awaitTermination(10, TimeUnit.MINUTES); - LOGGER.info("workers stopped"); + LOGGER.debug("workers stopped"); } catch (final InterruptedException e) { - e.printStackTrace(); + Thread.interrupted(); } - LOGGER.info("adding poison"); - queue.offer(Entry.POISON); + LOGGER.debug("adding poison"); + queue.put(Entry.POISON); + } catch (final InterruptedException e) { + LOGGER.info("Listener thread interrupted. Likely while adding the poison. " + + "That would mean that the db-ingestion thread will not terminate. "); + Thread.interrupted(); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -224,17 +226,17 @@ public class TcpIngestor implements AutoCloseable { @Override public void close() { + LOGGER.debug("stopping accept thread"); acceptNewConnections.set(false); - LOGGER.info("stopped accept thread"); serverThreadPool.shutdown(); try { serverThreadPool.awaitTermination(10, TimeUnit.MINUTES); } catch (final InterruptedException e) { Thread.interrupted(); } - LOGGER.info("closing database"); + LOGGER.debug("closing database"); db.close(); - LOGGER.info("close done"); + LOGGER.debug("close done"); } public static void main(final String[] args) throws Exception {