Java源码示例:org.apache.flink.runtime.state.internal.InternalMapState

示例1
/**
 * Builds a namespaced {@link StateMapView} on top of keyed map state.
 *
 * <p>A map state registered under {@code stateName} is requested from the keyed state backend,
 * with {@code windowSerializer} passed as the namespace serializer. If the type info reports
 * itself as null-aware, a second value state named {@code stateName + NULL_STATE_POSTFIX} is
 * requested as well and handed to the view together with the map state (presumably it holds the
 * value associated with the {@code null} key; confirm against {@code StateMapView}).
 *
 * @param stateName name under which the map state is registered
 * @param mapViewTypeInfo supplies the key type, the value type and the null-awareness flag
 * @return the nullable-keys view when the type info is null-aware, otherwise the not-null view
 * @throws Exception if any of the underlying state lookups fails
 */
@Override
public <N, UK, UV> StateMapView<N, UK, UV> getStateMapView(String stateName, MapViewTypeInfo<UK, UV> mapViewTypeInfo) throws Exception {
	final MapStateDescriptor<UK, UV> entriesDescriptor = new MapStateDescriptor<>(
		stateName,
		mapViewTypeInfo.getKeyType(),
		mapViewTypeInfo.getValueType());
	final MapState<UK, UV> entries = keyedStateBackend.getOrCreateKeyedState(windowSerializer, entriesDescriptor);
	// explicit cast to internal state
	final InternalMapState<?, N, UK, UV> internalEntries = (InternalMapState<?, N, UK, UV>) entries;

	if (!mapViewTypeInfo.isNullAware()) {
		return new StateMapView.NamespacedStateMapViewWithKeysNotNull<>(internalEntries);
	}

	final ValueStateDescriptor<UV> nullEntryDescriptor = new ValueStateDescriptor<>(
		stateName + NULL_STATE_POSTFIX,
		mapViewTypeInfo.getValueType());
	final ValueState<UV> nullEntry = keyedStateBackend.getOrCreateKeyedState(windowSerializer, nullEntryDescriptor);
	// explicit cast to internal state
	final InternalValueState<?, N, UV> internalNullEntry = (InternalValueState<?, N, UV>) nullEntry;
	return new StateMapView.NamespacedStateMapViewWithKeysNullable<>(internalEntries, internalNullEntry);
}
 
示例2
/**
 * Builds a namespaced {@link StateMapView} on top of keyed map state.
 *
 * <p>A map state registered under {@code stateName} is requested from the keyed state backend,
 * with {@code windowSerializer} passed as the namespace serializer. If the type info reports
 * itself as null-aware, a second value state named {@code stateName + NULL_STATE_POSTFIX} is
 * requested as well and handed to the view together with the map state (presumably it holds the
 * value associated with the {@code null} key; confirm against {@code StateMapView}).
 *
 * @param stateName name under which the map state is registered
 * @param mapViewTypeInfo supplies the key type, the value type and the null-awareness flag
 * @return the nullable-keys view when the type info is null-aware, otherwise the not-null view
 * @throws Exception if any of the underlying state lookups fails
 */
@Override
public <N, UK, UV> StateMapView<N, UK, UV> getStateMapView(String stateName, MapViewTypeInfo<UK, UV> mapViewTypeInfo) throws Exception {
	final MapStateDescriptor<UK, UV> entriesDescriptor = new MapStateDescriptor<>(
		stateName,
		mapViewTypeInfo.getKeyType(),
		mapViewTypeInfo.getValueType());
	final MapState<UK, UV> entries = keyedStateBackend.getOrCreateKeyedState(windowSerializer, entriesDescriptor);
	// explicit cast to internal state
	final InternalMapState<?, N, UK, UV> internalEntries = (InternalMapState<?, N, UK, UV>) entries;

	if (!mapViewTypeInfo.isNullAware()) {
		return new StateMapView.NamespacedStateMapViewWithKeysNotNull<>(internalEntries);
	}

	final ValueStateDescriptor<UV> nullEntryDescriptor = new ValueStateDescriptor<>(
		stateName + NULL_STATE_POSTFIX,
		mapViewTypeInfo.getValueType());
	final ValueState<UV> nullEntry = keyedStateBackend.getOrCreateKeyedState(windowSerializer, nullEntryDescriptor);
	// explicit cast to internal state
	final InternalValueState<?, N, UV> internalNullEntry = (InternalValueState<?, N, UV>) nullEntry;
	return new StateMapView.NamespacedStateMapViewWithKeysNullable<>(internalEntries, internalNullEntry);
}
 
示例3
/**
 * Tests map serialization and deserialization match.
 *
 * <p>Builds a RocksDB keyed state backend in the temporary folder, obtains a map state from it
 * and runs the shared {@code KvStateRequestSerializerTest.testMapSerialization(key, mapState)}
 * check against that state. The backend is disposed afterwards, also when the check fails.
 *
 * @see KvStateRequestSerializerTest#testMapSerialization()
 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
 * test
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;

	// objects for RocksDB state map serialisation
	final RocksDBKeyedStateBackend<Long> rocksDbBackend = RocksDBTestUtils
		.builderForTestDefaults(temporaryFolder.getRoot(), LongSerializer.INSTANCE)
		.build();

	try {
		rocksDbBackend.setCurrentKey(key);

		final InternalMapState<Long, VoidNamespace, Long, String> mapState =
				(InternalMapState<Long, VoidNamespace, Long, String>)
						rocksDbBackend.getPartitionedState(
								VoidNamespace.INSTANCE,
								VoidNamespaceSerializer.INSTANCE,
								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));

		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
	} finally {
		// Dispose in finally so a failing assertion does not leave the backend undisposed.
		rocksDbBackend.dispose();
	}
}
 
示例4
/**
 * Runs the map serialization check against a map state taken from a heap keyed state backend.
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;
	final HeapKeyedStateBackend<Long> heapBackend = getLongHeapKeyedStateBackend(key);

	// Map state named "test" with Long keys and String values.
	final MapStateDescriptor<Long, String> descriptor =
		new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE);

	final InternalMapState<Long, VoidNamespace, Long, String> mapState =
		(InternalMapState<Long, VoidNamespace, Long, String>) heapBackend.getPartitionedState(
			VoidNamespace.INSTANCE,
			VoidNamespaceSerializer.INSTANCE,
			descriptor);

	testMapSerialization(key, mapState);
}
 
示例5
/**
 * Runs the map serialization check against a map state taken from a heap keyed state backend.
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;
	final HeapKeyedStateBackend<Long> heapBackend = getLongHeapKeyedStateBackend(key);

	// Map state named "test" with Long keys and String values.
	final MapStateDescriptor<Long, String> descriptor =
		new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE);

	final InternalMapState<Long, VoidNamespace, Long, String> mapState =
		(InternalMapState<Long, VoidNamespace, Long, String>) heapBackend.getPartitionedState(
			VoidNamespace.INSTANCE,
			VoidNamespaceSerializer.INSTANCE,
			descriptor);

	testMapSerialization(key, mapState);
}
 
示例6
/**
 * Runs the map serialization check against a map state taken from a heap keyed state backend.
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;
	final HeapKeyedStateBackend<Long> heapBackend = getLongHeapKeyedStateBackend(key);

	// Map state named "test" with Long keys and String values.
	final MapStateDescriptor<Long, String> descriptor =
		new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE);

	final InternalMapState<Long, VoidNamespace, Long, String> mapState =
		(InternalMapState<Long, VoidNamespace, Long, String>) heapBackend.getPartitionedState(
			VoidNamespace.INSTANCE,
			VoidNamespaceSerializer.INSTANCE,
			descriptor);

	testMapSerialization(key, mapState);
}
 
示例7
/**
 * Creates the TTL map state by handing the given context to the superclass constructor.
 *
 * @param ttlStateContext context typed over an {@link InternalMapState} whose user values are
 *     {@link TtlValue} instances
 */
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
	super(ttlStateContext);
}
 
示例8
/**
 * Restores a heap keyed state backend from the binary snapshot resource
 * {@code heap_keyed_statebackend_1_5_map.snapshot} and verifies the restored map state.
 *
 * <p>For several key/namespace combinations the test asserts that each expected map key is
 * mapped to a value equal to that key and that the number of entries matches. Finally a new
 * snapshot of the restored backend is triggered and run.
 *
 * @throws Exception if the resource cannot be read, the backend cannot be restored or a state
 *     access fails
 */
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
	ClassLoader cl = getClass().getClassLoader();
	URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");

	Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");

	final SnapshotResult<KeyedStateHandle> stateHandles;
	// Read through the URL itself: URL#getFile() returns the still percent-encoded path, so
	// opening it as a file fails when the location contains e.g. spaces or lies inside a jar.
	try (BufferedInputStream bis = new BufferedInputStream(resource.openStream())) {
		stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
	}
	final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
	try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
		final Integer namespace1 = 1;
		final Integer namespace2 = 2;
		final Integer namespace3 = 3;

		final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());

		InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);

		// key "abc"
		keyedBackend.setCurrentKey("abc");
		state.setCurrentNamespace(namespace1);
		assertEquals(33L, (long) state.get(33L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace2);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(11L, (long) state.get(11L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(44L, (long) state.get(44L));
		assertEquals(1, getStateSize(state));

		// key "def"
		keyedBackend.setCurrentKey("def");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(3, getStateSize(state));

		// key "jkl"
		keyedBackend.setCurrentKey("jkl");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// key "mno"
		keyedBackend.setCurrentKey("mno");
		state.setCurrentNamespace(namespace3);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// Trigger a fresh snapshot of the restored backend.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
				1L,
				1L,
				new MemCheckpointStreamFactory(4 * 1024 * 1024),
				CheckpointOptions.forCheckpointWithDefaultLocation());

		snapshot.run();
	}
}
 
示例9
/**
 * Counts the entries of the given map state by walking its iterator to the end.
 *
 * @param mapState map state whose entries are counted
 * @return number of entries the iterator produced
 * @throws Exception if obtaining the iterator from the state fails
 */
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
	final Iterator<Map.Entry<UK, UV>> entries = mapState.iterator();
	int count = 0;
	while (entries.hasNext()) {
		count++;
		entries.next();
	}
	return count;
}
 
示例10
/**
 * Verifies that the serialization of a map using the given map state
 * matches the deserialization with {@link KvStateSerializer#deserializeMap}.
 *
 * <p>Two checks are performed. First the state is filled with random entries plus one entry
 * with a {@code null} value, its serialized form is fetched via {@code getSerializedValue} and
 * decoded again. Second a single entry is encoded by hand (key bytes, one zero byte, value
 * bytes) and decoded the same way.
 *
 * @param key
 * 		key of the map state
 * @param mapState
 * 		map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
 *
 * @throws Exception
 * 		if a state access, the serialization or the deserialization fails
 */
public static void testMapSerialization(
		final long key,
		final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {

	final TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
	final TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
	mapState.setCurrentNamespace(VoidNamespace.INSTANCE);

	// ---- whole map ----
	final int numElements = 10;
	final Map<Long, String> expected = new HashMap<>();
	for (int remaining = numElements; remaining > 0; remaining--) {
		final long randomKey = ThreadLocalRandom.current().nextLong();
		final String text = Long.toString(randomKey);
		expected.put(randomKey, text);
		mapState.put(randomKey, text);
	}

	// one additional entry carrying a null value
	expected.put(0L, null);
	mapState.put(0L, null);

	final byte[] keyAndNamespaceBytes = KvStateSerializer.serializeKeyAndNamespace(
		key, LongSerializer.INSTANCE,
		VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

	final byte[] stateBytes = mapState.getSerializedValue(
		keyAndNamespaceBytes,
		mapState.getKeySerializer(),
		mapState.getNamespaceSerializer(),
		mapState.getValueSerializer());

	final Map<Long, String> decoded =
		KvStateSerializer.deserializeMap(stateBytes, userKeySerializer, userValueSerializer);
	assertEquals(expected.size(), decoded.size());
	for (Map.Entry<Long, String> decodedEntry : decoded.entrySet()) {
		final String expectedText = expected.get(decodedEntry.getKey());
		assertEquals(expectedText, decodedEntry.getValue());
	}

	// ---- single hand-encoded entry ----
	final ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
	final long expectedKey = ThreadLocalRandom.current().nextLong();
	final String expectedValue = Long.toString(expectedKey);

	entryBytes.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
	// single zero byte between key and value (the original named it "isNull")
	entryBytes.write(0);
	entryBytes.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));

	final Map<Long, String> decodedSingle =
		KvStateSerializer.deserializeMap(entryBytes.toByteArray(), userKeySerializer, userValueSerializer);
	assertEquals(1, decodedSingle.size());
	assertEquals(expectedValue, decodedSingle.get(expectedKey));
}
 
示例11
/**
 * Tests map serialization and deserialization match.
 *
 * <p>Builds a RocksDB keyed state backend with an explicit
 * {@code RocksDBKeyedStateBackendBuilder}, obtains a map state from it and runs the shared
 * {@code KvStateRequestSerializerTest.testMapSerialization(key, mapState)} check against that
 * state. The backend is disposed afterwards, also when the check fails.
 *
 * @see KvStateRequestSerializerTest#testMapSerialization()
 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
 * test
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;

	// objects for RocksDB state map serialisation
	DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
	dbOptions.setCreateIfMissing(true);
	ExecutionConfig executionConfig = new ExecutionConfig();
	final RocksDBKeyedStateBackend<Long> rocksDbBackend =
		new RocksDBKeyedStateBackendBuilder<>(
			"no-op",
			ClassLoader.getSystemClassLoader(),
			temporaryFolder.getRoot(),
			dbOptions,
			stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
			mock(TaskKvStateRegistry.class),
			LongSerializer.INSTANCE,
			1,
			new KeyGroupRange(0, 0),
			executionConfig,
			TestLocalRecoveryConfig.disabled(),
			RocksDBStateBackend.PriorityQueueStateType.HEAP,
			TtlTimeProvider.DEFAULT,
			new UnregisteredMetricsGroup(),
			Collections.emptyList(),
			AbstractStateBackend.getCompressionDecorator(executionConfig),
			new CloseableRegistry()
		).build();

	try {
		rocksDbBackend.setCurrentKey(key);

		final InternalMapState<Long, VoidNamespace, Long, String> mapState =
				(InternalMapState<Long, VoidNamespace, Long, String>)
						rocksDbBackend.getPartitionedState(
								VoidNamespace.INSTANCE,
								VoidNamespaceSerializer.INSTANCE,
								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));

		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
	} finally {
		// Dispose in finally so a failing assertion does not leave the backend undisposed.
		rocksDbBackend.dispose();
	}
}
 
示例12
/**
 * Creates the view on top of the given internal map state.
 *
 * @param internalMapState map state kept in the {@code internalMapState} field
 */
public NamespacedStateMapViewWithKeysNotNull(InternalMapState<?, N, MK, MV> internalMapState) {
	this.internalMapState = internalMapState;
}
 
示例13
/**
 * Creates the view on top of the given internal map state and internal value state.
 *
 * @param internalMapState map state kept in the {@code internalMapState} field
 * @param internalNullState value state kept in the {@code internalNullState} field; presumably
 *     holds the value for the {@code null} key (confirm against the class's accessors)
 */
public NamespacedStateMapViewWithKeysNullable(InternalMapState<?, N, MK, MV> internalMapState, InternalValueState<?, N, MV> internalNullState) {
	this.internalMapState = internalMapState;
	this.internalNullState = internalNullState;
}
 
示例14
/**
 * Creates the TTL map state by handing the given context to the superclass constructor.
 *
 * @param ttlStateContext context typed over an {@link InternalMapState} whose user values are
 *     {@link TtlValue} instances
 */
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
	super(ttlStateContext);
}
 
示例15
/**
 * Restores a heap keyed state backend from the binary snapshot resource
 * {@code heap_keyed_statebackend_1_5_map.snapshot} and verifies the restored map state.
 *
 * <p>For several key/namespace combinations the test asserts that each expected map key is
 * mapped to a value equal to that key and that the number of entries matches. Finally a new
 * snapshot of the restored backend is triggered and run.
 *
 * @throws Exception if the resource cannot be read, the backend cannot be restored or a state
 *     access fails
 */
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
	ClassLoader cl = getClass().getClassLoader();
	URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");

	Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");

	final SnapshotResult<KeyedStateHandle> stateHandles;
	// Read through the URL itself: URL#getFile() returns the still percent-encoded path, so
	// opening it as a file fails when the location contains e.g. spaces or lies inside a jar.
	try (BufferedInputStream bis = new BufferedInputStream(resource.openStream())) {
		stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
	}
	final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
	try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
		final Integer namespace1 = 1;
		final Integer namespace2 = 2;
		final Integer namespace3 = 3;

		final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());

		InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);

		// key "abc"
		keyedBackend.setCurrentKey("abc");
		state.setCurrentNamespace(namespace1);
		assertEquals(33L, (long) state.get(33L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace2);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(11L, (long) state.get(11L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(44L, (long) state.get(44L));
		assertEquals(1, getStateSize(state));

		// key "def"
		keyedBackend.setCurrentKey("def");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(3, getStateSize(state));

		// key "jkl"
		keyedBackend.setCurrentKey("jkl");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// key "mno"
		keyedBackend.setCurrentKey("mno");
		state.setCurrentNamespace(namespace3);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// Trigger a fresh snapshot of the restored backend.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
				1L,
				1L,
				new MemCheckpointStreamFactory(4 * 1024 * 1024),
				CheckpointOptions.forCheckpointWithDefaultLocation());

		snapshot.run();
	}
}
 
示例16
/**
 * Counts the entries of the given map state by walking its iterator to the end.
 *
 * @param mapState map state whose entries are counted
 * @return number of entries the iterator produced
 * @throws Exception if obtaining the iterator from the state fails
 */
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
	final Iterator<Map.Entry<UK, UV>> entries = mapState.iterator();
	int count = 0;
	while (entries.hasNext()) {
		count++;
		entries.next();
	}
	return count;
}
 
示例17
/**
 * Verifies that the serialization of a map using the given map state
 * matches the deserialization with {@link KvStateSerializer#deserializeMap}.
 *
 * <p>Two checks are performed. First the state is filled with random entries plus one entry
 * with a {@code null} value, its serialized form is fetched via {@code getSerializedValue} and
 * decoded again. Second a single entry is encoded by hand (key bytes, one zero byte, value
 * bytes) and decoded the same way.
 *
 * @param key
 * 		key of the map state
 * @param mapState
 * 		map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
 *
 * @throws Exception
 * 		if a state access, the serialization or the deserialization fails
 */
public static void testMapSerialization(
		final long key,
		final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {

	final TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
	final TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
	mapState.setCurrentNamespace(VoidNamespace.INSTANCE);

	// ---- whole map ----
	final int numElements = 10;
	final Map<Long, String> expected = new HashMap<>();
	for (int remaining = numElements; remaining > 0; remaining--) {
		final long randomKey = ThreadLocalRandom.current().nextLong();
		final String text = Long.toString(randomKey);
		expected.put(randomKey, text);
		mapState.put(randomKey, text);
	}

	// one additional entry carrying a null value
	expected.put(0L, null);
	mapState.put(0L, null);

	final byte[] keyAndNamespaceBytes = KvStateSerializer.serializeKeyAndNamespace(
		key, LongSerializer.INSTANCE,
		VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

	final byte[] stateBytes = mapState.getSerializedValue(
		keyAndNamespaceBytes,
		mapState.getKeySerializer(),
		mapState.getNamespaceSerializer(),
		mapState.getValueSerializer());

	final Map<Long, String> decoded =
		KvStateSerializer.deserializeMap(stateBytes, userKeySerializer, userValueSerializer);
	assertEquals(expected.size(), decoded.size());
	for (Map.Entry<Long, String> decodedEntry : decoded.entrySet()) {
		final String expectedText = expected.get(decodedEntry.getKey());
		assertEquals(expectedText, decodedEntry.getValue());
	}

	// ---- single hand-encoded entry ----
	final ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
	final long expectedKey = ThreadLocalRandom.current().nextLong();
	final String expectedValue = Long.toString(expectedKey);

	entryBytes.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
	// single zero byte between key and value (the original named it "isNull")
	entryBytes.write(0);
	entryBytes.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));

	final Map<Long, String> decodedSingle =
		KvStateSerializer.deserializeMap(entryBytes.toByteArray(), userKeySerializer, userValueSerializer);
	assertEquals(1, decodedSingle.size());
	assertEquals(expectedValue, decodedSingle.get(expectedKey));
}
 
示例18
/**
 * Tests map serialization and deserialization match.
 *
 * <p>Builds a RocksDB keyed state backend with an explicit
 * {@code RocksDBKeyedStateBackendBuilder}, obtains a map state from it and runs the shared
 * {@code KvStateRequestSerializerTest.testMapSerialization(key, mapState)} check against that
 * state. The backend is disposed afterwards, also when the check fails.
 *
 * @see KvStateRequestSerializerTest#testMapSerialization()
 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
 * test
 */
@Test
public void testMapSerialization() throws Exception {
	final long key = 0L;

	// objects for RocksDB state map serialisation
	DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
	dbOptions.setCreateIfMissing(true);
	ExecutionConfig executionConfig = new ExecutionConfig();
	final RocksDBKeyedStateBackend<Long> rocksDbBackend =
		new RocksDBKeyedStateBackendBuilder<>(
			"no-op",
			ClassLoader.getSystemClassLoader(),
			temporaryFolder.getRoot(),
			dbOptions,
			stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
			mock(TaskKvStateRegistry.class),
			LongSerializer.INSTANCE,
			1,
			new KeyGroupRange(0, 0),
			executionConfig,
			TestLocalRecoveryConfig.disabled(),
			RocksDBStateBackend.PriorityQueueStateType.HEAP,
			TtlTimeProvider.DEFAULT,
			new UnregisteredMetricsGroup(),
			Collections.emptyList(),
			AbstractStateBackend.getCompressionDecorator(executionConfig),
			new CloseableRegistry()
		).build();

	try {
		rocksDbBackend.setCurrentKey(key);

		final InternalMapState<Long, VoidNamespace, Long, String> mapState =
				(InternalMapState<Long, VoidNamespace, Long, String>)
						rocksDbBackend.getPartitionedState(
								VoidNamespace.INSTANCE,
								VoidNamespaceSerializer.INSTANCE,
								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));

		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
	} finally {
		// Dispose in finally so a failing assertion does not leave the backend undisposed.
		rocksDbBackend.dispose();
	}
}
 
示例19
/**
 * Creates the view on top of the given internal map state.
 *
 * @param internalMapState map state kept in the {@code internalMapState} field
 */
public NamespacedStateMapViewWithKeysNotNull(InternalMapState<?, N, MK, MV> internalMapState) {
	this.internalMapState = internalMapState;
}
 
示例20
/**
 * Creates the view on top of the given internal map state and internal value state.
 *
 * @param internalMapState map state kept in the {@code internalMapState} field
 * @param internalNullState value state kept in the {@code internalNullState} field; presumably
 *     holds the value for the {@code null} key (confirm against the class's accessors)
 */
public NamespacedStateMapViewWithKeysNullable(InternalMapState<?, N, MK, MV> internalMapState, InternalValueState<?, N, MV> internalNullState) {
	this.internalMapState = internalMapState;
	this.internalNullState = internalNullState;
}
 
示例21
/**
 * Creates the TTL map state by handing the given context to the superclass constructor.
 *
 * @param ttlStateContext context typed over an {@link InternalMapState} whose user values are
 *     {@link TtlValue} instances
 */
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
	super(ttlStateContext);
}
 
示例22
/**
 * Restores a heap keyed state backend from the binary snapshot resource
 * {@code heap_keyed_statebackend_1_5_map.snapshot} and verifies the restored map state.
 *
 * <p>For several key/namespace combinations the test asserts that each expected map key is
 * mapped to a value equal to that key and that the number of entries matches. Finally a new
 * snapshot of the restored backend is triggered and run.
 *
 * @throws Exception if the resource cannot be read, the backend cannot be restored or a state
 *     access fails
 */
@Test
public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
	ClassLoader cl = getClass().getClassLoader();
	URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");

	Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");

	final SnapshotResult<KeyedStateHandle> stateHandles;
	// Read through the URL itself: URL#getFile() returns the still percent-encoded path, so
	// opening it as a file fails when the location contains e.g. spaces or lies inside a jar.
	try (BufferedInputStream bis = new BufferedInputStream(resource.openStream())) {
		stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
	}
	final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
	try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
		final Integer namespace1 = 1;
		final Integer namespace2 = 2;
		final Integer namespace3 = 3;

		final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());

		InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);

		// key "abc"
		keyedBackend.setCurrentKey("abc");
		state.setCurrentNamespace(namespace1);
		assertEquals(33L, (long) state.get(33L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace2);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(11L, (long) state.get(11L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(44L, (long) state.get(44L));
		assertEquals(1, getStateSize(state));

		// key "def"
		keyedBackend.setCurrentKey("def");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(2, getStateSize(state));

		state.setCurrentNamespace(namespace3);
		assertEquals(22L, (long) state.get(22L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(3, getStateSize(state));

		// key "jkl"
		keyedBackend.setCurrentKey("jkl");
		state.setCurrentNamespace(namespace1);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// key "mno"
		keyedBackend.setCurrentKey("mno");
		state.setCurrentNamespace(namespace3);
		assertEquals(11L, (long) state.get(11L));
		assertEquals(22L, (long) state.get(22L));
		assertEquals(33L, (long) state.get(33L));
		assertEquals(44L, (long) state.get(44L));
		assertEquals(55L, (long) state.get(55L));
		assertEquals(5, getStateSize(state));

		// Trigger a fresh snapshot of the restored backend.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
				1L,
				1L,
				new MemCheckpointStreamFactory(4 * 1024 * 1024),
				CheckpointOptions.forCheckpointWithDefaultLocation());

		snapshot.run();
	}
}
 
示例23
/**
 * Counts the entries of the given map state by walking its iterator to the end.
 *
 * @param mapState map state whose entries are counted
 * @return number of entries the iterator produced
 * @throws Exception if obtaining the iterator from the state fails
 */
private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
	final Iterator<Map.Entry<UK, UV>> entries = mapState.iterator();
	int count = 0;
	while (entries.hasNext()) {
		count++;
		entries.next();
	}
	return count;
}
 
示例24
/**
 * Verifies that the serialization of a map using the given map state
 * matches the deserialization with {@link KvStateSerializer#deserializeMap}.
 *
 * <p>Two checks are performed. First the state is filled with random entries plus one entry
 * with a {@code null} value, its serialized form is fetched via {@code getSerializedValue} and
 * decoded again. Second a single entry is encoded by hand (key bytes, one zero byte, value
 * bytes) and decoded the same way.
 *
 * @param key
 * 		key of the map state
 * @param mapState
 * 		map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
 *
 * @throws Exception
 * 		if a state access, the serialization or the deserialization fails
 */
public static void testMapSerialization(
		final long key,
		final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {

	final TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
	final TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
	mapState.setCurrentNamespace(VoidNamespace.INSTANCE);

	// ---- whole map ----
	final int numElements = 10;
	final Map<Long, String> expected = new HashMap<>();
	for (int remaining = numElements; remaining > 0; remaining--) {
		final long randomKey = ThreadLocalRandom.current().nextLong();
		final String text = Long.toString(randomKey);
		expected.put(randomKey, text);
		mapState.put(randomKey, text);
	}

	// one additional entry carrying a null value
	expected.put(0L, null);
	mapState.put(0L, null);

	final byte[] keyAndNamespaceBytes = KvStateSerializer.serializeKeyAndNamespace(
		key, LongSerializer.INSTANCE,
		VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

	final byte[] stateBytes = mapState.getSerializedValue(
		keyAndNamespaceBytes,
		mapState.getKeySerializer(),
		mapState.getNamespaceSerializer(),
		mapState.getValueSerializer());

	final Map<Long, String> decoded =
		KvStateSerializer.deserializeMap(stateBytes, userKeySerializer, userValueSerializer);
	assertEquals(expected.size(), decoded.size());
	for (Map.Entry<Long, String> decodedEntry : decoded.entrySet()) {
		final String expectedText = expected.get(decodedEntry.getKey());
		assertEquals(expectedText, decodedEntry.getValue());
	}

	// ---- single hand-encoded entry ----
	final ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
	final long expectedKey = ThreadLocalRandom.current().nextLong();
	final String expectedValue = Long.toString(expectedKey);

	entryBytes.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer));
	// single zero byte between key and value (the original named it "isNull")
	entryBytes.write(0);
	entryBytes.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer));

	final Map<Long, String> decodedSingle =
		KvStateSerializer.deserializeMap(entryBytes.toByteArray(), userKeySerializer, userValueSerializer);
	assertEquals(1, decodedSingle.size());
	assertEquals(expectedValue, decodedSingle.get(expectedKey));
}