add slf4j via log4j 2 logging

This commit is contained in:
2017-02-05 09:53:25 +01:00
parent 175a866c90
commit 3722ba02b1
5 changed files with 61 additions and 22 deletions

View File

@@ -6,6 +6,7 @@ import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.Path;
import java.time.OffsetDateTime;
@@ -23,6 +24,9 @@ import org.lucares.pdb.api.Entry;
import org.lucares.pdb.api.Tags;
import org.lucares.performance.db.BlockingQueueIterator;
import org.lucares.performance.db.PerformanceDb;
import org.lucares.performance.db.ingestor.TcpIngestorTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MappingIterator;
@@ -30,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
public class TcpIngestor implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpIngestorTest.class);
public static final int PORT = 17347;
@@ -58,8 +63,9 @@ public class TcpIngestor implements AutoCloseable {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("worker-" + clientSocket.getInetAddress());
System.out.println("opening streams to client");
final SocketAddress clientAddress = clientSocket.getRemoteSocketAddress();
Thread.currentThread().setName("worker-" + clientAddress);
LOGGER.info("opening streams to client");
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
@@ -69,11 +75,11 @@ public class TcpIngestor implements AutoCloseable {
double duration = 0.0;
int count = 0;
System.out.println("reading from stream");
LOGGER.info("reading from stream");
while (iterator.hasNext()) {
final long start = System.nanoTime();
// System.out.println("read: " + line);
// LOGGER.info("read: " + line);
@SuppressWarnings("unchecked")
final Map<String, Object> object = (Map<String, Object>) iterator.next();
@@ -83,7 +89,7 @@ public class TcpIngestor implements AutoCloseable {
count++;
if (count == 100000) {
// System.out.println("reading " + count + " took " +
// LOGGER.info("reading " + count + " took " +
// duration + "ms");
duration = 0.0;
count = 0;
@@ -94,7 +100,7 @@ public class TcpIngestor implements AutoCloseable {
}
}
System.out.println("connection closed");
LOGGER.info("connection closed: " + clientAddress);
}
return null;
@@ -148,9 +154,9 @@ public class TcpIngestor implements AutoCloseable {
}
public TcpIngestor(final Path dataDirectory) {
System.out.println("opening performance db: " + dataDirectory);
LOGGER.info("opening performance db: " + dataDirectory);
db = new PerformanceDb(dataDirectory);
System.out.println("performance db open");
LOGGER.info("performance db open");
}
public void start() throws Exception {
@@ -179,17 +185,17 @@ public class TcpIngestor implements AutoCloseable {
Thread.currentThread().setName("socket-listener");
try (ServerSocket serverSocket = new ServerSocket(
PORT/* , 10, InetAddress.getLocalHost() */);) {
System.out.println("listening on port " + PORT);
LOGGER.info("listening on port " + PORT);
serverSocket.setSoTimeout((int) TimeUnit.MILLISECONDS.toMillis(100));
while (acceptNewConnections.get()) {
try {
final Socket clientSocket = serverSocket.accept();
System.out.println("accepted connection: " + clientSocket.getRemoteSocketAddress());
LOGGER.info("accepted connection: " + clientSocket.getRemoteSocketAddress());
workerThreadPool.submit(new Handler(clientSocket, queue));
System.out.println("handler submitted");
LOGGER.info("handler submitted");
} catch (final SocketTimeoutException e) {
// expected every 100ms
// needed to be able to stop the server
@@ -197,20 +203,20 @@ public class TcpIngestor implements AutoCloseable {
e.printStackTrace();
}
}
System.out.println("not accepting new connections. ");
LOGGER.info("not accepting new connections. ");
System.out.println("stopping worker pool");
LOGGER.info("stopping worker pool");
workerThreadPool.shutdown();
try {
workerThreadPool.awaitTermination(10, TimeUnit.MINUTES);
System.out.println("workers stopped");
LOGGER.info("workers stopped");
} catch (final InterruptedException e) {
e.printStackTrace();
}
System.out.println("adding poison");
LOGGER.info("adding poison");
queue.offer(Entry.POISON);
} catch (final Exception e) {
e.printStackTrace();
LOGGER.error("", e);
throw e;
}
return null;
@@ -219,16 +225,16 @@ public class TcpIngestor implements AutoCloseable {
@Override
public void close() {
acceptNewConnections.set(false);
System.out.println("stopped accept thread");
LOGGER.info("stopped accept thread");
serverThreadPool.shutdown();
try {
serverThreadPool.awaitTermination(10, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
e.printStackTrace();
Thread.interrupted();
}
System.out.println("closing database");
LOGGER.info("closing database");
db.close();
System.out.println("close done");
LOGGER.info("close done");
}
public static void main(final String[] args) throws Exception {
@@ -236,7 +242,7 @@ public class TcpIngestor implements AutoCloseable {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("shutdown hook");
LOGGER.info("shutdown hook");
}
});