move Entry and Entries to data-store
This commit is contained in:
@@ -0,0 +1,84 @@
|
||||
package org.lucares.pdb.datastore;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Wrapper for chunk of {@link Entry}s.
|
||||
* <p>
|
||||
* This class is supposed to be provided to the queue returned by
|
||||
* PerformanceDb.getQueue(). Processing {@link Entry}s in chunks is more
|
||||
* efficient than processing each one individually.
|
||||
* <p>
|
||||
* Optionally, you can request that the entries will be flushed to disk by
|
||||
* calling {@link #forceFlush()} before adding it to the queue.
|
||||
* <p>
|
||||
* Optionally, this class can act like a future. This is useful if you have to
|
||||
* wait until the entries have been processed. Use {@link #forceFlush()} and
|
||||
* {@link #waitUntilFlushed(long, TimeUnit)}.
|
||||
*/
|
||||
public class Entries implements Iterable<Entry> {
|
||||
/**
|
||||
* A special {@link Entries} instance that can be used as poison object for
|
||||
* blocking queues.
|
||||
*/
|
||||
public static final Entries POISON = new Entries(0);
|
||||
|
||||
private final List<Entry> entries;
|
||||
|
||||
private boolean forceFlush = false;
|
||||
|
||||
private CountDownLatch flushLatch = null;
|
||||
|
||||
public Entries(final int initialSize) {
|
||||
entries = new ArrayList<>(initialSize);
|
||||
}
|
||||
|
||||
public Entries(final Entry... entries) {
|
||||
this.entries = new ArrayList<>(Arrays.asList(entries));
|
||||
}
|
||||
|
||||
public Entries(final Collection<Entry> entries) {
|
||||
this.entries = new ArrayList<>(entries);
|
||||
}
|
||||
|
||||
public void add(final Entry entry) {
|
||||
entries.add(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Entry> iterator() {
|
||||
return entries.iterator();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return entries.size();
|
||||
}
|
||||
|
||||
public boolean isForceFlush() {
|
||||
return forceFlush;
|
||||
}
|
||||
|
||||
public void forceFlush() {
|
||||
forceFlush = true;
|
||||
flushLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public void waitUntilFlushed(final long timeout, final TimeUnit unit)
|
||||
throws InterruptedException, TimeoutException {
|
||||
final boolean finished = flushLatch.await(timeout, unit);
|
||||
if (!finished) {
|
||||
throw new TimeoutException();
|
||||
}
|
||||
}
|
||||
|
||||
public void notifyFlushed() {
|
||||
flushLatch.countDown();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package org.lucares.pdb.datastore;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
import org.lucares.pdb.api.Tags;
|
||||
|
||||
public class Entry {
|
||||
|
||||
private final long value;
|
||||
|
||||
private final Tags tags;
|
||||
|
||||
private final long epochMilli;
|
||||
|
||||
public Entry(final long epochMilli, final long value, final Tags tags) {
|
||||
this.epochMilli = epochMilli;
|
||||
this.tags = tags;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public long getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public long getEpochMilli() {
|
||||
return epochMilli;
|
||||
}
|
||||
|
||||
public Tags getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
final OffsetDateTime date = Instant.ofEpochMilli(epochMilli).atOffset(ZoneOffset.UTC);
|
||||
return date.format(DateTimeFormatter.ISO_ZONED_DATE_TIME) + " = " + value + " (" + tags.asString() + ")";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (int) (epochMilli ^ (epochMilli >>> 32));
|
||||
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
|
||||
result = prime * result + (int) (value ^ (value >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
final Entry other = (Entry) obj;
|
||||
if (epochMilli != other.epochMilli)
|
||||
return false;
|
||||
if (tags == null) {
|
||||
if (other.tags != null)
|
||||
return false;
|
||||
} else if (!tags.equals(other.tags))
|
||||
return false;
|
||||
if (value != other.value)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,8 @@ package org.lucares.pdb.datastore.internal;
|
||||
import java.io.Flushable;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.lucares.pdb.api.Entry;
|
||||
import org.lucares.pdb.blockstorage.TimeSeriesFile;
|
||||
import org.lucares.pdb.datastore.Entry;
|
||||
import org.lucares.pdb.datastore.InvalidValueException;
|
||||
import org.lucares.pdb.datastore.PdbFile;
|
||||
import org.lucares.pdb.datastore.WriteException;
|
||||
|
||||
Reference in New Issue
Block a user