/**
 * Builds a {@link PojoSerializer} for the class returned by {@code getTypeClass()}.
 *
 * <p>For each entry of {@code fields}, a serializer is created from that entry's type information
 * using the given configuration, and the entry's reflective {@link Field} is stored at the same
 * index.
 *
 * @param config the execution configuration passed to every field serializer and to the resulting
 *     POJO serializer
 * @return a new POJO serializer built from the per-field serializers and reflective fields
 */
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
    final int fieldCount = fields.length;
    final TypeSerializer<?>[] serializers = new TypeSerializer<?>[fieldCount];
    final Field[] pojoFields = new Field[fieldCount];

    for (int pos = 0; pos < fieldCount; pos++) {
        // Both arrays are index-aligned with 'fields'.
        serializers[pos] = fields[pos].getTypeInformation().createSerializer(config);
        pojoFields[pos] = fields[pos].getField();
    }

    return new PojoSerializer<>(getTypeClass(), serializers, pojoFields, config);
}
/**
 * Builds a {@link PojoSerializer} for the class returned by {@code getTypeClass()}.
 *
 * <p>For each entry of {@code fields}, a serializer is created from that entry's type information
 * using the given configuration, and the entry's reflective {@link Field} is stored at the same
 * index.
 *
 * @param config the execution configuration passed to every field serializer and to the resulting
 *     POJO serializer
 * @return a new POJO serializer built from the per-field serializers and reflective fields
 */
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
    final int fieldCount = fields.length;
    final TypeSerializer<?>[] serializers = new TypeSerializer<?>[fieldCount];
    final Field[] pojoFields = new Field[fieldCount];

    for (int pos = 0; pos < fieldCount; pos++) {
        // Both arrays are index-aligned with 'fields'.
        serializers[pos] = fields[pos].getTypeInformation().createSerializer(config);
        pojoFields[pos] = fields[pos].getField();
    }

    return new PojoSerializer<>(getTypeClass(), serializers, pojoFields, config);
}
/**
 * Builds a {@link PojoSerializer} for the class returned by {@code getTypeClass()}.
 *
 * <p>For each entry of {@code fields}, a serializer is created from that entry's type information
 * using the given configuration, and the entry's reflective {@link Field} is stored at the same
 * index.
 *
 * @param config the execution configuration passed to every field serializer and to the resulting
 *     POJO serializer
 * @return a new POJO serializer built from the per-field serializers and reflective fields
 */
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
    final int fieldCount = fields.length;
    final TypeSerializer<?>[] serializers = new TypeSerializer<?>[fieldCount];
    final Field[] pojoFields = new Field[fieldCount];

    for (int pos = 0; pos < fieldCount; pos++) {
        // Both arrays are index-aligned with 'fields'.
        serializers[pos] = fields[pos].getTypeInformation().createSerializer(config);
        pojoFields[pos] = fields[pos].getField();
    }

    return new PojoSerializer<>(getTypeClass(), serializers, pojoFields, config);
}
/**
 * Asserts that the serializer created for {@link StreamShardMetadata} with a fresh
 * {@link ExecutionConfig} is a {@link PojoSerializer}.
 */
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
    final TypeInformation<StreamShardMetadata> shardMetadataType =
        TypeInformation.of(StreamShardMetadata.class);

    final Object createdSerializer = shardMetadataType.createSerializer(new ExecutionConfig());

    assertTrue(createdSerializer instanceof PojoSerializer);
}
/**
 * Asserts that the serializer created for {@link StreamShardMetadata} with a fresh
 * {@link ExecutionConfig} is a {@link PojoSerializer}.
 */
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
    final TypeInformation<StreamShardMetadata> shardMetadataType =
        TypeInformation.of(StreamShardMetadata.class);

    final Object createdSerializer = shardMetadataType.createSerializer(new ExecutionConfig());

    assertTrue(createdSerializer instanceof PojoSerializer);
}
/**
 * Asserts that the serializer created for {@link StreamShardMetadata} with a fresh
 * {@link ExecutionConfig} is a {@link PojoSerializer}.
 */
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
    final TypeInformation<StreamShardMetadata> shardMetadataType =
        TypeInformation.of(StreamShardMetadata.class);

    final Object createdSerializer = shardMetadataType.createSerializer(new ExecutionConfig());

    assertTrue(createdSerializer instanceof PojoSerializer);
}
/**
 * Snapshots POJO-typed keyed state with the nested POJO types registered as A then B, restores it
 * into an environment where they are registered as B then A, and then writes and snapshots again.
 *
 * <p>The test has no explicit assertions on the restored values; it passes when restore, update
 * and the second snapshot complete without throwing.
 */
@Test
public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
    // The same checkpoint id and timestamp are used for both snapshots taken below.
    final long checkpointId = 682375462378L;
    final int checkpointTimestamp = 2;

    final CheckpointStreamFactory checkpointStreams = createStreamFactory();
    final SharedStateRegistry registry = new SharedStateRegistry();

    // initial configuration: A is registered before B
    env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
    env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);

    AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE, env);

    try {
        final TypeInformation<TestPojo> testPojoType = TypeExtractor.getForClass(TestPojo.class);

        // sanity check: the state type must really be handled by the PojoSerializer
        assertTrue(testPojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer);

        ValueStateDescriptor<TestPojo> descriptor = new ValueStateDescriptor<>("id", testPojoType);
        ValueState<TestPojo> pojoState = keyedBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            descriptor);

        // ============== populate state and snapshot it under the current configuration ==============

        keyedBackend.setCurrentKey(1);
        pojoState.update(
            new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));

        keyedBackend.setCurrentKey(2);
        pojoState.update(
            new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));

        final KeyedStateHandle firstSnapshot = runSnapshot(
            keyedBackend.snapshot(
                checkpointId,
                checkpointTimestamp,
                checkpointStreams,
                CheckpointOptions.forCheckpointWithDefaultLocation()),
            registry);

        keyedBackend.dispose();

        // ========== restore the snapshot with the registration order flipped ==========

        env.close();
        env = buildMockEnv();

        // on restore, B is registered before A
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);

        keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, firstSnapshot, env);

        // A fresh descriptor forces the PojoSerializer to be created from scratch; otherwise
        // initializeSerializerUnlessSet would not pick up the new config.
        descriptor = new ValueStateDescriptor<>("id", testPojoType);
        pojoState = keyedBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            descriptor);

        keyedBackend.setCurrentKey(1);

        // writing a value exercises state backends that serialize eagerly, such as RocksDB
        pojoState.update(
            new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));

        // taking another snapshot exercises backends that serialize lazily, such as the memory state backend
        runSnapshot(
            keyedBackend.snapshot(
                checkpointId,
                checkpointTimestamp,
                checkpointStreams,
                CheckpointOptions.forCheckpointWithDefaultLocation()),
            registry);

        firstSnapshot.discardState();
    } finally {
        keyedBackend.dispose();
    }
}