package org.lucares.pdbui; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.lucares.pdbui.TcpIngestor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; public class PdbTestUtil { private static final Logger LOGGER = LoggerFactory.getLogger(PdbTestUtil.class); static final Map POISON = new HashMap<>(); public static final void send(final String format, final Collection> entries) throws IOException, InterruptedException { switch (format) { case "csv": sendAsCsv(entries); break; case "json": sendAsJson(entries); break; default: throw new IllegalStateException("unhandled format: " + format); } } @SafeVarargs public static final void sendAsCsv(final Map... entries) throws IOException, InterruptedException { sendAsCsv(Arrays.asList(entries)); } public static final void sendAsCsv(final Collection> entries) throws IOException, InterruptedException { final Set keys = entries.stream().map(Map::keySet).flatMap(Set::stream).collect(Collectors.toSet()); sendAsCsv(keys, entries); } public static final void sendAsCsv(Collection keys, final Collection> entries) throws IOException, InterruptedException { final StringBuilder csv = new StringBuilder(); csv.append(String.join(",", keys)); csv.append("\n"); for (final Map entry : entries) { final List line = new ArrayList<>(); for (final String key : keys) { final String value = String.valueOf(entry.getOrDefault(key, "")); line.add(value); } csv.append(String.join(",", line)); csv.append("\n"); } System.out.println("sending: " + csv); send(csv.toString()); } @SafeVarargs public static final void sendAsJson(final Map... entries) throws IOException, InterruptedException { sendAsJson(Arrays.asList(entries)); } public static final void sendAsJson(final Collection> entries) throws IOException, InterruptedException { final LinkedBlockingDeque> queue = new LinkedBlockingDeque<>(entries); queue.put(POISON); sendAsJson(queue); } public static final void sendAsJson(final BlockingQueue> aEntriesSupplier) throws IOException { final ObjectMapper mapper = new ObjectMapper(); final SocketChannel channel = connect(); Map entry; while ((entry = aEntriesSupplier.poll()) != POISON) { final StringBuilder streamData = new StringBuilder(); streamData.append(mapper.writeValueAsString(entry)); streamData.append("\n"); final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); channel.write(src); } try { // ugly workaround: the channel was closed too early and not all // data was received TimeUnit.MILLISECONDS.sleep(10); } catch (final InterruptedException e) { throw new IllegalStateException(e); } channel.close(); LOGGER.trace("closed sender connection"); } public static final void send(final String data) throws IOException { final SocketChannel channel = connect(); final StringBuilder streamData = new StringBuilder(); streamData.append(data); final ByteBuffer src = ByteBuffer.wrap(streamData.toString().getBytes(StandardCharsets.UTF_8)); channel.write(src); try { // ugly workaround: the channel was closed too early and not all // data was received TimeUnit.MILLISECONDS.sleep(10); } catch (final InterruptedException e) { throw new IllegalStateException(e); } channel.close(); LOGGER.trace("closed sender connection"); } public static void send(final Path file) throws IOException { final SocketChannel outputChannel = connect(); try (final FileChannel inputChannel = FileChannel.open(file, StandardOpenOption.READ)) { inputChannel.transferTo(0, Long.MAX_VALUE, outputChannel); } try { // ugly workaround: the channel was closed too early and not all // data was received TimeUnit.MILLISECONDS.sleep(10); } catch (final InterruptedException e) { throw new IllegalStateException(e); } outputChannel.close(); LOGGER.trace("closed sender connection"); } private static SocketChannel connect() throws IOException { SocketChannel result = null; while (true) { try { result = SocketChannel.open(); result.configureBlocking(true); result.connect(new InetSocketAddress("127.0.0.1", TcpIngestor.PORT)); break; } catch (final ConnectException e) { // server socket not yet ready, it should be ready any time soon } } return result; } }