diff --git a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java index e8f56e7..2ded87b 100644 --- a/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java +++ b/pdb-ui/src/main/java/org/lucares/pdbui/TcpIngestor.java @@ -6,9 +6,11 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.file.Path; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PreDestroy; @@ -20,7 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Component @@ -37,6 +38,8 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { private final PerformanceDb db; + private volatile int port = PORT; + public TcpIngestor(final Path dataDirectory) throws IOException { LOGGER.info("opening performance db: " + dataDirectory); db = new PerformanceDb(dataDirectory); @@ -48,23 +51,42 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean { this.db = db; } + public void useRandomPort() { + port = 0; + } + + /** + * Returns the port used. If {@link #useRandomPort()} is used then the port may + * be null until the socket is open. + * + * @return the port used + */ + public int getPort() { + return port; + } + public PerformanceDb getDb() { return db; } - @Async @Override public void start() throws Exception { - - serverThreadPool.submit(() -> listen()); + final CountDownLatch started = new CountDownLatch(1); + serverThreadPool.submit(() -> listen(started)); + final boolean startedSuccessfully = started.await(5, TimeUnit.SECONDS); + if (!startedSuccessfully) { + throw new TimeoutException("failed to start listener"); + } } - private Void listen() throws IOException { + private Void listen(final CountDownLatch started) throws IOException { Thread.currentThread().setName("socket-listener"); - try (ServerSocket serverSocket = new ServerSocket(PORT);) { - LOGGER.info("listening on port " + PORT); + try (ServerSocket serverSocket = new ServerSocket(port);) { + port = serverSocket.getLocalPort(); + LOGGER.info("listening on port " + serverSocket.getLocalPort()); serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(2)); + started.countDown(); // notify caller that the socket is now listening while (acceptNewConnections.get()) { try { diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java index 01b4aff..5480591 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbControllerTest.java @@ -113,10 +113,10 @@ public class PdbControllerTest { final HttpEntity> entity = new HttpEntity>( parameters, headers); - final ResponseEntity response = rest.exchange("/data?waitUntilFinished=true", HttpMethod.POST, entity, - String.class); + final ResponseEntity response = rest.exchange("/api/data?waitUntilFinished=true", HttpMethod.POST, + entity, String.class); - Assertions.assertEquals(response.getStatusCode(), HttpStatus.CREATED, "response status"); + Assertions.assertEquals(HttpStatus.CREATED, response.getStatusCode(), "response status"); } } diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java index 7b34d40..b54a23b 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/PdbTestUtil.java @@ -36,14 +36,14 @@ public class PdbTestUtil { static final Map POISON = new HashMap<>(); - public static final void send(final String format, final Collection> entries) + public static final void send(final String format, final Collection> entries, final int port) throws IOException, InterruptedException { switch (format) { case "csv": - sendAsCsv(entries); + sendAsCsv(entries, port); break; case "json": - sendAsJson(entries); + sendAsJson(entries, port); break; default: throw new IllegalStateException("unhandled format: " + format); @@ -51,20 +51,21 @@ public class PdbTestUtil { } @SafeVarargs - public static final void sendAsCsv(final Map... entries) throws IOException, InterruptedException { - sendAsCsv(Arrays.asList(entries)); + public static final void sendAsCsv(final int port, final Map... entries) + throws IOException, InterruptedException { + sendAsCsv(Arrays.asList(entries), port); } - public static final void sendAsCsv(final Collection> entries) + public static final void sendAsCsv(final Collection> entries, final int port) throws IOException, InterruptedException { final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); - sendAsCsv(keys, entries); + sendAsCsv(keys, entries, port); } - public static final void sendAsCsv(final Collection keys, final Collection> entries) - throws IOException, InterruptedException { + public static final void sendAsCsv(final Collection keys, final Collection> entries, + final int port) throws IOException, InterruptedException { final StringBuilder csv = new StringBuilder(); @@ -81,26 +82,28 @@ public class PdbTestUtil { csv.append("\n"); } System.out.println("sending: " + csv); - send(csv.toString()); + send(csv.toString(), port); } @SafeVarargs - public static final void sendAsJson(final Map... entries) throws IOException, InterruptedException { + public static final void sendAsJson(final int port, final Map... entries) + throws IOException, InterruptedException { - sendAsJson(Arrays.asList(entries)); + sendAsJson(Arrays.asList(entries), port); } - public static final void sendAsJson(final Collection> entries) + public static final void sendAsJson(final Collection> entries, final int port) throws IOException, InterruptedException { final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); queue.put(POISON); - sendAsJson(queue); + sendAsJson(queue, port); } - public static final void sendAsJson(final BlockingQueue> aEntriesSupplier) throws IOException { + public static final void sendAsJson(final BlockingQueue> aEntriesSupplier, final int port) + throws IOException { final ObjectMapper mapper = new ObjectMapper(); - final SocketChannel channel = connect(); + final SocketChannel channel = connect(port); Map entry; while ((entry = aEntriesSupplier.poll()) != POISON) { @@ -124,9 +127,9 @@ public class PdbTestUtil { LOGGER.trace("closed sender connection"); } - public static final void send(final String data) throws IOException { + public static final void send(final String data, final int port) throws IOException { - final SocketChannel channel = connect(); + final SocketChannel channel = connect(port); final StringBuilder streamData = new StringBuilder(); streamData.append(data); @@ -145,8 +148,8 @@ public class PdbTestUtil { LOGGER.trace("closed sender connection"); } - public static void send(final Path file) throws IOException { - final SocketChannel outputChannel = connect(); + public static void send(final Path file, final int port) throws IOException { + final SocketChannel outputChannel = connect(port); try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) { inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel); @@ -163,7 +166,7 @@ public class PdbTestUtil { LOGGER.trace("closed sender connection"); } - private static SocketChannel connect() throws IOException { + private static SocketChannel connect(final int port) throws IOException { SocketChannel result = null; @@ -171,7 +174,7 @@ public class PdbTestUtil { try { result = SocketChannel.open(); result.configureBlocking(true); - result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT)); + result.connect(new InetSocketAddress("127.0.0.1", port)); break; } catch (final ConnectException e) { // server socket not yet ready, it should be ready any time soon diff --git a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java index c709e66..7d3ad74 100644 --- a/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java +++ b/pdb-ui/src/test/java/org/lucares/pdbui/TcpIngestorTest.java @@ -58,7 +58,7 @@ public class TcpIngestorTest { final String host = "someHost"; try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final Map entryA = new HashMap<>(); @@ -73,7 +73,7 @@ public class TcpIngestorTest { entryB.put("host", host); entryB.put("tags", Collections.emptyList()); - PdbTestUtil.sendAsJson(entryA, entryB); + PdbTestUtil.sendAsJson(ingestor.getPort(), entryA, entryB); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -103,7 +103,7 @@ public class TcpIngestorTest { // 1. insert some data try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final long deltaEpochMilliB = dateB - dateA; @@ -115,7 +115,7 @@ public class TcpIngestorTest { + deltaEpochMilliB + ",2,1\n" // dates are the delta the the previous date / using tags with id 1 + deltaEpochMilliC + ",3,0"; // dates are the delta the the previous date / using tags with id 0 - PdbTestUtil.send(data); + PdbTestUtil.send(data, ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -129,9 +129,10 @@ public class TcpIngestorTest { // 4. create a new database try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { + ingestor.useRandomPort(); ingestor.start(); for (final Path exportFile : exportFiles) { - PdbTestUtil.send(exportFile); + PdbTestUtil.send(exportFile, ingestor.getPort()); } } @@ -159,6 +160,7 @@ public class TcpIngestorTest { final String host = "someHost"; try (TcpIngestor tcpIngestor = new TcpIngestor(dataDirectory)) { + tcpIngestor.useRandomPort(); tcpIngestor.start(); // has a negative epoch time milli and negative value @@ -186,7 +188,7 @@ public class TcpIngestorTest { )// + "\n"; - PdbTestUtil.send(data); + PdbTestUtil.send(data, tcpIngestor.getPort()); } try (PerformanceDb db = new PerformanceDb(dataDirectory)) { @@ -214,7 +216,7 @@ public class TcpIngestorTest { final LongList expected = new LongList(); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(); @@ -236,7 +238,7 @@ public class TcpIngestorTest { expected.addAll(timestamp, duration); } - PdbTestUtil.send(format, queue); + PdbTestUtil.send(format, queue, ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -252,7 +254,7 @@ public class TcpIngestorTest { public void testCsvIngestorIgnoresColumns() throws Exception { try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final Map entry = new HashMap<>(); @@ -262,7 +264,7 @@ public class TcpIngestorTest { entry.put("host", "someHost"); entry.put(CsvToEntryTransformer.COLUM_IGNORE_PREFIX + "ignored", "ignoredValue"); - PdbTestUtil.sendAsCsv(entry); + PdbTestUtil.sendAsCsv(ingestor.getPort(), entry); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -282,7 +284,7 @@ public class TcpIngestorTest { final long value1 = 222; final long value2 = 1; try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final Map entry1 = new HashMap<>(); @@ -297,7 +299,8 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2)); + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e; @@ -323,7 +326,7 @@ public class TcpIngestorTest { final OffsetDateTime dateDecember = OffsetDateTime.of(2019, 12, 1, 0, 0, 0, 0, ZoneOffset.UTC); try (TcpIngestor ingestor = new TcpIngestor(dataDirectory)) { - + ingestor.useRandomPort(); ingestor.start(); final Map entry1 = new HashMap<>(); @@ -336,7 +339,8 @@ public class TcpIngestorTest { entry2.put("host", host); entry2.put("duration", value2); - PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2)); + PdbTestUtil.sendAsCsv(List.of("@timestamp", "host", "duration"), List.of(entry1, entry2), + ingestor.getPort()); } catch (final Exception e) { LOGGER.error("", e); throw e;