Java源码示例:org.apache.flink.runtime.state.internal.InternalMapState
示例1
@Override
public <N, UK, UV> StateMapView<N, UK, UV> getStateMapView(String stateName, MapViewTypeInfo<UK, UV> mapViewTypeInfo) throws Exception {
MapStateDescriptor<UK, UV> mapStateDescriptor = new MapStateDescriptor<>(
stateName,
mapViewTypeInfo.getKeyType(),
mapViewTypeInfo.getValueType());
MapState<UK, UV> mapState = keyedStateBackend.getOrCreateKeyedState(windowSerializer, mapStateDescriptor);
// explict cast to internal state
InternalMapState<?, N, UK, UV> internalMapState = (InternalMapState<?, N, UK, UV>) mapState;
if (mapViewTypeInfo.isNullAware()) {
ValueStateDescriptor<UV> nullStateDescriptor = new ValueStateDescriptor<>(
stateName + NULL_STATE_POSTFIX,
mapViewTypeInfo.getValueType());
ValueState<UV> nullState = keyedStateBackend.getOrCreateKeyedState(windowSerializer, nullStateDescriptor);
// explict cast to internal state
InternalValueState<?, N, UV> internalNullState = (InternalValueState<?, N, UV>) nullState;
return new StateMapView.NamespacedStateMapViewWithKeysNullable<>(internalMapState, internalNullState);
} else {
return new StateMapView.NamespacedStateMapViewWithKeysNotNull<>(internalMapState);
}
}
示例2
@Override
public <N, UK, UV> StateMapView<N, UK, UV> getStateMapView(String stateName, MapViewTypeInfo<UK, UV> mapViewTypeInfo) throws Exception {
MapStateDescriptor<UK, UV> mapStateDescriptor = new MapStateDescriptor<>(
stateName,
mapViewTypeInfo.getKeyType(),
mapViewTypeInfo.getValueType());
MapState<UK, UV> mapState = keyedStateBackend.getOrCreateKeyedState(windowSerializer, mapStateDescriptor);
// explict cast to internal state
InternalMapState<?, N, UK, UV> internalMapState = (InternalMapState<?, N, UK, UV>) mapState;
if (mapViewTypeInfo.isNullAware()) {
ValueStateDescriptor<UV> nullStateDescriptor = new ValueStateDescriptor<>(
stateName + NULL_STATE_POSTFIX,
mapViewTypeInfo.getValueType());
ValueState<UV> nullState = keyedStateBackend.getOrCreateKeyedState(windowSerializer, nullStateDescriptor);
// explict cast to internal state
InternalValueState<?, N, UV> internalNullState = (InternalValueState<?, N, UV>) nullState;
return new StateMapView.NamespacedStateMapViewWithKeysNullable<>(internalMapState, internalNullState);
} else {
return new StateMapView.NamespacedStateMapViewWithKeysNotNull<>(internalMapState);
}
}
示例3
/**
* Tests map serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testMapSerialization()
* KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
* test
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
// objects for RocksDB state list serialisation
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = RocksDBTestUtils
.builderForTestDefaults(temporaryFolder.getRoot(), LongSerializer.INSTANCE)
.build();
longHeapKeyedStateBackend.setCurrentKey(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
KvStateRequestSerializerTest.testMapSerialization(key, mapState);
longHeapKeyedStateBackend.dispose();
}
示例4
/**
* Tests map serialization utils.
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = getLongHeapKeyedStateBackend(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
testMapSerialization(key, mapState);
}
示例5
/**
* Tests map serialization utils.
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = getLongHeapKeyedStateBackend(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
testMapSerialization(key, mapState);
}
示例6
/**
* Tests map serialization utils.
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = getLongHeapKeyedStateBackend(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
testMapSerialization(key, mapState);
}
示例7
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
super(ttlStateContext);
}
示例8
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
ClassLoader cl = getClass().getClassLoader();
URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");
Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
final SnapshotResult<KeyedStateHandle> stateHandles;
try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
}
final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
final Integer namespace1 = 1;
final Integer namespace2 = 2;
final Integer namespace3 = 3;
final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
assertEquals(33L, (long) state.get(33L));
assertEquals(55L, (long) state.get(55L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace2);
assertEquals(22L, (long) state.get(22L));
assertEquals(11L, (long) state.get(11L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(44L, (long) state.get(44L));
assertEquals(1, getStateSize(state));
keyedBackend.setCurrentKey("def");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(44L, (long) state.get(44L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(22L, (long) state.get(22L));
assertEquals(55L, (long) state.get(55L));
assertEquals(33L, (long) state.get(33L));
assertEquals(3, getStateSize(state));
keyedBackend.setCurrentKey("jkl");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
keyedBackend.setCurrentKey("mno");
state.setCurrentNamespace(namespace3);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
1L,
1L,
new MemCheckpointStreamFactory(4 * 1024 * 1024),
CheckpointOptions.forCheckpointWithDefaultLocation());
snapshot.run();
}
}
示例9
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
int i = 0;
for (Iterator<Map.Entry<UK, UV>> itt = mapState.iterator(); itt.hasNext(); i++, itt.next()) {}
return i;
}
示例10
/**
* Verifies that the serialization of a map using the given map state
* matches the deserialization with {@link KvStateSerializer#deserializeList}.
*
* @param key
* key of the map state
* @param mapState
* map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
*
* @throws Exception
*/
public static void testMapSerialization(
final long key,
final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
// Map
final int numElements = 10;
final Map<Long, String> expectedValues = new HashMap<>();
for (int i = 1; i <= numElements; i++) {
final long value = ThreadLocalRandom.current().nextLong();
expectedValues.put(value, Long.toString(value));
mapState.put(value, Long.toString(value));
}
expectedValues.put(0L, null);
mapState.put(0L, null);
final byte[] serializedKey =
KvStateSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final byte[] serializedValues = mapState.getSerializedValue(
serializedKey,
mapState.getKeySerializer(),
mapState.getNamespaceSerializer(),
mapState.getValueSerializer());
Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
assertEquals(expectedValues.size(), actualValues.size());
for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) {
assertEquals(expectedValues.get(actualEntry.getKey()), actualEntry.getValue());
}
// Single value
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long expectedKey = ThreadLocalRandom.current().nextLong();
String expectedValue = Long.toString(expectedKey);
byte[] isNull = {0};
baos.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
baos.write(isNull);
baos.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));
byte[] serializedValue = baos.toByteArray();
Map<Long, String> actualValue = KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
assertEquals(1, actualValue.size());
assertEquals(expectedValue, actualValue.get(expectedKey));
}
示例11
/**
* Tests map serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testMapSerialization()
* KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
* test
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ExecutionConfig executionConfig = new ExecutionConfig();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
executionConfig,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
new CloseableRegistry()
).build();
longHeapKeyedStateBackend.setCurrentKey(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
KvStateRequestSerializerTest.testMapSerialization(key, mapState);
longHeapKeyedStateBackend.dispose();
}
示例12
public NamespacedStateMapViewWithKeysNotNull(InternalMapState<?, N, MK, MV> internalMapState) {
this.internalMapState = internalMapState;
}
示例13
public NamespacedStateMapViewWithKeysNullable(InternalMapState<?, N, MK, MV> internalMapState, InternalValueState<?, N, MV> internalNullState) {
this.internalMapState = internalMapState;
this.internalNullState = internalNullState;
}
示例14
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
super(ttlStateContext);
}
示例15
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
ClassLoader cl = getClass().getClassLoader();
URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");
Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
final SnapshotResult<KeyedStateHandle> stateHandles;
try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
}
final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
final Integer namespace1 = 1;
final Integer namespace2 = 2;
final Integer namespace3 = 3;
final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
assertEquals(33L, (long) state.get(33L));
assertEquals(55L, (long) state.get(55L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace2);
assertEquals(22L, (long) state.get(22L));
assertEquals(11L, (long) state.get(11L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(44L, (long) state.get(44L));
assertEquals(1, getStateSize(state));
keyedBackend.setCurrentKey("def");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(44L, (long) state.get(44L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(22L, (long) state.get(22L));
assertEquals(55L, (long) state.get(55L));
assertEquals(33L, (long) state.get(33L));
assertEquals(3, getStateSize(state));
keyedBackend.setCurrentKey("jkl");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
keyedBackend.setCurrentKey("mno");
state.setCurrentNamespace(namespace3);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
1L,
1L,
new MemCheckpointStreamFactory(4 * 1024 * 1024),
CheckpointOptions.forCheckpointWithDefaultLocation());
snapshot.run();
}
}
示例16
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
int i = 0;
for (Iterator<Map.Entry<UK, UV>> itt = mapState.iterator(); itt.hasNext(); i++, itt.next()) {}
return i;
}
示例17
/**
* Verifies that the serialization of a map using the given map state
* matches the deserialization with {@link KvStateSerializer#deserializeList}.
*
* @param key
* key of the map state
* @param mapState
* map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
*
* @throws Exception
*/
public static void testMapSerialization(
final long key,
final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
// Map
final int numElements = 10;
final Map<Long, String> expectedValues = new HashMap<>();
for (int i = 1; i <= numElements; i++) {
final long value = ThreadLocalRandom.current().nextLong();
expectedValues.put(value, Long.toString(value));
mapState.put(value, Long.toString(value));
}
expectedValues.put(0L, null);
mapState.put(0L, null);
final byte[] serializedKey =
KvStateSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final byte[] serializedValues = mapState.getSerializedValue(
serializedKey,
mapState.getKeySerializer(),
mapState.getNamespaceSerializer(),
mapState.getValueSerializer());
Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
assertEquals(expectedValues.size(), actualValues.size());
for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) {
assertEquals(expectedValues.get(actualEntry.getKey()), actualEntry.getValue());
}
// Single value
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long expectedKey = ThreadLocalRandom.current().nextLong();
String expectedValue = Long.toString(expectedKey);
byte[] isNull = {0};
baos.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
baos.write(isNull);
baos.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));
byte[] serializedValue = baos.toByteArray();
Map<Long, String> actualValue = KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
assertEquals(1, actualValue.size());
assertEquals(expectedValue, actualValue.get(expectedKey));
}
示例18
/**
* Tests map serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testMapSerialization()
* KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
* test
*/
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ExecutionConfig executionConfig = new ExecutionConfig();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
executionConfig,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
new CloseableRegistry()
).build();
longHeapKeyedStateBackend.setCurrentKey(key);
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
KvStateRequestSerializerTest.testMapSerialization(key, mapState);
longHeapKeyedStateBackend.dispose();
}
示例19
public NamespacedStateMapViewWithKeysNotNull(InternalMapState<?, N, MK, MV> internalMapState) {
this.internalMapState = internalMapState;
}
示例20
public NamespacedStateMapViewWithKeysNullable(InternalMapState<?, N, MK, MV> internalMapState, InternalValueState<?, N, MV> internalNullState) {
this.internalMapState = internalMapState;
this.internalNullState = internalNullState;
}
示例21
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
super(ttlStateContext);
}
示例22
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
ClassLoader cl = getClass().getClassLoader();
URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");
Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
final SnapshotResult<KeyedStateHandle> stateHandles;
try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
}
final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
final Integer namespace1 = 1;
final Integer namespace2 = 2;
final Integer namespace3 = 3;
final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
assertEquals(33L, (long) state.get(33L));
assertEquals(55L, (long) state.get(55L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace2);
assertEquals(22L, (long) state.get(22L));
assertEquals(11L, (long) state.get(11L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(44L, (long) state.get(44L));
assertEquals(1, getStateSize(state));
keyedBackend.setCurrentKey("def");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(44L, (long) state.get(44L));
assertEquals(2, getStateSize(state));
state.setCurrentNamespace(namespace3);
assertEquals(22L, (long) state.get(22L));
assertEquals(55L, (long) state.get(55L));
assertEquals(33L, (long) state.get(33L));
assertEquals(3, getStateSize(state));
keyedBackend.setCurrentKey("jkl");
state.setCurrentNamespace(namespace1);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
keyedBackend.setCurrentKey("mno");
state.setCurrentNamespace(namespace3);
assertEquals(11L, (long) state.get(11L));
assertEquals(22L, (long) state.get(22L));
assertEquals(33L, (long) state.get(33L));
assertEquals(44L, (long) state.get(44L));
assertEquals(55L, (long) state.get(55L));
assertEquals(5, getStateSize(state));
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
1L,
1L,
new MemCheckpointStreamFactory(4 * 1024 * 1024),
CheckpointOptions.forCheckpointWithDefaultLocation());
snapshot.run();
}
}
示例23
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
int i = 0;
for (Iterator<Map.Entry<UK, UV>> itt = mapState.iterator(); itt.hasNext(); i++, itt.next()) {}
return i;
}
示例24
/**
* Verifies that the serialization of a map using the given map state
* matches the deserialization with {@link KvStateSerializer#deserializeList}.
*
* @param key
* key of the map state
* @param mapState
* map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
*
* @throws Exception
*/
public static void testMapSerialization(
final long key,
final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
// Map
final int numElements = 10;
final Map<Long, String> expectedValues = new HashMap<>();
for (int i = 1; i <= numElements; i++) {
final long value = ThreadLocalRandom.current().nextLong();
expectedValues.put(value, Long.toString(value));
mapState.put(value, Long.toString(value));
}
expectedValues.put(0L, null);
mapState.put(0L, null);
final byte[] serializedKey =
KvStateSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final byte[] serializedValues = mapState.getSerializedValue(
serializedKey,
mapState.getKeySerializer(),
mapState.getNamespaceSerializer(),
mapState.getValueSerializer());
Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
assertEquals(expectedValues.size(), actualValues.size());
for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) {
assertEquals(expectedValues.get(actualEntry.getKey()), actualEntry.getValue());
}
// Single value
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long expectedKey = ThreadLocalRandom.current().nextLong();
String expectedValue = Long.toString(expectedKey);
byte[] isNull = {0};
baos.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
baos.write(isNull);
baos.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));
byte[] serializedValue = baos.toByteArray();
Map<Long, String> actualValue = KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
assertEquals(1, actualValue.size());
assertEquals(expectedValue, actualValue.get(expectedKey));
}