Java源码示例:org.apache.flink.api.java.typeutils.runtime.PojoSerializer

示例1
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
	TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
	Field[] reflectiveFields = new Field[fields.length];

	for (int i = 0; i < fields.length; i++) {
		fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
		reflectiveFields[i] = fields[i].getField();
	}

	return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}
 
示例2
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
	TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
	Field[] reflectiveFields = new Field[fields.length];

	for (int i = 0; i < fields.length; i++) {
		fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
		reflectiveFields[i] = fields[i].getField();
	}

	return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}
 
示例3
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
	TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
	Field[] reflectiveFields = new Field[fields.length];

	for (int i = 0; i < fields.length; i++) {
		fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
		reflectiveFields[i] = fields[i].getField();
	}

	return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}
 
示例4
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
	TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
	assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
 
示例5
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
	TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
	assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
 
示例6
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
	TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
	assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
 
示例7
@Test
public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
	CheckpointStreamFactory streamFactory = createStreamFactory();
	SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();

	// register A first then B
	env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
	env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);

	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);

	try {

		TypeInformation<TestPojo> pojoType = TypeExtractor.getForClass(TestPojo.class);

		// make sure that we are in fact using the PojoSerializer
		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer);

		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

		// ============== create snapshot of current configuration ==============

		// make some more modifications
		backend.setCurrentKey(1);
		state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));

		backend.setCurrentKey(2);
		state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));

		KeyedStateHandle snapshot = runSnapshot(
			backend.snapshot(
				682375462378L,
				2,
				streamFactory,
				CheckpointOptions.forCheckpointWithDefaultLocation()),
			sharedStateRegistry);

		backend.dispose();

		// ========== restore snapshot, with a different registration order in the configuration ==========

		env.close();
		env = buildMockEnv();

		env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this time register B first
		env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);

		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);

		// re-initialize to ensure that we create the PojoSerializer from scratch, otherwise
		// initializeSerializerUnlessSet would not pick up our new config
		kvId = new ValueStateDescriptor<>("id", pojoType);
		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

		backend.setCurrentKey(1);

		// update to test state backends that eagerly serialize, such as RocksDB
		state.update(new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));

		// this tests backends that lazily serialize, such as memory state backend
		runSnapshot(
			backend.snapshot(
				682375462378L,
				2,
				streamFactory,
				CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

		snapshot.discardState();
	} finally {
		backend.dispose();
	}
}