Java源码示例:org.apache.flink.core.memory.ByteArrayOutputStreamWithPos

示例1
/**
 * Write a list of serializers and their corresponding config snapshots to the provided
 * data output view. This method writes in a fault tolerant way, so that when read again
 * using {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, if
 * deserialization of the serializer fails, its configuration snapshot will remain intact.
 *
 * <p>Specifically, all written serializers and their config snapshots are indexed by their
 * offset positions within the serialized bytes. The serialization format is as follows:
 * <ul>
 *     <li>1. number of serializer and configuration snapshot pairs.</li>
 *     <li>2. offsets of each serializer and configuration snapshot, in order.</li>
 *     <li>3. total number of bytes for the serialized serializers and the config snapshots.</li>
 *     <li>4. serialized serializers and the config snapshots.</li>
 * </ul>
 *
 * @param out the data output view.
 * @param serializersAndConfigs serializer and configuration snapshot pairs
 *
 * @throws IOException
 */
public static void writeSerializersAndConfigsWithResilience(
		DataOutputView out,
		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

	try (
		ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
		DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {

		out.writeInt(serializersAndConfigs.size());
		for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serAndConfSnapshot : serializersAndConfigs) {
			out.writeInt(bufferWithPos.getPosition());
			writeSerializer(bufferWrapper, serAndConfSnapshot.f0);

			out.writeInt(bufferWithPos.getPosition());
			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
				bufferWrapper, (TypeSerializerSnapshot) serAndConfSnapshot.f1, serAndConfSnapshot.f0);
		}

		out.writeInt(bufferWithPos.getPosition());
		out.write(bufferWithPos.getBuf(), 0, bufferWithPos.getPosition());
	}
}
 
示例2
@Override
public void write(DataOutputView out) throws IOException {
	super.write(out);

	if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
		UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
			(UnloadableDummyTypeSerializer<T>) this.typeSerializer;

		byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
		out.write(serializerBytes.length);
		out.write(serializerBytes);
	} else {
		// write in a way that allows the stream to recover from exceptions
		try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
			InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
			out.writeInt(streamWithPos.getPosition());
			out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
		}
	}
}
 
示例3
/**
 * Verifies that reading and writing serializers work correctly.
 */
@Test
public void testSerializerSerialization() throws Exception {

	TypeSerializer<?> serializer = IntSerializer.INSTANCE;

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
		serialized = out.toByteArray();
	}

	TypeSerializer<?> deserializedSerializer;
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
			new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
	}

	Assert.assertEquals(serializer, deserializedSerializer);
}
 
示例4
/**
 * Verifies deserialization failure cases when reading a serializer from bytes, in the
 * case of a {@link InvalidClassException}.
 */
@Test
public void testSerializerSerializationWithInvalidClass() throws Exception {

	TypeSerializer<?> serializer = IntSerializer.INSTANCE;

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
		serialized = out.toByteArray();
	}

	TypeSerializer<?> deserializedSerializer;

	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
			new DataInputViewStreamWrapper(in),
			new ArtificialCNFExceptionThrowingClassLoader(
				Thread.currentThread().getContextClassLoader(),
				Collections.singleton(IntSerializer.class.getName())),
			true);
	}
	Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
}
 
示例5
@Test
public void testReadSameVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(1);
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
	}

	Assert.assertEquals(payload, testWriteable.getData());
}
 
示例6
@Test
public void testReadCompatibleVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(2) {
		@Override
		public int[] getCompatibleVersions() {
			return new int[] {1, 2};
		}
	};
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
	}

	Assert.assertEquals(payload, testWriteable.getData());
}
 
示例7
@Test
public void testReadMismatchVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(2);
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
		Assert.fail("Version mismatch expected.");
	} catch (VersionMismatchException ignored) {

	}

	Assert.assertEquals(null, testWriteable.getData());
}
 
示例8
@Test
public void testReadVersioned() throws IOException {

	String payload = "test-data";
	TestPostVersionedReadableWritable versionedReadableWritable = new TestPostVersionedReadableWritable(payload);

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		versionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
	try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		restoredVersionedReadableWritable.read(in);
	}

	Assert.assertEquals(payload, restoredVersionedReadableWritable.getData());
}
 
示例9
@Test
public void testReadNonVersioned() throws IOException {
	int preVersionedPayload = 563;

	TestNonVersionedReadableWritable nonVersionedReadableWritable = new TestNonVersionedReadableWritable(preVersionedPayload);

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		nonVersionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
	try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		restoredVersionedReadableWritable.read(in);
	}

	Assert.assertEquals(String.valueOf(preVersionedPayload), restoredVersionedReadableWritable.getData());
}
 
示例10
@Test
public void testKeyGroupSerializationAndDeserialization() throws Exception {
	ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
	DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);

	for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) {
		for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) {
			outputStream.reset();
			RocksDBKeySerializationUtils.writeKeyGroup(orgKeyGroup, keyGroupPrefixBytes, outputView);
			int deserializedKeyGroup = RocksDBKeySerializationUtils.readKeyGroup(
				keyGroupPrefixBytes,
				new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(outputStream.toByteArray())));
			Assert.assertEquals(orgKeyGroup, deserializedKeyGroup);
		}
	}
}
 
示例11
@Test
public void testKeyedStateMetaInfoSerialization() throws Exception {

	String name = "test";
	TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
	TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;

	StateMetaInfoSnapshot metaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
		StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		StateMetaInfoSnapshotReadersWriters.getWriter().
			writeStateMetaInfoSnapshot(metaInfo, new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		final StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(
			CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
		metaInfo = reader.readStateMetaInfoSnapshot(
			new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
	}

	Assert.assertEquals(name, metaInfo.getName());
}
 
示例12
@Override
public int compare(TestElement o1, TestElement o2) {

	ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
	DataOutputViewStreamWrapper ow = new DataOutputViewStreamWrapper(os);
	try {
		TestElementSerializer.INSTANCE.serialize(o1, ow);
		byte[] a1 = os.toByteArray();
		os.reset();
		TestElementSerializer.INSTANCE.serialize(o2, ow);
		byte[] a2 = os.toByteArray();
		return UnsignedBytes.lexicographicalComparator().compare(a1, a2);
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}
 
示例13
/**
 * Write a list of serializers and their corresponding config snapshots to the provided
 * data output view. This method writes in a fault tolerant way, so that when read again
 * using {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, if
 * deserialization of the serializer fails, its configuration snapshot will remain intact.
 *
 * <p>Specifically, all written serializers and their config snapshots are indexed by their
 * offset positions within the serialized bytes. The serialization format is as follows:
 * <ul>
 *     <li>1. number of serializer and configuration snapshot pairs.</li>
 *     <li>2. offsets of each serializer and configuration snapshot, in order.</li>
 *     <li>3. total number of bytes for the serialized serializers and the config snapshots.</li>
 *     <li>4. serialized serializers and the config snapshots.</li>
 * </ul>
 *
 * @param out the data output view.
 * @param serializersAndConfigs serializer and configuration snapshot pairs
 *
 * @throws IOException
 */
public static void writeSerializersAndConfigsWithResilience(
		DataOutputView out,
		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

	try (
		ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
		DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {

		out.writeInt(serializersAndConfigs.size());
		for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serAndConfSnapshot : serializersAndConfigs) {
			out.writeInt(bufferWithPos.getPosition());
			writeSerializer(bufferWrapper, serAndConfSnapshot.f0);

			out.writeInt(bufferWithPos.getPosition());
			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
				bufferWrapper, (TypeSerializerSnapshot) serAndConfSnapshot.f1, serAndConfSnapshot.f0);
		}

		out.writeInt(bufferWithPos.getPosition());
		out.write(bufferWithPos.getBuf(), 0, bufferWithPos.getPosition());
	}
}
 
示例14
@Override
public void write(DataOutputView out) throws IOException {
	super.write(out);

	if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
		UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
			(UnloadableDummyTypeSerializer<T>) this.typeSerializer;

		byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
		out.write(serializerBytes.length);
		out.write(serializerBytes);
	} else {
		// write in a way that allows the stream to recover from exceptions
		try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
			InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
			out.writeInt(streamWithPos.getPosition());
			out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
		}
	}
}
 
示例15
/**
 * Verifies that reading and writing serializers work correctly.
 */
@Test
public void testSerializerSerialization() throws Exception {

	TypeSerializer<?> serializer = IntSerializer.INSTANCE;

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
		serialized = out.toByteArray();
	}

	TypeSerializer<?> deserializedSerializer;
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
			new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
	}

	Assert.assertEquals(serializer, deserializedSerializer);
}
 
示例16
/**
 * Verifies deserialization failure cases when reading a serializer from bytes, in the
 * case of a {@link InvalidClassException}.
 */
@Test
public void testSerializerSerializationWithInvalidClass() throws Exception {

	TypeSerializer<?> serializer = IntSerializer.INSTANCE;

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
		serialized = out.toByteArray();
	}

	TypeSerializer<?> deserializedSerializer;

	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
			new DataInputViewStreamWrapper(in),
			new ArtificialCNFExceptionThrowingClassLoader(
				Thread.currentThread().getContextClassLoader(),
				Collections.singleton(IntSerializer.class.getName())),
			true);
	}
	Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
}
 
示例17
@Test
public void testReadSameVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(1);
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
	}

	Assert.assertEquals(payload, testWriteable.getData());
}
 
示例18
@Test
public void testReadCompatibleVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(2) {
		@Override
		public int[] getCompatibleVersions() {
			return new int[] {1, 2};
		}
	};
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
	}

	Assert.assertEquals(payload, testWriteable.getData());
}
 
示例19
@Test
public void testReadMismatchVersion() throws Exception {

	String payload = "test";

	TestWriteable testWriteable = new TestWriteable(1, payload);
	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		testWriteable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	testWriteable = new TestWriteable(2);
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		testWriteable.read(new DataInputViewStreamWrapper(in));
		Assert.fail("Version mismatch expected.");
	} catch (VersionMismatchException ignored) {

	}

	Assert.assertEquals(null, testWriteable.getData());
}
 
示例20
@Test
public void testReadVersioned() throws IOException {

	String payload = "test-data";
	TestPostVersionedReadableWritable versionedReadableWritable = new TestPostVersionedReadableWritable(payload);

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		versionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
	try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		restoredVersionedReadableWritable.read(in);
	}

	Assert.assertEquals(payload, restoredVersionedReadableWritable.getData());
}
 
示例21
@Test
public void testReadNonVersioned() throws IOException {
	int preVersionedPayload = 563;

	TestNonVersionedReadableWritable nonVersionedReadableWritable = new TestNonVersionedReadableWritable(preVersionedPayload);

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		nonVersionedReadableWritable.write(new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	TestPostVersionedReadableWritable restoredVersionedReadableWritable = new TestPostVersionedReadableWritable();
	try(ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		restoredVersionedReadableWritable.read(in);
	}

	Assert.assertEquals(String.valueOf(preVersionedPayload), restoredVersionedReadableWritable.getData());
}
 
示例22
@Test
public void testKeyGroupSerializationAndDeserialization() throws Exception {
	ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
	DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);

	for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) {
		for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) {
			outputStream.reset();
			RocksDBKeySerializationUtils.writeKeyGroup(orgKeyGroup, keyGroupPrefixBytes, outputView);
			int deserializedKeyGroup = RocksDBKeySerializationUtils.readKeyGroup(
				keyGroupPrefixBytes,
				new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(outputStream.toByteArray())));
			Assert.assertEquals(orgKeyGroup, deserializedKeyGroup);
		}
	}
}
 
示例23
@Test
public void testKeyedStateMetaInfoSerialization() throws Exception {

	String name = "test";
	TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
	TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;

	StateMetaInfoSnapshot metaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
		StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		StateMetaInfoSnapshotReadersWriters.getWriter().
			writeStateMetaInfoSnapshot(metaInfo, new DataOutputViewStreamWrapper(out));
		serialized = out.toByteArray();
	}

	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		final StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(
			CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
		metaInfo = reader.readStateMetaInfoSnapshot(
			new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
	}

	Assert.assertEquals(name, metaInfo.getName());
}
 
示例24
@Override
public int compare(TestElement o1, TestElement o2) {

	ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
	DataOutputViewStreamWrapper ow = new DataOutputViewStreamWrapper(os);
	try {
		TestElementSerializer.INSTANCE.serialize(o1, ow);
		byte[] a1 = os.toByteArray();
		os.reset();
		TestElementSerializer.INSTANCE.serialize(o2, ow);
		byte[] a2 = os.toByteArray();
		return UnsignedBytes.lexicographicalComparator().compare(a1, a2);
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}
 
示例25
public static <N> void writeNameSpace(
		N namespace,
		TypeSerializer<N> namespaceSerializer,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDataOutputView,
		boolean ambiguousKeyPossible) throws IOException {

	int beforeWrite = keySerializationStream.getPosition();
	namespaceSerializer.serialize(namespace, keySerializationDataOutputView);

	if (ambiguousKeyPossible) {
		// write length of namespace
		writeLengthFrom(beforeWrite, keySerializationStream,
				keySerializationDataOutputView);
	}
}
 
示例26
public static <K> void writeKey(
		K key,
		TypeSerializer<K> keySerializer,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDataOutputView,
		boolean ambiguousKeyPossible) throws IOException {
	// write key
	int beforeWrite = keySerializationStream.getPosition();
	keySerializer.serialize(key, keySerializationDataOutputView);

	if (ambiguousKeyPossible) {
		// write size of key
		writeLengthFrom(beforeWrite, keySerializationStream,
				keySerializationDataOutputView);
	}
}
 
示例27
@Override
public void open() throws Exception {
	baos = new ByteArrayOutputStreamWithPos();
	baosWrapper = new DataOutputViewStreamWrapper(baos);

	// The creation of stageBundleFactory depends on the initialized environment manager.
	environmentManager.open();

	PortablePipelineOptions portableOptions =
		PipelineOptionsFactory.as(PortablePipelineOptions.class);
	// one operator has one Python SDK harness
	portableOptions.setSdkWorkerParallelism(1);
	ExperimentalOptions experimentalOptions = portableOptions.as(ExperimentalOptions.class);
	for (Map.Entry<String, String> entry : jobOptions.entrySet()) {
		ExperimentalOptions.addExperiment(experimentalOptions,
			String.join("=", entry.getKey(), entry.getValue()));
	}

	Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);

	jobBundleFactory = createJobBundleFactory(pipelineOptions);
	stageBundleFactory = createStageBundleFactory();
	progressHandler = getProgressHandler(flinkMetricContainer);
}
 
示例28
/**
 * Write a list of serializers and their corresponding config snapshots to the provided
 * data output view. This method writes in a fault tolerant way, so that when read again
 * using {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, if
 * deserialization of the serializer fails, its configuration snapshot will remain intact.
 *
 * <p>Specifically, all written serializers and their config snapshots are indexed by their
 * offset positions within the serialized bytes. The serialization format is as follows:
 * <ul>
 *     <li>1. number of serializer and configuration snapshot pairs.</li>
 *     <li>2. offsets of each serializer and configuration snapshot, in order.</li>
 *     <li>3. total number of bytes for the serialized serializers and the config snapshots.</li>
 *     <li>4. serialized serializers and the config snapshots.</li>
 * </ul>
 *
 * @param out the data output view.
 * @param serializersAndConfigs serializer and configuration snapshot pairs
 *
 * @throws IOException
 */
public static void writeSerializersAndConfigsWithResilience(
		DataOutputView out,
		List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs) throws IOException {

	try (
		ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
		DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {

		out.writeInt(serializersAndConfigs.size());
		for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serAndConfSnapshot : serializersAndConfigs) {
			out.writeInt(bufferWithPos.getPosition());
			writeSerializer(bufferWrapper, serAndConfSnapshot.f0);

			out.writeInt(bufferWithPos.getPosition());
			TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
				bufferWrapper, (TypeSerializerSnapshot) serAndConfSnapshot.f1, serAndConfSnapshot.f0);
		}

		out.writeInt(bufferWithPos.getPosition());
		out.write(bufferWithPos.getBuf(), 0, bufferWithPos.getPosition());
	}
}
 
示例29
@Override
public void write(DataOutputView out) throws IOException {
	super.write(out);

	if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
		UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
			(UnloadableDummyTypeSerializer<T>) this.typeSerializer;

		byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
		out.write(serializerBytes.length);
		out.write(serializerBytes);
	} else {
		// write in a way that allows the stream to recover from exceptions
		try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
			InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
			out.writeInt(streamWithPos.getPosition());
			out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
		}
	}
}
 
示例30
/**
 * Verifies that reading and writing serializers work correctly.
 */
@Test
public void testSerializerSerialization() throws Exception {

	TypeSerializer<?> serializer = IntSerializer.INSTANCE;

	byte[] serialized;
	try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
		TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
		serialized = out.toByteArray();
	}

	TypeSerializer<?> deserializedSerializer;
	try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
		deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
			new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
	}

	Assert.assertEquals(serializer, deserializedSerializer);
}