Java源码示例:org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper
示例1
/**
 * Reads the data of every key-group referenced by the current state handle and writes the
 * contained key/value pairs into the DB through a write batch.
 *
 * <p>Each non-empty key-group section starts with a state id (read as a short) that selects the
 * target column family. A key carrying the "meta data follows" flag is followed by another
 * short, which is either the id of the next state or {@code END_OF_KEY_GROUP_MARK}.
 *
 * @throws IOException declared for the stream operations performed while reading
 * @throws RocksDBException declared for the write-batch operations performed while writing
 */
private void restoreKVStateData() throws IOException, RocksDBException {
    try (RocksDBWriteBatchWrapper batch = new RocksDBWriteBatchWrapper(db)) {
        for (Tuple2<Integer, Long> groupAndOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
            int restoredKeyGroup = groupAndOffset.f0;
            // every restored key-group has to be owned by this backend
            Preconditions.checkState(
                keyGroupRange.contains(restoredKeyGroup),
                "The key group must belong to the backend");

            long startOffset = groupAndOffset.f1;
            if (0L == startOffset) {
                // offset zero is treated as an empty key-group: nothing to read
                continue;
            }

            currentStateHandleInStream.seek(startOffset);
            try (InputStream keyGroupStream =
                    keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
                DataInputViewStreamWrapper keyGroupView = new DataInputViewStreamWrapper(keyGroupStream);
                //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                int stateId = keyGroupView.readShort();
                ColumnFamilyHandle targetHandle = currentStateHandleKVStateColumnFamilies.get(stateId);

                while (true) {
                    byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupView);
                    byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupView);

                    boolean metaDataFollows = hasMetaDataFollowsFlag(key);
                    if (metaDataFollows) {
                        // strip the signal bit so the key is inserted in its original form
                        clearMetaDataFollowsFlag(key);
                    }
                    batch.put(targetHandle, key, value);

                    if (!metaDataFollows) {
                        continue;
                    }

                    //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                    stateId = END_OF_KEY_GROUP_MARK & keyGroupView.readShort();
                    if (END_OF_KEY_GROUP_MARK == stateId) {
                        break;
                    }
                    targetHandle = currentStateHandleKVStateColumnFamilies.get(stateId);
                }
            }
        }
    }
}
示例2
/**
 * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary
 * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
 * real restore instance and then the temporary instance is discarded.
 *
 * <p>If a handle is chosen to initialize the base DB, it is removed from {@code restoreStateHandles};
 * only the remaining handles are restored through temporary instances.
 *
 * @param restoreStateHandles the state handles to restore from; modified by this method when an
 *     initial handle is chosen
 * @throws IllegalStateException if one of the remaining handles is not an
 *     {@link IncrementalRemoteKeyedStateHandle}
 * @throws Exception if restoring an instance or copying its contents fails
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
    // Prepare for restore with rescaling
    KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
        restoreStateHandles, keyGroupRange);

    // Init base DB instance
    if (initialHandle != null) {
        restoreStateHandles.remove(initialHandle);
        initDBWithRescaling(initialHandle);
    } else {
        openDB();
    }

    // Transfer remaining key-groups from temporary instance into base DB
    byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

    // exclusive upper bound: prefix of the first key-group after this backend's range
    byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

    for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

        if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, " +
                "expected " + IncrementalRemoteKeyedStateHandle.class +
                ", but found " + rawStateHandle.getClass());
        }

        // Build the temporary directory as a child of the instance base path. Plain string
        // concatenation would omit the separator and yield a sibling directory whose name is
        // the base directory name fused with the UUID.
        Path temporaryRestoreInstancePath = new Path(
            instanceBasePath.getAbsolutePath(),
            UUID.randomUUID().toString());

        try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
                (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                temporaryRestoreInstancePath);
            RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

            List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
            List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

            // iterating only the requested descriptors automatically skips the default column family handle
            for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

                ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                    .columnFamilyHandle;

                try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

                    iterator.seek(startKeyGroupPrefixBytes);

                    while (iterator.isValid()) {

                        if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                            writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                        } else {
                            // Since the iterator will visit the record according to the sorted order,
                            // we can just break here.
                            break;
                        }

                        iterator.next();
                    }
                } // releases native iterator resources
            }
        } finally {
            // the temporary instance directory is removed whether or not the copy succeeded
            cleanUpPathQuietly(temporaryRestoreInstancePath);
        }
    }
}
示例3
/**
 * Restores the KV-state / column family data of all key-groups that the current state handle
 * references, inserting every key/value pair into the DB via a write batch.
 *
 * @throws IOException declared for the stream operations performed while reading
 * @throws RocksDBException declared for the write-batch operations performed while writing
 */
private void restoreKVStateData() throws IOException, RocksDBException {
    try (RocksDBWriteBatchWrapper dbWriter = new RocksDBWriteBatchWrapper(db)) {
        // walk over all key-groups listed in the current state handle
        for (Tuple2<Integer, Long> entry : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
            int group = entry.f0;
            // restored key-groups must all be part of this backend's range
            Preconditions.checkState(keyGroupRange.contains(group),
                "The key group must belong to the backend");

            long groupOffset = entry.f1;
            if (groupOffset == 0L) {
                // offset zero is treated as an empty key-group
                continue;
            }

            currentStateHandleInStream.seek(groupOffset);
            try (InputStream decompressedIn =
                    keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(decompressedIn);
                //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                int currentStateId = in.readShort();
                ColumnFamilyHandle currentHandle = currentStateHandleKVStateColumnFamilies.get(currentStateId);

                // copy all k/v pairs of this key-group into the DB
                boolean moreEntries;
                do {
                    byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(in);
                    byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(in);

                    boolean flagged = hasMetaDataFollowsFlag(key);
                    if (flagged) {
                        // remove the signal bit before the key is written
                        clearMetaDataFollowsFlag(key);
                    }
                    dbWriter.put(currentHandle, key, value);

                    moreEntries = true;
                    if (flagged) {
                        //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                        currentStateId = END_OF_KEY_GROUP_MARK & in.readShort();
                        if (END_OF_KEY_GROUP_MARK != currentStateId) {
                            currentHandle = currentStateHandleKVStateColumnFamilies.get(currentStateId);
                        } else {
                            moreEntries = false;
                        }
                    }
                } while (moreEntries);
            }
        }
    }
}
示例4
/**
 * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary
 * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
 * real restore instance and then the temporary instance is discarded.
 *
 * <p>If a handle is chosen to initialize the base DB, it is removed from {@code restoreStateHandles};
 * only the remaining handles are restored through temporary instances.
 *
 * @param restoreStateHandles the state handles to restore from; modified by this method when an
 *     initial handle is chosen
 * @throws IllegalStateException if one of the remaining handles is not an
 *     {@link IncrementalRemoteKeyedStateHandle}
 * @throws Exception if restoring an instance or copying its contents fails
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
    // Prepare for restore with rescaling
    KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
        restoreStateHandles, keyGroupRange);

    // Init base DB instance
    if (initialHandle != null) {
        restoreStateHandles.remove(initialHandle);
        initDBWithRescaling(initialHandle);
    } else {
        openDB();
    }

    // Transfer remaining key-groups from temporary instance into base DB
    byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

    // exclusive upper bound: prefix of the first key-group after this backend's range
    byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

    for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

        if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, " +
                "expected " + IncrementalRemoteKeyedStateHandle.class +
                ", but found " + rawStateHandle.getClass());
        }

        // Use the parent/child constructor so the UUID becomes a directory inside the instance
        // base path. Concatenating the two strings drops the path separator and produces a
        // sibling directory named "<base><uuid>" instead.
        Path temporaryRestoreInstancePath = new Path(
            instanceBasePath.getAbsolutePath(),
            UUID.randomUUID().toString());

        try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
                (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                temporaryRestoreInstancePath);
            RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

            List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
            List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

            // iterating only the requested descriptors automatically skips the default column family handle
            for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

                ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                    .columnFamilyHandle;

                try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

                    iterator.seek(startKeyGroupPrefixBytes);

                    while (iterator.isValid()) {

                        if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                            writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                        } else {
                            // Since the iterator will visit the record according to the sorted order,
                            // we can just break here.
                            break;
                        }

                        iterator.next();
                    }
                } // releases native iterator resources
            }
        } finally {
            // the temporary instance directory is removed whether or not the copy succeeded
            cleanUpPathQuietly(temporaryRestoreInstancePath);
        }
    }
}
示例5
/**
 * Restores the KV-state / column family data for all key-groups referenced by the current
 * state handle. Writes go through a write batch created with {@code writeBatchSize}.
 *
 * @throws IOException declared for the stream operations performed while reading
 * @throws RocksDBException declared for the write-batch operations performed while writing
 */
private void restoreKVStateData() throws IOException, RocksDBException {
    try (RocksDBWriteBatchWrapper batchedWriter = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
        for (Tuple2<Integer, Long> keyGroupAndOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
            int keyGroupId = keyGroupAndOffset.f0;
            // a key-group outside of this backend's range indicates an invalid restore
            Preconditions.checkState(
                keyGroupRange.contains(keyGroupId),
                "The key group must belong to the backend");

            long streamOffset = keyGroupAndOffset.f1;
            if (0L == streamOffset) {
                // offset zero is treated as an empty key-group, so there is nothing to restore
                continue;
            }

            currentStateHandleInStream.seek(streamOffset);
            try (InputStream groupIn =
                    keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
                DataInputViewStreamWrapper groupView = new DataInputViewStreamWrapper(groupIn);
                //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                int kvStateId = groupView.readShort();
                ColumnFamilyHandle columnFamily = currentStateHandleKVStateColumnFamilies.get(kvStateId);

                // pump all k/v pairs of this key-group into the DB
                for (boolean hasMore = true; hasMore; ) {
                    byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(groupView);
                    byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(groupView);

                    if (!hasMetaDataFollowsFlag(key)) {
                        batchedWriter.put(columnFamily, key, value);
                        continue;
                    }

                    // reset the signal bit so that the key can be inserted as it was written
                    clearMetaDataFollowsFlag(key);
                    batchedWriter.put(columnFamily, key, value);

                    //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                    kvStateId = END_OF_KEY_GROUP_MARK & groupView.readShort();
                    if (END_OF_KEY_GROUP_MARK == kvStateId) {
                        hasMore = false;
                    } else {
                        columnFamily = currentStateHandleKVStateColumnFamilies.get(kvStateId);
                    }
                }
            }
        }
    }
}
示例6
/**
 * Restores from multiple incremental state handles after rescaling. Each handle that is not used
 * to initialize the base DB is opened as a temporary RocksDB instance; the entries that fall into
 * this backend's key-group range are copied into the base DB and the temporary instance
 * directory is cleaned up afterwards.
 *
 * <p>If a handle is chosen to initialize the base DB, it is removed from
 * {@code restoreStateHandles}.
 *
 * @param restoreStateHandles the state handles to restore from
 * @throws IllegalStateException if a remaining handle is not an
 *     {@link IncrementalRemoteKeyedStateHandle}
 * @throws Exception if restoring an instance or copying its contents fails
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
    KeyedStateHandle baseHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
        restoreStateHandles, keyGroupRange);

    // set up the base DB, either from the chosen handle or as a freshly opened instance
    if (baseHandle == null) {
        openDB();
    } else {
        restoreStateHandles.remove(baseHandle);
        initDBWithRescaling(baseHandle);
    }

    // serialized key-group prefixes delimiting the range to copy: [start, end + 1)
    byte[] rangeStartPrefix = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), rangeStartPrefix);
    byte[] rangeStopPrefix = new byte[keyGroupPrefixBytes];
    RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, rangeStopPrefix);

    for (KeyedStateHandle stateHandle : restoreStateHandles) {
        if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, " +
                "expected " + IncrementalRemoteKeyedStateHandle.class +
                ", but found " + stateHandle.getClass());
        }
        IncrementalRemoteKeyedStateHandle remoteHandle = (IncrementalRemoteKeyedStateHandle) stateHandle;

        // temporary instance directory: a uniquely named child of the instance base path
        Path baseDirectory = instanceBasePath.getAbsoluteFile().toPath();
        Path tmpInstancePath = baseDirectory.resolve(UUID.randomUUID().toString());

        try (RestoredDBInstance tmpInstance = restoreDBInstanceFromStateHandle(remoteHandle, tmpInstancePath);
                RocksDBWriteBatchWrapper batch = new RocksDBWriteBatchWrapper(this.db, writeBatchSize)) {

            List<ColumnFamilyDescriptor> sourceDescriptors = tmpInstance.columnFamilyDescriptors;
            List<ColumnFamilyHandle> sourceHandles = tmpInstance.columnFamilyHandles;

            // looping over the requested descriptors only skips the default column family handle
            for (int cf = 0; cf < sourceDescriptors.size(); ++cf) {
                ColumnFamilyHandle sourceHandle = sourceHandles.get(cf);
                ColumnFamilyHandle targetHandle =
                    getOrRegisterStateColumnFamilyHandle(null, tmpInstance.stateMetaInfoSnapshots.get(cf))
                        .columnFamilyHandle;

                // try-with-resources releases the native iterator resources
                try (RocksIteratorWrapper it = RocksDBOperationUtils.getRocksIterator(
                        tmpInstance.db, sourceHandle, tmpInstance.readOptions)) {
                    for (it.seek(rangeStartPrefix); it.isValid(); it.next()) {
                        if (!RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(it.key(), rangeStopPrefix)) {
                            // records are visited in sorted order, so the first key at or past
                            // the stop prefix ends the copy for this column family
                            break;
                        }
                        batch.put(targetHandle, it.key(), it.value());
                    }
                }
            }
        } finally {
            cleanUpPathQuietly(tmpInstancePath);
        }
    }
}