Java源码示例:org.apache.flink.core.memory.DataInputViewStreamWrapper
示例1
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
if (collectionLength > 0) {
try {
DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
for (int i = 0; i < collectionLength; i++){
T element = serializer.deserialize(wrapper);
list.add(element);
}
}
catch (Throwable t) {
throw new IOException("Error while deserializing element from collection", t);
}
}
dataSet = list;
}
示例2
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
if (accSerializer == null) {
throw new RuntimeException("No serializer set for the fold accumulator type. " +
"Probably the setOutputType method was not called.");
}
if (serializedInitialValue == null) {
throw new RuntimeException("No initial value was serialized for the fold " +
"window function. Probably the setOutputType method was not called.");
}
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
}
示例3
@Setup
public void setup() throws IOException {
length = Integer.parseInt(lengthStr);
switch (type) {
case "ascii":
input = generate(asciiChars, length);
break;
case "russian":
input = generate(russianChars, length);
break;
case "chinese":
input = generate(chineseChars, length);
break;
default:
throw new IllegalArgumentException(type + "charset is not supported");
}
byte[] stringBytes = stringWrite();
serializedBuffer = new ByteArrayInputStream(stringBytes);
serializedStream = new DataInputViewStreamWrapper(serializedBuffer);
}
示例4
@Test
public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
// read configuration again from bytes
TypeSerializerSnapshot kryoSerializerConfigSnapshot;
try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) {
kryoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), kryoSerializerForA);
}
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestClass> compatResult =
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForA);
assertTrue(compatResult.isCompatibleAsIs());
}
示例5
private IterationEventWithAggregators pipeThroughSerialization(IterationEventWithAggregators event) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
event.write(new DataOutputViewStreamWrapper(baos));
byte[] data = baos.toByteArray();
baos.close();
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
IterationEventWithAggregators newEvent = event.getClass().newInstance();
newEvent.read(in);
in.close();
return newEvent;
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test threw an exception: " + e.getMessage());
return null;
}
}
示例6
/**
* Clones the given writable using the {@link IOReadableWritable serialization}.
*
* @param original Object to clone
* @param <T> Type of the object to clone
* @return Cloned object
* @throws IOException Thrown is the serialization fails.
*/
public static <T extends IOReadableWritable> T createCopyWritable(T original) throws IOException {
if (original == null) {
return null;
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
original.write(out);
}
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
@SuppressWarnings("unchecked")
T copy = (T) instantiate(original.getClass());
copy.read(in);
return copy;
}
}
示例7
/**
* This read attempts to first identify if the input view contains the special
* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes.
* If identified to be versioned, the usual version resolution read path
* in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
* Otherwise, we "reset" the input stream by pushing back the read buffered bytes
* into the stream.
*/
public final void read(InputStream inputStream) throws IOException {
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
inputStream.read(tmp);
if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
super.read(inputView);
read(inputView, true);
} else {
PushbackInputStream resetStream = new PushbackInputStream(inputStream, VERSIONED_IDENTIFIER.length);
resetStream.unread(tmp);
read(new DataInputViewStreamWrapper(resetStream), false);
}
}
示例8
/**
* Verifies that deserializing config snapshots fail if the config class could not be found.
*/
@Test(expected = IOException.class)
public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out),
new TypeSerializerSerializationUtilTest.TestConfigSnapshot<>(123, "foobar"),
StringSerializer.INSTANCE);
serializedConfig = out.toByteArray();
}
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
// read using a dummy classloader
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), null);
}
fail("Expected a ClassNotFoundException wrapped in IOException");
}
示例9
/**
* Verifies that reading and writing configuration snapshots work correctly.
*/
@Test
public void testSerializeConfigurationSnapshots() throws Exception {
TypeSerializerSerializationUtilTest.TestConfigSnapshot<String> configSnapshot1 =
new TypeSerializerSerializationUtilTest.TestConfigSnapshot<>(1, "foo");
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out),
configSnapshot1,
StringSerializer.INSTANCE);
serializedConfig = out.toByteArray();
}
TypeSerializerSnapshot<?> restoredConfigs;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
restoredConfigs = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), null);
}
assertEquals(configSnapshot1, restoredConfigs);
}
示例10
public static void testSerialization(String[] values) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
DataOutputViewStreamWrapper serializer = new DataOutputViewStreamWrapper(baos);
for (String value : values) {
StringValue sv = new StringValue(value);
sv.write(serializer);
}
serializer.close();
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputViewStreamWrapper deserializer = new DataInputViewStreamWrapper(bais);
int num = 0;
while (bais.available() > 0) {
StringValue deser = new StringValue();
deser.read(deserializer);
assertEquals("DeserializedString differs from original string.", values[num], deser.getValue());
num++;
}
assertEquals("Wrong number of deserialized values", values.length, num);
}
示例11
@Override
public void flatMap(KeyedStateRow row, Collector<V> out) throws Exception {
if (!stateName.equals(row.getStateName())) {
return;
}
byte[] valueBytes = row.getValueBytes();
V value = null;
try (ByteArrayInputStreamWithPos valIs = new ByteArrayInputStreamWithPos(valueBytes)) {
DataInputViewStreamWrapper iw = new DataInputViewStreamWrapper(valIs);
skipTimestampIfTtlEnabled(iw);
value = valueDeserializer.deserialize(iw);
}
if (value == null) {
throw new RuntimeException("MapStates with null values are not supported at the moment.");
} else {
out.collect(value);
}
}
示例12
@Test
public void testReadCompatibleVersion() throws Exception {
String payload = "test";
TestWriteable testWriteable = new TestWriteable(1, payload);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
testWriteable.write(new DataOutputViewStreamWrapper(out));
serialized = out.toByteArray();
}
testWriteable = new TestWriteable(2) {
@Override
public int[] getCompatibleVersions() {
return new int[] {1, 2};
}
};
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
testWriteable.read(new DataInputViewStreamWrapper(in));
}
Assert.assertEquals(payload, testWriteable.getData());
}
示例13
/**
* This read attempts to first identify if the input view contains the special
* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes.
* If identified to be versioned, the usual version resolution read path
* in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
* Otherwise, we "reset" the input stream by pushing back the read buffered bytes
* into the stream.
*/
public final void read(InputStream inputStream) throws IOException {
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
inputStream.read(tmp);
if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
super.read(inputView);
read(inputView, true);
} else {
PushbackInputStream resetStream = new PushbackInputStream(inputStream, VERSIONED_IDENTIFIER.length);
resetStream.unread(tmp);
read(new DataInputViewStreamWrapper(resetStream), false);
}
}
示例14
/**
* Verifies that reading and writing configuration snapshots work correctly.
*/
@Test
public void testSerializeConfigurationSnapshots() throws Exception {
TypeSerializerSerializationUtilTest.TestConfigSnapshot<String> configSnapshot1 =
new TypeSerializerSerializationUtilTest.TestConfigSnapshot<>(1, "foo");
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out),
configSnapshot1,
StringSerializer.INSTANCE);
serializedConfig = out.toByteArray();
}
TypeSerializerSnapshot<?> restoredConfigs;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
restoredConfigs = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), null);
}
assertEquals(configSnapshot1, restoredConfigs);
}
示例15
private <K, V> void deserializeBroadcastStateValues(
final BackendWritableBroadcastState<K, V> broadcastStateForName,
final FSDataInputStream in,
final OperatorStateHandle.StateMetaInfo metaInfo) throws Exception {
if (metaInfo != null) {
long[] offsets = metaInfo.getOffsets();
if (offsets != null) {
TypeSerializer<K> keySerializer = broadcastStateForName.getStateMetaInfo().getKeySerializer();
TypeSerializer<V> valueSerializer = broadcastStateForName.getStateMetaInfo().getValueSerializer();
in.seek(offsets[0]);
DataInputView div = new DataInputViewStreamWrapper(in);
int size = div.readInt();
for (int i = 0; i < size; i++) {
broadcastStateForName.put(keySerializer.deserialize(div), valueSerializer.deserialize(div));
}
}
}
}
示例16
/**
* Verifies that reading and writing serializers work correctly.
*/
@Test
public void testSerializerSerialization() throws Exception {
TypeSerializer<?> serializer = IntSerializer.INSTANCE;
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
serialized = out.toByteArray();
}
TypeSerializer<?> deserializedSerializer;
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
Assert.assertEquals(serializer, deserializedSerializer);
}
示例17
private IterationEventWithAggregators pipeThroughSerialization(IterationEventWithAggregators event) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
event.write(new DataOutputViewStreamWrapper(baos));
byte[] data = baos.toByteArray();
baos.close();
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
IterationEventWithAggregators newEvent = event.getClass().newInstance();
newEvent.read(in);
in.close();
return newEvent;
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test threw an exception: " + e.getMessage());
return null;
}
}
示例18
private static void verifyRead(OperatorStateHandle fullHandle, int numPartitions) throws IOException {
int count = 0;
try (FSDataInputStream in = fullHandle.openInputStream()) {
OperatorStateHandle.StateMetaInfo metaInfo = fullHandle.getStateNameToPartitionOffsets().
get(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
long[] offsets = metaInfo.getOffsets();
Assert.assertNotNull(offsets);
DataInputView div = new DataInputViewStreamWrapper(in);
for (int i = 0; i < numPartitions; ++i) {
in.seek(offsets[i]);
Assert.assertEquals(i, div.readInt());
++count;
}
}
Assert.assertEquals(numPartitions, count);
}
示例19
@Override
public void open(Configuration configuration) throws Exception {
FunctionUtils.openFunction(this.windowFunction, configuration);
if (serializedInitialValue == null) {
throw new RuntimeException("No initial value was serialized for the fold " +
"window function. Probably the setOutputType method was not called.");
}
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
}
示例20
@Test
public void getKeyedStateStreams() throws Exception {
int readKeyGroupCount = 0;
for (KeyGroupStatePartitionStreamProvider stateStreamProvider
: initializationContext.getRawKeyedStateInputs()) {
Assert.assertNotNull(stateStreamProvider);
try (InputStream is = stateStreamProvider.getStream()) {
DataInputView div = new DataInputViewStreamWrapper(is);
int val = div.readInt();
++readKeyGroupCount;
Assert.assertEquals(stateStreamProvider.getKeyGroupId(), val);
}
}
Assert.assertEquals(writtenKeyGroups, readKeyGroupCount);
}
示例21
@Test
public void getOperatorStateStore() throws Exception {
Set<Integer> readStatesCount = new HashSet<>();
for (StatePartitionStreamProvider statePartitionStreamProvider
: initializationContext.getRawOperatorStateInputs()) {
Assert.assertNotNull(statePartitionStreamProvider);
try (InputStream is = statePartitionStreamProvider.getStream()) {
DataInputView div = new DataInputViewStreamWrapper(is);
Assert.assertTrue(readStatesCount.add(div.readInt()));
}
}
Assert.assertEquals(writtenOperatorStates, readStatesCount);
}
示例22
@Test
public void testKeyGroupSerializationAndDeserialization() throws Exception {
ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) {
for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) {
outputStream.reset();
RocksDBKeySerializationUtils.writeKeyGroup(orgKeyGroup, keyGroupPrefixBytes, outputView);
int deserializedKeyGroup = RocksDBKeySerializationUtils.readKeyGroup(
keyGroupPrefixBytes,
new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(outputStream.toByteArray())));
Assert.assertEquals(orgKeyGroup, deserializedKeyGroup);
}
}
}
示例23
public static void testSerialization(String[] values) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
DataOutputViewStreamWrapper serializer = new DataOutputViewStreamWrapper(baos);
for (String value : values) {
StringValue sv = new StringValue(value);
sv.write(serializer);
}
serializer.close();
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputViewStreamWrapper deserializer = new DataInputViewStreamWrapper(bais);
int num = 0;
while (bais.available() > 0) {
StringValue deser = new StringValue();
deser.read(deserializer);
assertEquals("DeserializedString differs from original string.", values[num], deser.getValue());
num++;
}
assertEquals("Wrong number of deserialized values", values.length, num);
}
示例24
@Override
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
示例25
@Override
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
示例26
@Override
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
示例27
@Override
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
示例28
@SuppressWarnings("unchecked")
public static <T> List<T> deserializeList(ArrayList<byte[]> data, TypeSerializer<T> serializer)
throws IOException, ClassNotFoundException
{
List<T> result = new ArrayList<T>(data.size());
for (byte[] bytes : data) {
ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(inStream);
T val = serializer.deserialize(in);
result.add(val);
}
return result;
}
示例29
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> message,
Collector<Tuple2<Integer, String>> out) throws Exception {
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message.value()));
Tuple2<Integer, Integer> tuple = ser.deserialize(in);
out.collect(Tuple2.of(tuple.f0, tuple.f1 + "a"));
out.collect(Tuple2.of(tuple.f0, tuple.f1 + "b"));
}
示例30
/**
* Verifies that reconfiguring with a config snapshot of a preceding POJO serializer
* with different POJO type will result in INCOMPATIBLE.
*/
@Test
public void testReconfigureWithDifferentPojoType() throws Exception {
PojoSerializer<SubTestUserClassB> pojoSerializer1 = (PojoSerializer<SubTestUserClassB>)
TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig());
// snapshot configuration and serialize to bytes
TypeSerializerSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot, pojoSerializer1);
serializedConfig = out.toByteArray();
}
PojoSerializer<SubTestUserClassA> pojoSerializer2 = (PojoSerializer<SubTestUserClassA>)
TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new ExecutionConfig());
// read configuration again from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
pojoSerializerConfigSnapshot = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), pojoSerializer2);
}
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<SubTestUserClassA> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer2);
assertTrue(compatResult.isIncompatible());
}