support for negative values in variable byte encoding

We now support negative values. This will allow us to
store time/value sequences that are not monotonically
increasing, so that we do not have to create multiple
files just because some values were sent out of order.

This is done by first transforming the values into
positive values by using an interleaved encoding (a
variant of the ZigZag encoding used by Protocol Buffers,
adjusted so that no value is mapped to 0). We are
mapping values like this:
 0 -> 1
 1 -> 2
-1 -> 3
 2 -> 4
-2 -> 5
...

Renamed LongSequenceEncoderDecoder to VariableByteEncoder.
Made methods static.
This commit is contained in:
2018-09-29 19:48:57 +02:00
parent f07977c27a
commit e03fccbdf7
6 changed files with 244 additions and 317 deletions

View File

@@ -11,7 +11,7 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder;
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
import org.lucares.pdb.diskstorage.DiskBlock;
import org.lucares.pdb.diskstorage.DiskStorage;
import org.slf4j.Logger;
@@ -57,9 +57,6 @@ public class BSFile implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(BSFile.class);
private static final ThreadLocal<LongSequenceEncoderDecoder> INT_ENCODER = ThreadLocal
.withInitial(LongSequenceEncoderDecoder::new);
private static final TimeStampDeltaDecoder TIME_DELTA_DECODER = new TimeStampDeltaDecoder();
/*
@@ -104,11 +101,9 @@ public class BSFile implements AutoCloseable {
private long determineLastEpochMilli(final DiskBlock diskBlock) {
LongList longList = new LongList();
// get the time/value delta encoded longs
final byte[] buf = diskBlock.getBuffer();
INT_ENCODER.get().decodeInto(buf, longList);
LongList longList = VariableByteEncoder.decode(buf);
final long result;
if (longList.isEmpty()) {
// only new files have empty disk blocks
@@ -147,11 +142,10 @@ public class BSFile implements AutoCloseable {
}
public void appendTimeValue(final long epochMilli, final long value) throws IOException {
final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get();
final long epochMilliDelta = epochMilli - lastEpochMilli;
final int bytesWritten = intEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(), offsetInBuffer);
final int bytesWritten = VariableByteEncoder.encodeInto(epochMilliDelta, value, buffer.getBuffer(),
offsetInBuffer);
if (bytesWritten == 0) {
flushFullBufferAndCreateNew();
@@ -169,13 +163,11 @@ public class BSFile implements AutoCloseable {
}
private void writeValuesToBuffer(final long value) throws IOException {
final LongSequenceEncoderDecoder intEncoder = INT_ENCODER.get();
int bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
int bytesWritten = VariableByteEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
if (bytesWritten == 0) {
flushFullBufferAndCreateNew();
bytesWritten = intEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
bytesWritten = VariableByteEncoder.encodeInto(value, buffer.getBuffer(), offsetInBuffer);
assert bytesWritten > 0 : "after a flush the buffer is emtpy, so it should be possible to write a few bytes";
}
offsetInBuffer += bytesWritten;
@@ -219,8 +211,7 @@ public class BSFile implements AutoCloseable {
public Optional<Long> getLastValue() {
final byte[] buf = buffer.getBuffer();
final LongList bufferedLongs = new LongList();
INT_ENCODER.get().decodeInto(buf, bufferedLongs);
final LongList bufferedLongs = VariableByteEncoder.decode(buf);
final Optional<Long> result;
if (bufferedLongs.isEmpty()) {
@@ -266,18 +257,16 @@ public class BSFile implements AutoCloseable {
throw new NoSuchElementException();
}
next = new LongList();
final DiskBlock diskBlock = diskStorage.getDiskBlock(nextBlockNumber);
nextBlockNumber = diskBlock.getNextBlockNumber();
final byte[] buf = diskBlock.getBuffer();
INT_ENCODER.get().decodeInto(buf, next);
next = VariableByteEncoder.decode(buf);
return next;
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
public LongList asLongList() {

View File

@@ -1,214 +0,0 @@
package org.lucares.pdb.blockstorage.intsequence;
import org.lucares.collections.LongList;
/**
 * Encodes sequences of non-negative longs into bytes and decodes them again.
 * <p>
 * Every value starts with a "value byte" (bit 6 set, 6 data bits) that holds
 * the most significant bits, followed by zero or more "continuation bytes"
 * (bit 7 set, 7 data bits each). Because every value byte has bit 6 set, no
 * encoded byte is 0, so a 0 byte terminates a sequence.
 */
public class LongSequenceEncoderDecoder {

    private static final int VALUE_NUM_DATA_BITS = 6;

    private static final int VALUE_PREFIX = 1 << VALUE_NUM_DATA_BITS; // binary 01000000

    /**
     * Mask for the data bits of a value byte. The prefix is a single bit, so
     * prefix - 1 sets exactly the bits below it.
     */
    private static final int VALUE_DATA_BITS = VALUE_PREFIX - 1; // binary 00111111

    private static final int CONTINUATION_NUM_DATA_BITS = 7;

    private static final int CONTINUATION_PREFIX = 1 << CONTINUATION_NUM_DATA_BITS; // binary 10000000

    /**
     * Mask for the data bits of a continuation byte. The prefix is a single bit,
     * so prefix - 1 sets exactly the bits below it.
     */
    private static final int CONTINUATION_DATA_BITS = CONTINUATION_PREFIX - 1; // binary 01111111

    private static final int CONTINUATION_PREFIX_BITS = (~CONTINUATION_DATA_BITS) & 0xff; // binary 10000000

    /** Per-thread scratch space; large enough for two values of up to 10 bytes. */
    private static final ThreadLocal<byte[]> TMP_BUFFER = ThreadLocal.withInitial(() -> new byte[20]);

    /**
     * Appends the encoded forms of two values to the given buffer, first
     * {@code value1}, then {@code value2}.
     * <p>
     * Nothing is written and 0 is returned when the two encoded values do not
     * fit into the remaining space. The caller then has to supply a buffer with
     * more room.
     *
     * @param value1         first value, non-negative
     * @param value2         second value, non-negative
     * @param buffer         target buffer
     * @param offsetInBuffer index in {@code buffer} at which writing starts
     * @return number of bytes appended to the provided buffer, or 0 if the
     *         values did not fit
     */
    public int encodeInto(final long value1, final long value2, final byte[] buffer, final int offsetInBuffer) {
        assert value1 >= 0 : "value must be non-negative";
        assert value2 >= 0 : "value must be non-negative";

        final int requiredBytes = computeNumberOfEncodedBytes(value1) + computeNumberOfEncodedBytes(value2);
        final int freeBytes = buffer.length - offsetInBuffer;
        if (requiredBytes > freeBytes) {
            // not enough room; the caller will have to provide a new buffer
            return 0;
        }

        // The scratch buffer is filled from its end, so the second value is
        // written first and the first value is placed directly in front of it.
        final byte[] scratch = TMP_BUFFER.get();
        final int startOfSecond = writeBackwards(value2, scratch, scratch.length - 1);
        final int startOfFirst = writeBackwards(value1, scratch, startOfSecond - 1);

        System.arraycopy(scratch, startOfFirst, buffer, offsetInBuffer, requiredBytes);
        return requiredBytes;
    }

    /**
     * Appends the encoded form of a single value to the given buffer.
     * <p>
     * Nothing is written and 0 is returned when the encoded value does not fit
     * into the remaining space.
     *
     * @param value          the value, non-negative
     * @param buffer         target buffer
     * @param offsetInBuffer index in {@code buffer} at which writing starts
     * @return number of bytes appended to the provided buffer, or 0 if the value
     *         did not fit
     */
    public int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) {
        assert value >= 0 : "value must be non-negative";

        final int requiredBytes = computeNumberOfEncodedBytes(value);
        final int freeBytes = buffer.length - offsetInBuffer;
        if (requiredBytes > freeBytes) {
            // not enough room; the caller will have to provide a new buffer
            return 0;
        }

        final byte[] scratch = TMP_BUFFER.get();
        final int start = writeBackwards(value, scratch, scratch.length - 1);

        System.arraycopy(scratch, start, buffer, offsetInBuffer, requiredBytes);
        return requiredBytes;
    }

    /**
     * Computes how many bytes the encoded form of {@code value} occupies.
     * <p>
     * The value byte carries 6 bits and each continuation byte carries 7 bits,
     * so values with up to 6 significant bits need 1 byte, up to 13 bits need 2
     * bytes, up to 20 bits need 3 bytes, and so on.
     *
     * @param value the value to measure
     * @return the number of bytes of the encoded value
     */
    public static int computeNumberOfEncodedBytes(final long value) {
        final int significantBits = Long.SIZE - Long.numberOfLeadingZeros(value);
        return 1 + significantBits / CONTINUATION_NUM_DATA_BITS;
    }

    /**
     * Writes the encoded value into {@code buffer} so that its last byte lands
     * on {@code lastIndex}; the bytes are produced from the least significant
     * group to the most significant one, i.e. from back to front.
     *
     * @return index of the first byte of the encoded value
     */
    private static int writeBackwards(final long value, final byte[] buffer, final int lastIndex) {
        int cursor = lastIndex;
        long rest = value;
        while (rest > VALUE_DATA_BITS) {
            // more than 6 bits left: emit the lowest 7 bits as continuation byte
            buffer[cursor] = (byte) ((rest & CONTINUATION_DATA_BITS) | CONTINUATION_PREFIX);
            cursor--;
            rest = rest >> CONTINUATION_NUM_DATA_BITS;
        }
        // the remaining (at most 6) bits go into the leading value byte
        buffer[cursor] = (byte) (rest | VALUE_PREFIX);
        return cursor;
    }

    /**
     * Decodes all values stored in {@code buffer}.
     *
     * @param buffer the encoded bytes
     * @return a new list with the decoded values
     */
    public LongList decode(final byte[] buffer) {
        final LongList decoded = new LongList();
        decodeInto(buffer, decoded);
        return decoded;
    }

    private boolean isContinuationByte(final byte b) {
        return (b & CONTINUATION_PREFIX_BITS) == CONTINUATION_PREFIX;
    }

    /**
     * Decodes the values stored in {@code buffer} and appends them to
     * {@code bufferedLongs}. Decoding stops at the first 0 byte or at the end of
     * the buffer.
     *
     * @param buffer        the encoded bytes
     * @param bufferedLongs receives the decoded values
     */
    public void decodeInto(final byte[] buffer, final LongList bufferedLongs) {
        int pos = 0;
        while (pos < buffer.length) {
            final byte head = buffer[pos];

            if (head == 0) {
                // No value, not even the value 0, is encoded as the null byte.
                // Therefore the sequences are null-terminated.
                break;
            }

            if ((head & VALUE_PREFIX) == 0) {
                // neither a value byte nor the terminator
                assert false;
                pos++;
                continue;
            }

            long decoded = head & VALUE_DATA_BITS;
            int next = pos + 1;
            while (next < buffer.length && isContinuationByte(buffer[next])) {
                decoded = decoded << CONTINUATION_NUM_DATA_BITS;
                decoded = decoded | (buffer[next] & CONTINUATION_DATA_BITS);
                next++;
            }
            bufferedLongs.add(decoded);
            pos = next;
        }
    }
}

View File

@@ -0,0 +1,150 @@
package org.lucares.pdb.blockstorage.intsequence;
import java.util.Arrays;
import org.lucares.collections.LongList;
/**
 * Encodes longs into bytes using variable byte encoding.
 * <p>
 * Signed values are first transformed into strictly positive values: positive
 * longs are mapped to even numbers, negative longs and zero are mapped to odd
 * numbers (see {@link #encodeIntoPositiveValue(long)}).
 * <p>
 * The positive value is then split into groups of 7 bits, least significant
 * group first, and each group is stored in one byte. The highest bit of each
 * byte is reserved for a flag that tells us whether or not more bytes follow.
 * This bit is set for all but the last byte of a value.
 * <p>
 * Please note two things:
 * <ol>
 * <li>0 is encoded to 1; the encoded values do not contain 0
 * <li>all but the last byte have the highest bit set
 * </ol>
 * That means no byte will have the value 0. This is important when decoding
 * bytes, because we can decode bytes until we encounter the first null byte, or
 * we reach the end of the array.
 */
public class VariableByteEncoder {

    private static final int CONTINUATION_BYTE_FLAG = 1 << 7; // 10000000

    private static final long DATA_BITS = (1 << 7) - 1; // 01111111

    /** Number of payload bits stored in each encoded byte. */
    private static final int NUM_DATA_BITS = 7;

    /** Smallest encodable value: -(2^62) + 1. */
    private static final long MIN_ENCODABLE_VALUE = Long.MIN_VALUE / 2 + 1;

    /** Largest encodable value: 2^62 - 1. */
    private static final long MAX_ENCODABLE_VALUE = Long.MAX_VALUE / 2;

    /**
     * Encodes two values (e.g. time and value) into the given buffer, first
     * {@code value1}, then {@code value2}.
     * <p>
     * If the encoded values do not both fit into the buffer, then 0 is returned
     * and the buffer is left untouched. The caller will have to provide a new
     * buffer with more space.
     *
     * @param value1         first value, between -(2^62)+1 and 2^62-1
     * @param value2         second value, between -(2^62)+1 and 2^62-1
     * @param buffer         target buffer
     * @param offsetInBuffer index in {@code buffer} at which writing starts
     * @return number of bytes appended to the provided buffer, or 0 if the
     *         values did not fit
     * @throws IllegalArgumentException if one of the values is outside the
     *                                  encodable range
     */
    public static int encodeInto(final long value1, final long value2, final byte[] buffer, final int offsetInBuffer) {
        final long normVal1 = encodeIntoPositiveValue(value1);
        final long normVal2 = encodeIntoPositiveValue(value2);

        final int bytesNeeded1 = computeNumberOfEncodedBytes(normVal1);
        final int bytesNeeded2 = computeNumberOfEncodedBytes(normVal2);
        final int bytesNeeded = bytesNeeded1 + bytesNeeded2;

        // Check the combined size up front, so that we never have to remove a
        // partially written pair from the buffer.
        if (!fits(bytesNeeded, buffer, offsetInBuffer)) {
            return 0;
        }

        writeEncoded(normVal1, buffer, offsetInBuffer);
        writeEncoded(normVal2, buffer, offsetInBuffer + bytesNeeded1);
        return bytesNeeded;
    }

    /**
     * Decodes all values stored in the buffer. Decoding stops at the first null
     * byte or at the end of the buffer.
     *
     * @param buffer the encoded bytes
     * @return a new list with the decoded values
     */
    public static LongList decode(final byte[] buffer) {
        final LongList result = new LongList();
        decodeInto(buffer, result);
        return result;
    }

    /**
     * Encodes a single value into the given buffer.
     * <p>
     * If the encoded value does not fit into the buffer, then 0 is returned and
     * the buffer is left untouched. The caller will have to provide a new buffer
     * with more space.
     *
     * @param value          the value, between -(2^62)+1 and 2^62-1
     * @param buffer         target buffer
     * @param offsetInBuffer index in {@code buffer} at which writing starts
     * @return number of bytes appended to the provided buffer, or 0 if the value
     *         did not fit
     * @throws IllegalArgumentException if the value is outside the encodable
     *                                  range
     */
    public static int encodeInto(final long value, final byte[] buffer, final int offsetInBuffer) {
        final long normVal = encodeIntoPositiveValue(value);
        final int bytesNeeded = computeNumberOfEncodedBytes(normVal);

        if (!fits(bytesNeeded, buffer, offsetInBuffer)) {
            // the caller will have to provide a new buffer
            return 0;
        }

        writeEncoded(normVal, buffer, offsetInBuffer);
        return bytesNeeded;
    }

    /**
     * Checks whether {@code bytesNeeded} bytes are available in {@code buffer}
     * starting at {@code offsetInBuffer}. The subtraction is done with longs so
     * that it cannot overflow.
     */
    private static boolean fits(final int bytesNeeded, final byte[] buffer, final int offsetInBuffer) {
        return bytesNeeded <= (long) buffer.length - offsetInBuffer;
    }

    /**
     * Computes the number of bytes needed to store the given positive value with
     * 7 data bits per byte.
     *
     * @param normVal a value returned by {@link #encodeIntoPositiveValue(long)},
     *                i.e. at least 1
     */
    private static int computeNumberOfEncodedBytes(final long normVal) {
        final int significantBits = Long.SIZE - Long.numberOfLeadingZeros(normVal);
        // round up to full groups of 7 bits
        return (significantBits + NUM_DATA_BITS - 1) / NUM_DATA_BITS;
    }

    /**
     * Writes the positive value into the buffer, least significant 7 bits first.
     * The caller must have verified that the encoded bytes fit.
     */
    private static void writeEncoded(final long normVal, final byte[] buffer, final int offsetInBuffer) {
        int offset = offsetInBuffer;
        long remaining = normVal;
        while (remaining > DATA_BITS) {
            // more bytes follow, so the continuation flag is set
            buffer[offset] = (byte) ((remaining & DATA_BITS) | CONTINUATION_BYTE_FLAG);
            offset++;
            remaining = remaining >> NUM_DATA_BITS;
        }
        // last byte: at most 7 bits remain, the continuation flag stays unset
        buffer[offset] = (byte) remaining;
    }

    private static void decodeInto(final byte[] buffer, final LongList bufferedLongs) {
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] == 0) {
                // no value is encoded to 0 => there are no further values
                break;
            } else {
                long val = buffer[i] & DATA_BITS;
                int shift = NUM_DATA_BITS;
                while (!isLastByte(buffer[i]) && i + 1 < buffer.length) {
                    val = val | ((buffer[i + 1] & DATA_BITS) << shift);
                    i++;
                    shift += NUM_DATA_BITS;
                }
                bufferedLongs.add(decodeIntoSignedValue(val));
            }
        }
    }

    /**
     * The input value (positive, negative or zero) is encoded into a positive
     * value.
     *
     * <pre>
     *
     * input:   0 1 -1 2 -2 3 -3
     * encoded: 1 2  3 4  5 6  7
     * </pre>
     *
     * @param value the value, between -(2^62)+1 and 2^62-1
     * @return the positive representation of {@code value}, at least 1
     * @throws IllegalArgumentException if the value is outside the encodable
     *                                  range; such values would overflow and be
     *                                  stored as a different value
     */
    private static long encodeIntoPositiveValue(final long value) {
        if (value < MIN_ENCODABLE_VALUE || value > MAX_ENCODABLE_VALUE) {
            throw new IllegalArgumentException(
                    "value " + value + " is not encodable; supported range is -2^62+1 to 2^62-1");
        }
        return value > 0 ? value * 2 : (value * -2) + 1;
    }

    /**
     * Inverse of {@link #encodeIntoPositiveValue(long)}.
     *
     * @param value a positive value produced by
     *              {@link #encodeIntoPositiveValue(long)}
     * @return the original signed value; even inputs yield positive values, odd
     *         inputs yield negative values or zero
     */
    private static long decodeIntoSignedValue(final long value) {
        return (value / 2) * (value % 2 == 0 ? 1 : -1);
    }

    private static boolean isLastByte(final byte b) {
        return (b & CONTINUATION_BYTE_FLAG) == 0;
    }
}

View File

@@ -3,7 +3,7 @@ package org.lucares.pdb.diskstorage;
import java.nio.MappedByteBuffer;
import org.lucares.collections.LongList;
import org.lucares.pdb.blockstorage.intsequence.LongSequenceEncoderDecoder;
import org.lucares.pdb.blockstorage.intsequence.VariableByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +100,7 @@ public class DiskBlock {
@Override
public String toString() {
final LongList bufferDecoded = new LongSequenceEncoderDecoder().decode(buffer);
final LongList bufferDecoded = VariableByteEncoder.decode(buffer);
return "DiskBlock[" + blockNumber + ", bufferDecoded=" + bufferDecoded + "]";
}
}