various fixes
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package org.lucares.pdbui;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@@ -38,51 +39,51 @@ public final class CsvReaderSettings {
|
||||
}
|
||||
|
||||
public static final class ColumnDefinitions {
|
||||
Map<String, ColumnDefinition> columnDefinitions = new HashMap<>();
|
||||
Map<String, ColumnDefinition> columns = new HashMap<>();
|
||||
|
||||
public Map<String, ColumnDefinition> getColumnDefinitions() {
|
||||
return columnDefinitions;
|
||||
public Map<String, ColumnDefinition> getColumns() {
|
||||
return columns;
|
||||
}
|
||||
|
||||
public void setColumnDefinitions(final Map<String, ColumnDefinition> columnDefinitions) {
|
||||
this.columnDefinitions = columnDefinitions;
|
||||
public void setColumns(final Map<String, ColumnDefinition> columnDefinitions) {
|
||||
this.columns = columnDefinitions;
|
||||
}
|
||||
|
||||
public void ignoreColumn(final String csvColumnHeader) {
|
||||
columnDefinitions.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columnDefinitions.get(csvColumnHeader).setIgnore(true);
|
||||
columns.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columns.get(csvColumnHeader).setIgnore(true);
|
||||
}
|
||||
|
||||
public void rename(final String csvColumnHeader, final String renameTo) {
|
||||
columnDefinitions.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columnDefinitions.get(csvColumnHeader).setRenameTo(renameTo);
|
||||
columns.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columns.get(csvColumnHeader).setRenameTo(renameTo);
|
||||
}
|
||||
|
||||
public void postProcess(final String csvColumnHeader, final EnumSet<PostProcessors> postProcessors) {
|
||||
columnDefinitions.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columnDefinitions.get(csvColumnHeader).setPostProcessors(postProcessors);
|
||||
columns.putIfAbsent(csvColumnHeader, new ColumnDefinition());
|
||||
columns.get(csvColumnHeader).setPostProcessors(postProcessors);
|
||||
}
|
||||
|
||||
public boolean isIgnoredColumn(final String csvColumnHeader) {
|
||||
return columnDefinitions.getOrDefault(csvColumnHeader, new ColumnDefinition()).isIgnore();
|
||||
return columns.getOrDefault(csvColumnHeader, new ColumnDefinition()).isIgnore();
|
||||
}
|
||||
|
||||
public String getRenameTo(final String csvColumnHeader) {
|
||||
return columnDefinitions.getOrDefault(csvColumnHeader, new ColumnDefinition()).getRenameTo();
|
||||
return columns.getOrDefault(csvColumnHeader, new ColumnDefinition()).getRenameTo();
|
||||
}
|
||||
|
||||
public EnumSet<PostProcessors> getPostProcessors(final String csvColumnHeader) {
|
||||
return columnDefinitions.getOrDefault(csvColumnHeader, new ColumnDefinition()).getPostProcessors();
|
||||
return columns.getOrDefault(csvColumnHeader, new ColumnDefinition()).getPostProcessors();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder result = new StringBuilder();
|
||||
|
||||
for (final String col : columnDefinitions.keySet()) {
|
||||
for (final String col : columns.keySet()) {
|
||||
result.append(col);
|
||||
result.append(":");
|
||||
result.append(columnDefinitions.get(col));
|
||||
result.append(columns.get(col));
|
||||
result.append("\n");
|
||||
}
|
||||
|
||||
@@ -145,7 +146,7 @@ public final class CsvReaderSettings {
|
||||
}
|
||||
}
|
||||
|
||||
private byte separator;
|
||||
private String separator;
|
||||
|
||||
private ColumnDefinitions columnDefinitions = new ColumnDefinitions();
|
||||
|
||||
@@ -155,13 +156,13 @@ public final class CsvReaderSettings {
|
||||
|
||||
private String valueColumn;
|
||||
|
||||
private byte comment = '#';
|
||||
private String comment = "#";
|
||||
|
||||
public CsvReaderSettings() {
|
||||
this("@timestamp", "duration", (byte) ',', new ColumnDefinitions());
|
||||
this("@timestamp", "duration", ",", new ColumnDefinitions());
|
||||
}
|
||||
|
||||
private CsvReaderSettings(final String timeColumn, final String valueColumn, final byte separator,
|
||||
private CsvReaderSettings(final String timeColumn, final String valueColumn, final String separator,
|
||||
final ColumnDefinitions columnDefinitions) {
|
||||
|
||||
this.timeColumn = timeColumn;
|
||||
@@ -170,13 +171,13 @@ public final class CsvReaderSettings {
|
||||
this.columnDefinitions = columnDefinitions;
|
||||
}
|
||||
|
||||
public static CsvReaderSettings create(final String timeColumn, final String valueColumn, final char separator,
|
||||
public static CsvReaderSettings create(final String timeColumn, final String valueColumn, final String separator,
|
||||
final ColumnDefinitions columnDefinitions) {
|
||||
Preconditions.checkTrue(separator == (byte) separator,
|
||||
Preconditions.checkTrue(separator.getBytes(StandardCharsets.UTF_8).length == 1,
|
||||
"Only separators that fulfill separator == (byte)separator are supported. "
|
||||
+ "This restriction is because the parsing algorithm skips the overhead of "
|
||||
+ "translating bytes to characters.");
|
||||
return new CsvReaderSettings(timeColumn, valueColumn, (byte) separator, columnDefinitions);
|
||||
return new CsvReaderSettings(timeColumn, valueColumn, separator, columnDefinitions);
|
||||
}
|
||||
|
||||
public String getTimeColumn() {
|
||||
@@ -195,22 +196,36 @@ public final class CsvReaderSettings {
|
||||
this.valueColumn = valueColumn;
|
||||
}
|
||||
|
||||
public byte getSeparator() {
|
||||
public String getSeparator() {
|
||||
return separator;
|
||||
}
|
||||
|
||||
public void setSeparator(final byte separator) {
|
||||
public void setSeparator(final String separator) {
|
||||
this.separator = separator;
|
||||
}
|
||||
|
||||
public byte getComment() {
|
||||
public byte separatorByte() {
|
||||
final byte[] bytes = separator.getBytes(StandardCharsets.UTF_8);
|
||||
Preconditions.checkEqual(bytes.length, 1,
|
||||
"separator must be a character that is represented as a single byte in UTF-8");
|
||||
return bytes[0];
|
||||
}
|
||||
|
||||
public String getComment() {
|
||||
return comment;
|
||||
}
|
||||
|
||||
public void setComment(final byte comment) {
|
||||
public void setComment(final String comment) {
|
||||
this.comment = comment;
|
||||
}
|
||||
|
||||
public byte commentByte() {
|
||||
final byte[] bytes = comment.getBytes(StandardCharsets.UTF_8);
|
||||
Preconditions.checkEqual(bytes.length, 1,
|
||||
"comment must be a character that is represented as a single byte in UTF-8");
|
||||
return bytes[0];
|
||||
}
|
||||
|
||||
public void putAdditionalTag(final String field, final String value) {
|
||||
additionalTags.put(field, value);
|
||||
}
|
||||
|
||||
@@ -46,8 +46,8 @@ class CsvToEntryTransformer {
|
||||
Entries entries = new Entries(chunksize);
|
||||
|
||||
final byte newline = '\n';
|
||||
final byte separator = settings.getSeparator();
|
||||
final byte comment = settings.getComment();
|
||||
final byte separator = settings.separatorByte();
|
||||
final byte comment = settings.commentByte();
|
||||
final byte[] line = new byte[64 * 1024]; // max line length
|
||||
int offsetInLine = 0;
|
||||
int offsetInBuffer = 0;
|
||||
|
||||
@@ -54,7 +54,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||
for (final MultipartFile file : files) {
|
||||
final Path tmpFile = tmpDir.resolve(UUID.randomUUID().toString());
|
||||
tmpFiles.add(tmpFile);
|
||||
LOGGER.info("writing uploaded file to {}", tmpFile);
|
||||
LOGGER.debug("writing uploaded file to {}", tmpFile);
|
||||
file.transferTo(tmpFile);
|
||||
}
|
||||
} catch (RuntimeException | IOException e) {
|
||||
@@ -71,7 +71,7 @@ public class CsvUploadHandler implements PropertyKeys, DisposableBean {
|
||||
csvToEntryTransformer.readCSV(in);
|
||||
}
|
||||
|
||||
LOGGER.info("delete uploaded file {}", tmpFile);
|
||||
LOGGER.debug("delete uploaded file {}", tmpFile);
|
||||
Files.delete(tmpFile);
|
||||
|
||||
} catch (final Exception e) {
|
||||
|
||||
@@ -68,7 +68,7 @@ public final class IngestionHandler implements Callable<Void> {
|
||||
} else {
|
||||
in.reset();
|
||||
final CsvToEntryTransformer csvTransformer = new CsvToEntryTransformer(queue,
|
||||
CsvReaderSettings.create("@timestamp", "duration", ',', new ColumnDefinitions()));
|
||||
CsvReaderSettings.create("@timestamp", "duration", ",", new ColumnDefinitions()));
|
||||
csvTransformer.readCSV(in);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +192,11 @@ public class FastISODateParser {
|
||||
final long nanos = nanosAndCharsRead[0];
|
||||
final int offsetTimezone = beginIndex + 19 + nanosAndCharsRead[1];
|
||||
|
||||
final long zoneOffsetMillis = date[offsetTimezone] == 'Z' ? 0 : parseZoneToMillis(date, offsetTimezone);
|
||||
final byte firstTimeZoneChar = date[offsetTimezone];
|
||||
final boolean isNumericTimeZone = firstTimeZoneChar >= '0' && firstTimeZoneChar <= '9'
|
||||
|| (firstTimeZoneChar == '-' || firstTimeZoneChar == '+');
|
||||
final long zoneOffsetMillis = firstTimeZoneChar == 'Z' || !isNumericTimeZone ? 0
|
||||
: parseZoneToMillis(date, offsetTimezone);
|
||||
|
||||
final int epochMilliMonthOffsetKey = (int) (year * 12 + month - 1);
|
||||
final long epochMilliMonthOffset;
|
||||
@@ -261,7 +265,7 @@ public class FastISODateParser {
|
||||
while (i < date.length) {
|
||||
final byte c = date[i];
|
||||
i++;
|
||||
if (c == '.') {
|
||||
if (c == '.' || c == ',') {
|
||||
continue;
|
||||
}
|
||||
if (c < '0' || c > '9') {
|
||||
|
||||
Reference in New Issue
Block a user