make tests more robust
1. we are now using a random port for listening 2. TcpIngestor.start() waits until the socket is established.
This commit is contained in:
@@ -6,9 +6,11 @@ import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
@@ -20,7 +22,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@@ -37,6 +38,8 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
||||
|
||||
private final PerformanceDb db;
|
||||
|
||||
private volatile int port = PORT;
|
||||
|
||||
public TcpIngestor(final Path dataDirectory) throws IOException {
|
||||
LOGGER.info("opening performance db: " + dataDirectory);
|
||||
db = new PerformanceDb(dataDirectory);
|
||||
@@ -48,23 +51,42 @@ public class TcpIngestor implements Ingestor, AutoCloseable, DisposableBean {
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
public void useRandomPort() {
|
||||
port = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the port used. If {@link #useRandomPort()} is used then the port may
|
||||
* be null until the socket is open.
|
||||
*
|
||||
* @return the port used
|
||||
*/
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public PerformanceDb getDb() {
|
||||
return db;
|
||||
}
|
||||
|
||||
@Async
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
|
||||
serverThreadPool.submit(() -> listen());
|
||||
final CountDownLatch started = new CountDownLatch(1);
|
||||
serverThreadPool.submit(() -> listen(started));
|
||||
final boolean startedSuccessfully = started.await(5, TimeUnit.SECONDS);
|
||||
if (!startedSuccessfully) {
|
||||
throw new TimeoutException("failed to start listener");
|
||||
}
|
||||
}
|
||||
|
||||
private Void listen() throws IOException {
|
||||
private Void listen(final CountDownLatch started) throws IOException {
|
||||
Thread.currentThread().setName("socket-listener");
|
||||
try (ServerSocket serverSocket = new ServerSocket(PORT);) {
|
||||
LOGGER.info("listening on port " + PORT);
|
||||
try (ServerSocket serverSocket = new ServerSocket(port);) {
|
||||
port = serverSocket.getLocalPort();
|
||||
LOGGER.info("listening on port " + serverSocket.getLocalPort());
|
||||
|
||||
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(2));
|
||||
started.countDown(); // notify caller that the socket is now listening
|
||||
|
||||
while (acceptNewConnections.get()) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user