Java源码示例:org.apache.flink.core.memory.ByteArrayOutputStreamWithPos
示例1
/**
 * Writes the given serializer / config snapshot pairs to the provided data output view in an
 * offset-indexed layout, so that when the data is read again using
 * {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, a failure to
 * deserialize a serializer leaves its configuration snapshot intact.
 *
 * <p>All entries are first serialized into an in-memory buffer, and the buffer position at
 * which each entry starts is written to {@code out} as the entry is produced. The resulting
 * layout in {@code out} is:
 * <ol>
 *   <li>the number of serializer / config snapshot pairs,</li>
 *   <li>for each pair, the offset of the serializer followed by the offset of its snapshot,</li>
 *   <li>the total number of buffered bytes,</li>
 *   <li>the buffered bytes themselves.</li>
 * </ol>
 *
 * @param out the data output view to write to.
 * @param serializersAndConfigs the serializer and configuration snapshot pairs to write.
 *
 * @throws IOException if writing to {@code out} or to the intermediate buffer fails.
 */
public static void writeSerializersAndConfigsWithResilience(
        DataOutputView out,
        List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

    try (
        ByteArrayOutputStreamWithPos payload = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper payloadView = new DataOutputViewStreamWrapper(payload)) {

        // 1. number of pairs
        out.writeInt(serializersAndConfigs.size());

        // 2. offsets go straight to 'out' while the actual bytes accumulate in the buffer
        for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> pair : serializersAndConfigs) {
            final TypeSerializer<?> serializer = pair.f0;

            out.writeInt(payload.getPosition());
            writeSerializer(payloadView, serializer);

            out.writeInt(payload.getPosition());
            TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
                payloadView, (TypeSerializerSnapshot) pair.f1, serializer);
        }

        // 3. + 4. total size followed by the buffered bytes
        final int totalBytes = payload.getPosition();
        out.writeInt(totalBytes);
        out.write(payload.getBuf(), 0, totalBytes);
    }
}
示例2
/**
 * Writes the wrapped {@code typeSerializer} to the given output view as a length-prefixed
 * byte sequence, after delegating to {@code super.write(out)}.
 *
 * <p>Both branches emit the same layout: a 4-byte length followed by that many bytes. For an
 * {@link UnloadableDummyTypeSerializer} the bytes are the ones it already holds; otherwise the
 * serializer is first serialized into an in-memory buffer.
 *
 * @param out the data output view to write to.
 *
 * @throws IOException if writing to {@code out} or serializing the serializer fails.
 */
@Override
public void write(DataOutputView out) throws IOException {
    super.write(out);

    if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
        UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
            (UnloadableDummyTypeSerializer<T>) this.typeSerializer;

        byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
        // The length has to be written as a full int, exactly like in the else branch.
        // write(int) would emit only the low-order byte and break the length prefix.
        out.writeInt(serializerBytes.length);
        out.write(serializerBytes);
    } else {
        // write in a way that allows the stream to recover from exceptions
        try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
            InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
            out.writeInt(streamWithPos.getPosition());
            out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
        }
    }
}
示例3
/**
 * Round-trips a serializer through {@code writeSerializer} and {@code tryReadSerializer} and
 * checks that the restored instance equals the original one.
 */
@Test
public void testSerializerSerialization() throws Exception {
    TypeSerializer<?> original = IntSerializer.INSTANCE;

    // write the serializer into an in-memory buffer
    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        TypeSerializerSerializationUtil.writeSerializer(sinkView, original);
        bytes = sink.toByteArray();
    }

    // read it back using the context class loader of the current thread
    TypeSerializer<?> restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        restored = TypeSerializerSerializationUtil.tryReadSerializer(
            new DataInputViewStreamWrapper(source), classLoader);
    }

    Assert.assertEquals(original, restored);
}
示例4
/**
 * Verifies the deserialization failure path when reading a serializer from bytes, in the
 * case of a {@link InvalidClassException}: the read is expected to yield an
 * {@link UnloadableDummyTypeSerializer} instead of the original serializer.
 */
@Test
public void testSerializerSerializationWithInvalidClass() throws Exception {
    TypeSerializer<?> original = IntSerializer.INSTANCE;

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        TypeSerializerSerializationUtil.writeSerializer(sinkView, original);
        bytes = sink.toByteArray();
    }

    // Class loader wrapping the context class loader, configured with IntSerializer's class name.
    // NOTE(review): presumably it fails to load exactly that class - confirm against its definition.
    ArtificialCNFExceptionThrowingClassLoader failingClassLoader =
        new ArtificialCNFExceptionThrowingClassLoader(
            Thread.currentThread().getContextClassLoader(),
            Collections.singleton(IntSerializer.class.getName()));

    TypeSerializer<?> restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        restored = TypeSerializerSerializationUtil.tryReadSerializer(
            new DataInputViewStreamWrapper(source), failingClassLoader, true);
    }

    Assert.assertTrue(restored instanceof UnloadableDummyTypeSerializer);
}
示例5
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and a payload, reads the bytes
 * back into a fresh instance also constructed with {@code 1}, and expects the payload to be
 * restored.
 */
@Test
public void testReadSameVersion() throws Exception {
    String expectedData = "test";
    TestWriteable writer = new TestWriteable(1, expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestWriteable reader = new TestWriteable(1);
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例6
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and reads the bytes back into an
 * instance constructed with {@code 2} whose {@code getCompatibleVersions()} returns
 * {@code {1, 2}}, expecting the payload to be restored.
 */
@Test
public void testReadCompatibleVersion() throws Exception {
    String expectedData = "test";
    TestWriteable writer = new TestWriteable(1, expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    // reader constructed with 2 that additionally declares 1 as compatible
    TestWriteable reader = new TestWriteable(2) {
        @Override
        public int[] getCompatibleVersions() {
            return new int[] {1, 2};
        }
    };

    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例7
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and tries to read the bytes into
 * an instance constructed with {@code 2}. The read must fail with a
 * {@code VersionMismatchException} and the reader's data must still be {@code null}.
 */
@Test
public void testReadMismatchVersion() throws Exception {
    String writtenData = "test";
    TestWriteable writer = new TestWriteable(1, writtenData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestWriteable reader = new TestWriteable(2);
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
        Assert.fail("Version mismatch expected.");
    } catch (VersionMismatchException ignored) {
        // expected: this is the outcome the test is looking for
    }

    Assert.assertEquals(null, reader.getData());
}
示例8
/**
 * Writes a {@code TestPostVersionedReadableWritable} holding a payload and reads the bytes
 * back into a fresh instance directly from the input stream, expecting the same payload.
 */
@Test
public void testReadVersioned() throws IOException {
    String expectedData = "test-data";
    TestPostVersionedReadableWritable writer = new TestPostVersionedReadableWritable(expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestPostVersionedReadableWritable reader = new TestPostVersionedReadableWritable();
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        // the stream itself is handed to read(), not a DataInputView wrapper
        reader.read(source);
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例9
/**
 * Writes a {@code TestNonVersionedReadableWritable} holding an int payload and reads the bytes
 * into a {@code TestPostVersionedReadableWritable}, expecting its data to equal the string
 * form of that int.
 */
@Test
public void testReadNonVersioned() throws IOException {
    int writtenValue = 563;
    TestNonVersionedReadableWritable writer = new TestNonVersionedReadableWritable(writtenValue);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestPostVersionedReadableWritable reader = new TestPostVersionedReadableWritable();
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        // the stream itself is handed to read(), not a DataInputView wrapper
        reader.read(source);
    }

    String expectedData = String.valueOf(writtenValue);
    Assert.assertEquals(expectedData, reader.getData());
}
示例10
/**
 * Round-trips key groups 0..127 through {@code writeKeyGroup} / {@code readKeyGroup} for
 * key-group prefix sizes of 1 and 2 bytes and checks that each value is read back unchanged.
 */
@Test
public void testKeyGroupSerializationAndDeserialization() throws Exception {
    ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos(8);
    DataOutputView bufferView = new DataOutputViewStreamWrapper(buffer);

    for (int prefixBytes = 1; prefixBytes <= 2; ++prefixBytes) {
        for (int expectedKeyGroup = 0; expectedKeyGroup < 128; ++expectedKeyGroup) {
            // the same buffer is reused for every iteration
            buffer.reset();
            RocksDBKeySerializationUtils.writeKeyGroup(expectedKeyGroup, prefixBytes, bufferView);

            byte[] written = buffer.toByteArray();
            DataInputViewStreamWrapper inputView =
                new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(written));
            int actualKeyGroup = RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, inputView);

            Assert.assertEquals(expectedKeyGroup, actualKeyGroup);
        }
    }
}
示例11
/**
 * Snapshots a keyed state meta info, writes the snapshot with the writer from
 * {@code StateMetaInfoSnapshotReadersWriters}, reads it back with the keyed-state reader for
 * {@code CURRENT_STATE_META_INFO_SNAPSHOT_VERSION} and checks that the state name survives.
 */
@Test
public void testKeyedStateMetaInfoSerialization() throws Exception {
    String expectedName = "test";
    TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
    TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;

    StateMetaInfoSnapshot snapshot = new RegisteredKeyValueStateBackendMetaInfo<>(
        StateDescriptor.Type.VALUE, expectedName, namespaceSerializer, stateSerializer).snapshot();

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(snapshot, sinkView);
        bytes = sink.toByteArray();
    }

    StateMetaInfoSnapshot restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        final StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(
            CURRENT_STATE_META_INFO_SNAPSHOT_VERSION,
            StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);

        restored = reader.readStateMetaInfoSnapshot(
            new DataInputViewStreamWrapper(source),
            Thread.currentThread().getContextClassLoader());
    }

    Assert.assertEquals(expectedName, restored.getName());
}
示例12
/**
 * Compares two elements by serializing both with {@code TestElementSerializer} and comparing
 * the resulting byte arrays with {@code UnsignedBytes.lexicographicalComparator()}.
 *
 * @param o1 the first element.
 * @param o2 the second element.
 * @return the comparator result for the two serialized forms.
 * @throws RuntimeException wrapping any exception raised while serializing or comparing.
 */
@Override
public int compare(TestElement o1, TestElement o2) {
    ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper bufferView = new DataOutputViewStreamWrapper(buffer);

    try {
        TestElementSerializer.INSTANCE.serialize(o1, bufferView);
        byte[] firstBytes = buffer.toByteArray();

        // reuse the same buffer for the second element
        buffer.reset();
        TestElementSerializer.INSTANCE.serialize(o2, bufferView);
        byte[] secondBytes = buffer.toByteArray();

        return UnsignedBytes.lexicographicalComparator().compare(firstBytes, secondBytes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
示例13
/**
 * Writes the given serializer / config snapshot pairs to the provided data output view in an
 * offset-indexed layout, so that when the data is read again using
 * {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, a failure to
 * deserialize a serializer leaves its configuration snapshot intact.
 *
 * <p>All entries are first serialized into an in-memory buffer, and the buffer position at
 * which each entry starts is written to {@code out} as the entry is produced. The resulting
 * layout in {@code out} is:
 * <ol>
 *   <li>the number of serializer / config snapshot pairs,</li>
 *   <li>for each pair, the offset of the serializer followed by the offset of its snapshot,</li>
 *   <li>the total number of buffered bytes,</li>
 *   <li>the buffered bytes themselves.</li>
 * </ol>
 *
 * @param out the data output view to write to.
 * @param serializersAndConfigs the serializer and configuration snapshot pairs to write.
 *
 * @throws IOException if writing to {@code out} or to the intermediate buffer fails.
 */
public static void writeSerializersAndConfigsWithResilience(
        DataOutputView out,
        List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

    try (
        ByteArrayOutputStreamWithPos payload = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper payloadView = new DataOutputViewStreamWrapper(payload)) {

        // 1. number of pairs
        out.writeInt(serializersAndConfigs.size());

        // 2. offsets go straight to 'out' while the actual bytes accumulate in the buffer
        for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> pair : serializersAndConfigs) {
            final TypeSerializer<?> serializer = pair.f0;

            out.writeInt(payload.getPosition());
            writeSerializer(payloadView, serializer);

            out.writeInt(payload.getPosition());
            TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
                payloadView, (TypeSerializerSnapshot) pair.f1, serializer);
        }

        // 3. + 4. total size followed by the buffered bytes
        final int totalBytes = payload.getPosition();
        out.writeInt(totalBytes);
        out.write(payload.getBuf(), 0, totalBytes);
    }
}
示例14
/**
 * Writes the wrapped {@code typeSerializer} to the given output view as a length-prefixed
 * byte sequence, after delegating to {@code super.write(out)}.
 *
 * <p>Both branches emit the same layout: a 4-byte length followed by that many bytes. For an
 * {@link UnloadableDummyTypeSerializer} the bytes are the ones it already holds; otherwise the
 * serializer is first serialized into an in-memory buffer.
 *
 * @param out the data output view to write to.
 *
 * @throws IOException if writing to {@code out} or serializing the serializer fails.
 */
@Override
public void write(DataOutputView out) throws IOException {
    super.write(out);

    if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
        UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
            (UnloadableDummyTypeSerializer<T>) this.typeSerializer;

        byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
        // The length has to be written as a full int, exactly like in the else branch.
        // write(int) would emit only the low-order byte and break the length prefix.
        out.writeInt(serializerBytes.length);
        out.write(serializerBytes);
    } else {
        // write in a way that allows the stream to recover from exceptions
        try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
            InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
            out.writeInt(streamWithPos.getPosition());
            out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
        }
    }
}
示例15
/**
 * Round-trips a serializer through {@code writeSerializer} and {@code tryReadSerializer} and
 * checks that the restored instance equals the original one.
 */
@Test
public void testSerializerSerialization() throws Exception {
    TypeSerializer<?> original = IntSerializer.INSTANCE;

    // write the serializer into an in-memory buffer
    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        TypeSerializerSerializationUtil.writeSerializer(sinkView, original);
        bytes = sink.toByteArray();
    }

    // read it back using the context class loader of the current thread
    TypeSerializer<?> restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        restored = TypeSerializerSerializationUtil.tryReadSerializer(
            new DataInputViewStreamWrapper(source), classLoader);
    }

    Assert.assertEquals(original, restored);
}
示例16
/**
 * Verifies the deserialization failure path when reading a serializer from bytes, in the
 * case of a {@link InvalidClassException}: the read is expected to yield an
 * {@link UnloadableDummyTypeSerializer} instead of the original serializer.
 */
@Test
public void testSerializerSerializationWithInvalidClass() throws Exception {
    TypeSerializer<?> original = IntSerializer.INSTANCE;

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        TypeSerializerSerializationUtil.writeSerializer(sinkView, original);
        bytes = sink.toByteArray();
    }

    // Class loader wrapping the context class loader, configured with IntSerializer's class name.
    // NOTE(review): presumably it fails to load exactly that class - confirm against its definition.
    ArtificialCNFExceptionThrowingClassLoader failingClassLoader =
        new ArtificialCNFExceptionThrowingClassLoader(
            Thread.currentThread().getContextClassLoader(),
            Collections.singleton(IntSerializer.class.getName()));

    TypeSerializer<?> restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        restored = TypeSerializerSerializationUtil.tryReadSerializer(
            new DataInputViewStreamWrapper(source), failingClassLoader, true);
    }

    Assert.assertTrue(restored instanceof UnloadableDummyTypeSerializer);
}
示例17
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and a payload, reads the bytes
 * back into a fresh instance also constructed with {@code 1}, and expects the payload to be
 * restored.
 */
@Test
public void testReadSameVersion() throws Exception {
    String expectedData = "test";
    TestWriteable writer = new TestWriteable(1, expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestWriteable reader = new TestWriteable(1);
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例18
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and reads the bytes back into an
 * instance constructed with {@code 2} whose {@code getCompatibleVersions()} returns
 * {@code {1, 2}}, expecting the payload to be restored.
 */
@Test
public void testReadCompatibleVersion() throws Exception {
    String expectedData = "test";
    TestWriteable writer = new TestWriteable(1, expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    // reader constructed with 2 that additionally declares 1 as compatible
    TestWriteable reader = new TestWriteable(2) {
        @Override
        public int[] getCompatibleVersions() {
            return new int[] {1, 2};
        }
    };

    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例19
/**
 * Writes a {@code TestWriteable} constructed with {@code 1} and tries to read the bytes into
 * an instance constructed with {@code 2}. The read must fail with a
 * {@code VersionMismatchException} and the reader's data must still be {@code null}.
 */
@Test
public void testReadMismatchVersion() throws Exception {
    String writtenData = "test";
    TestWriteable writer = new TestWriteable(1, writtenData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestWriteable reader = new TestWriteable(2);
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        reader.read(new DataInputViewStreamWrapper(source));
        Assert.fail("Version mismatch expected.");
    } catch (VersionMismatchException ignored) {
        // expected: this is the outcome the test is looking for
    }

    Assert.assertEquals(null, reader.getData());
}
示例20
/**
 * Writes a {@code TestPostVersionedReadableWritable} holding a payload and reads the bytes
 * back into a fresh instance directly from the input stream, expecting the same payload.
 */
@Test
public void testReadVersioned() throws IOException {
    String expectedData = "test-data";
    TestPostVersionedReadableWritable writer = new TestPostVersionedReadableWritable(expectedData);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestPostVersionedReadableWritable reader = new TestPostVersionedReadableWritable();
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        // the stream itself is handed to read(), not a DataInputView wrapper
        reader.read(source);
    }

    Assert.assertEquals(expectedData, reader.getData());
}
示例21
/**
 * Writes a {@code TestNonVersionedReadableWritable} holding an int payload and reads the bytes
 * into a {@code TestPostVersionedReadableWritable}, expecting its data to equal the string
 * form of that int.
 */
@Test
public void testReadNonVersioned() throws IOException {
    int writtenValue = 563;
    TestNonVersionedReadableWritable writer = new TestNonVersionedReadableWritable(writtenValue);

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        writer.write(sinkView);
        bytes = sink.toByteArray();
    }

    TestPostVersionedReadableWritable reader = new TestPostVersionedReadableWritable();
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        // the stream itself is handed to read(), not a DataInputView wrapper
        reader.read(source);
    }

    String expectedData = String.valueOf(writtenValue);
    Assert.assertEquals(expectedData, reader.getData());
}
示例22
/**
 * Round-trips key groups 0..127 through {@code writeKeyGroup} / {@code readKeyGroup} for
 * key-group prefix sizes of 1 and 2 bytes and checks that each value is read back unchanged.
 */
@Test
public void testKeyGroupSerializationAndDeserialization() throws Exception {
    ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos(8);
    DataOutputView bufferView = new DataOutputViewStreamWrapper(buffer);

    for (int prefixBytes = 1; prefixBytes <= 2; ++prefixBytes) {
        for (int expectedKeyGroup = 0; expectedKeyGroup < 128; ++expectedKeyGroup) {
            // the same buffer is reused for every iteration
            buffer.reset();
            RocksDBKeySerializationUtils.writeKeyGroup(expectedKeyGroup, prefixBytes, bufferView);

            byte[] written = buffer.toByteArray();
            DataInputViewStreamWrapper inputView =
                new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(written));
            int actualKeyGroup = RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, inputView);

            Assert.assertEquals(expectedKeyGroup, actualKeyGroup);
        }
    }
}
示例23
/**
 * Snapshots a keyed state meta info, writes the snapshot with the writer from
 * {@code StateMetaInfoSnapshotReadersWriters}, reads it back with the keyed-state reader for
 * {@code CURRENT_STATE_META_INFO_SNAPSHOT_VERSION} and checks that the state name survives.
 */
@Test
public void testKeyedStateMetaInfoSerialization() throws Exception {
    String expectedName = "test";
    TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
    TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;

    StateMetaInfoSnapshot snapshot = new RegisteredKeyValueStateBackendMetaInfo<>(
        StateDescriptor.Type.VALUE, expectedName, namespaceSerializer, stateSerializer).snapshot();

    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(snapshot, sinkView);
        bytes = sink.toByteArray();
    }

    StateMetaInfoSnapshot restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        final StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(
            CURRENT_STATE_META_INFO_SNAPSHOT_VERSION,
            StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);

        restored = reader.readStateMetaInfoSnapshot(
            new DataInputViewStreamWrapper(source),
            Thread.currentThread().getContextClassLoader());
    }

    Assert.assertEquals(expectedName, restored.getName());
}
示例24
/**
 * Compares two elements by serializing both with {@code TestElementSerializer} and comparing
 * the resulting byte arrays with {@code UnsignedBytes.lexicographicalComparator()}.
 *
 * @param o1 the first element.
 * @param o2 the second element.
 * @return the comparator result for the two serialized forms.
 * @throws RuntimeException wrapping any exception raised while serializing or comparing.
 */
@Override
public int compare(TestElement o1, TestElement o2) {
    ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper bufferView = new DataOutputViewStreamWrapper(buffer);

    try {
        TestElementSerializer.INSTANCE.serialize(o1, bufferView);
        byte[] firstBytes = buffer.toByteArray();

        // reuse the same buffer for the second element
        buffer.reset();
        TestElementSerializer.INSTANCE.serialize(o2, bufferView);
        byte[] secondBytes = buffer.toByteArray();

        return UnsignedBytes.lexicographicalComparator().compare(firstBytes, secondBytes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
示例25
/**
 * Serializes the namespace into the key serialization view and, when ambiguous keys are
 * possible, follows it with length information written by {@code writeLengthFrom}.
 *
 * @param namespace the namespace to serialize.
 * @param namespaceSerializer the serializer used for the namespace.
 * @param keySerializationStream the stream whose position marks where the namespace starts.
 * @param keySerializationDataOutputView the view the namespace (and length) is written to.
 * @param ambiguousKeyPossible whether the length has to be appended.
 * @param <N> the namespace type.
 *
 * @throws IOException if serialization fails.
 */
public static <N> void writeNameSpace(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        ByteArrayOutputStreamWithPos keySerializationStream,
        DataOutputView keySerializationDataOutputView,
        boolean ambiguousKeyPossible) throws IOException {

    // remember where the namespace starts so that its length can be derived afterwards
    final int namespaceStart = keySerializationStream.getPosition();
    namespaceSerializer.serialize(namespace, keySerializationDataOutputView);

    if (!ambiguousKeyPossible) {
        return;
    }

    // write length of namespace
    writeLengthFrom(namespaceStart, keySerializationStream, keySerializationDataOutputView);
}
示例26
/**
 * Serializes the key into the key serialization view and, when ambiguous keys are possible,
 * follows it with length information written by {@code writeLengthFrom}.
 *
 * @param key the key to serialize.
 * @param keySerializer the serializer used for the key.
 * @param keySerializationStream the stream whose position marks where the key starts.
 * @param keySerializationDataOutputView the view the key (and length) is written to.
 * @param ambiguousKeyPossible whether the length has to be appended.
 * @param <K> the key type.
 *
 * @throws IOException if serialization fails.
 */
public static <K> void writeKey(
        K key,
        TypeSerializer<K> keySerializer,
        ByteArrayOutputStreamWithPos keySerializationStream,
        DataOutputView keySerializationDataOutputView,
        boolean ambiguousKeyPossible) throws IOException {

    // remember where the key starts so that its size can be derived afterwards
    final int keyStart = keySerializationStream.getPosition();
    keySerializer.serialize(key, keySerializationDataOutputView);

    if (!ambiguousKeyPossible) {
        return;
    }

    // write size of key
    writeLengthFrom(keyStart, keySerializationStream, keySerializationDataOutputView);
}
示例27
/**
 * Initializes this component: allocates the output buffer and its view, opens the
 * environment manager, builds the pipeline options (including one experiment per job option)
 * and creates the job bundle factory, the stage bundle factory and the progress handler.
 *
 * @throws Exception if any of the initialization steps fails.
 */
@Override
public void open() throws Exception {
    baos = new ByteArrayOutputStreamWithPos();
    baosWrapper = new DataOutputViewStreamWrapper(baos);

    // The creation of stageBundleFactory depends on the initialized environment manager.
    environmentManager.open();

    PortablePipelineOptions portable = PipelineOptionsFactory.as(PortablePipelineOptions.class);
    // one operator has one Python SDK harness
    portable.setSdkWorkerParallelism(1);

    // every job option is registered as a "key=value" experiment
    ExperimentalOptions experimental = portable.as(ExperimentalOptions.class);
    for (Map.Entry<String, String> jobOption : jobOptions.entrySet()) {
        String experiment = jobOption.getKey() + "=" + jobOption.getValue();
        ExperimentalOptions.addExperiment(experimental, experiment);
    }

    Struct pipelineOptions = PipelineOptionsTranslation.toProto(portable);

    jobBundleFactory = createJobBundleFactory(pipelineOptions);
    stageBundleFactory = createStageBundleFactory();
    progressHandler = getProgressHandler(flinkMetricContainer);
}
示例28
/**
 * Writes the given serializer / config snapshot pairs to the provided data output view in an
 * offset-indexed layout, so that when the data is read again using
 * {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, a failure to
 * deserialize a serializer leaves its configuration snapshot intact.
 *
 * <p>All entries are first serialized into an in-memory buffer, and the buffer position at
 * which each entry starts is written to {@code out} as the entry is produced. The resulting
 * layout in {@code out} is:
 * <ol>
 *   <li>the number of serializer / config snapshot pairs,</li>
 *   <li>for each pair, the offset of the serializer followed by the offset of its snapshot,</li>
 *   <li>the total number of buffered bytes,</li>
 *   <li>the buffered bytes themselves.</li>
 * </ol>
 *
 * @param out the data output view to write to.
 * @param serializersAndConfigs the serializer and configuration snapshot pairs to write.
 *
 * @throws IOException if writing to {@code out} or to the intermediate buffer fails.
 */
public static void writeSerializersAndConfigsWithResilience(
        DataOutputView out,
        List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

    try (
        ByteArrayOutputStreamWithPos payload = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper payloadView = new DataOutputViewStreamWrapper(payload)) {

        // 1. number of pairs
        out.writeInt(serializersAndConfigs.size());

        // 2. offsets go straight to 'out' while the actual bytes accumulate in the buffer
        for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> pair : serializersAndConfigs) {
            final TypeSerializer<?> serializer = pair.f0;

            out.writeInt(payload.getPosition());
            writeSerializer(payloadView, serializer);

            out.writeInt(payload.getPosition());
            TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
                payloadView, (TypeSerializerSnapshot) pair.f1, serializer);
        }

        // 3. + 4. total size followed by the buffered bytes
        final int totalBytes = payload.getPosition();
        out.writeInt(totalBytes);
        out.write(payload.getBuf(), 0, totalBytes);
    }
}
示例29
/**
 * Writes the wrapped {@code typeSerializer} to the given output view as a length-prefixed
 * byte sequence, after delegating to {@code super.write(out)}.
 *
 * <p>Both branches emit the same layout: a 4-byte length followed by that many bytes. For an
 * {@link UnloadableDummyTypeSerializer} the bytes are the ones it already holds; otherwise the
 * serializer is first serialized into an in-memory buffer.
 *
 * @param out the data output view to write to.
 *
 * @throws IOException if writing to {@code out} or serializing the serializer fails.
 */
@Override
public void write(DataOutputView out) throws IOException {
    super.write(out);

    if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
        UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
            (UnloadableDummyTypeSerializer<T>) this.typeSerializer;

        byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
        // The length has to be written as a full int, exactly like in the else branch.
        // write(int) would emit only the low-order byte and break the length prefix.
        out.writeInt(serializerBytes.length);
        out.write(serializerBytes);
    } else {
        // write in a way that allows the stream to recover from exceptions
        try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
            InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
            out.writeInt(streamWithPos.getPosition());
            out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
        }
    }
}
示例30
/**
 * Round-trips a serializer through {@code writeSerializer} and {@code tryReadSerializer} and
 * checks that the restored instance equals the original one.
 */
@Test
public void testSerializerSerialization() throws Exception {
    TypeSerializer<?> original = IntSerializer.INSTANCE;

    // write the serializer into an in-memory buffer
    byte[] bytes;
    try (ByteArrayOutputStreamWithPos sink = new ByteArrayOutputStreamWithPos()) {
        DataOutputViewStreamWrapper sinkView = new DataOutputViewStreamWrapper(sink);
        TypeSerializerSerializationUtil.writeSerializer(sinkView, original);
        bytes = sink.toByteArray();
    }

    // read it back using the context class loader of the current thread
    TypeSerializer<?> restored;
    try (ByteArrayInputStreamWithPos source = new ByteArrayInputStreamWithPos(bytes)) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        restored = TypeSerializerSerializationUtil.tryReadSerializer(
            new DataInputViewStreamWrapper(source), classLoader);
    }

    Assert.assertEquals(original, restored);
}