Java源码示例:org.apache.beam.sdk.coders.VarIntCoder
示例1
/**
 * {@link PTransformMatchers#classEqualTo} must match on the exact runtime class only: an anonymous
 * subclass of the target transform class is expected not to match.
 */
@Test
public void classEqualToDoesNotMatchSubclass() {
  /** Minimal primitive transform producing a {@code PCollection<Integer>}. */
  class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
      return PCollection.createPrimitiveOutputInternal(
          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
    }
  }

  PTransformMatcher exactClassMatcher = PTransformMatchers.classEqualTo(MyPTransform.class);
  MyPTransform anonymousSubclass = new MyPTransform() {};

  // Sanity checks: the instance IS-A MyPTransform, yet its runtime class differs.
  assertThat(anonymousSubclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
  assertThat(anonymousSubclass, instanceOf(MyPTransform.class));

  AppliedPTransform<?, ?, ?> applied = getAppliedTransform(anonymousSubclass);
  assertThat(exactClassMatcher.matches(applied), is(false));
}
示例2
/**
 * Builds a {@link ReduceFnTester} from a {@link WindowingStrategy} and a {@link CombineFn}. The
 * {@link TriggerStateMachine} is derived from the {@link Trigger} held by the {@link
 * WindowingStrategy}.
 *
 * <p>Before delegating, this checks that {@code combineFn} can be converted into an {@link
 * AppliedCombineFn} for {@code KV<String, Integer>} input (string keys, var-int values). Any
 * failure from that check propagates to the caller.
 */
public static <W extends BoundedWindow, AccumT, OutputT>
    ReduceFnTester<Integer, OutputT, W> combining(
        WindowingStrategy<?, W> strategy,
        CombineFn<Integer, AccumT, OutputT> combineFn,
        Coder<OutputT> outputCoder)
        throws Exception {
  // Validation only: the AppliedCombineFn produced here is intentionally discarded.
  AppliedCombineFn.withInputCoder(
      combineFn,
      CoderRegistry.createDefault(),
      KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));

  return combining(
      strategy,
      TriggerStateMachines.stateMachineForTrigger(
          TriggerTranslation.toProto(strategy.getTrigger())),
      combineFn,
      outputCoder);
}
示例3
/**
 * Keys each input element with a sharding key, groups the elements by that key, and applies
 * {@code WriteShardsIntoTempFilesFn} to each group. The result is encoded with {@code
 * fileResultCoder}.
 */
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
  // The sharding step receives the user's side inputs, plus the shard-count view when present.
  List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(getSideInputs());
  if (numShardsView != null) {
    shardingSideInputs.add(numShardsView);
  }

  // Without an explicitly configured sharding function, fall back to random sharding.
  ShardingFunction<UserT, DestinationT> shardingFunction = getShardingFunction();
  if (shardingFunction == null) {
    shardingFunction = new RandomShardingFunction(destinationCoder);
  }

  return input
      .apply(
          "ApplyShardingKey",
          ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView))
              .withSideInputs(shardingSideInputs))
      // Sharded keys wrap a var-int key; values keep the input's own coder.
      .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
      .apply("GroupIntoShards", GroupByKey.create())
      .apply(
          "WriteShardsIntoTempFiles",
          // This step is given only getSideInputs(); numShardsView is not passed here.
          ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
      .setCoder(fileResultCoder);
}
示例4
/**
 * A side input holding a single (null, 3) entry in the global window should expose exactly one
 * null key, and that key should map to the single value 3.
 */
@Test
public void singleElementForCollection() {
  when(context.getSideInput(COLLECTION_ID))
      .thenReturn(
          Arrays.asList(WindowedValue.valueInGlobalWindow(KV.<Void, Integer>of(null, 3))));

  MultimapSideInputHandler<Void, Integer, GlobalWindow> multimapHandler =
      BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context)
          .forMultimapSideInput(
              TRANSFORM_ID,
              SIDE_INPUT_NAME,
              KvCoder.of(VoidCoder.of(), VarIntCoder.of()),
              GlobalWindow.Coder.INSTANCE);

  // Key listing first, then the value lookup for the (null) key.
  assertThat(multimapHandler.get(GlobalWindow.INSTANCE), contains((Void) null));
  assertThat(multimapHandler.get(null, GlobalWindow.INSTANCE), contains(3));
}
示例5
/**
 * {@link StateTags#map} tags are expected to be equal whenever their ids match, regardless of the
 * key and value coders, and unequal when their ids differ.
 */
@Test
public void testMapEquality() {
  StateTag<?> fooUtf8VarIntFirst = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
  StateTag<?> fooUtf8VarIntSecond = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
  StateTag<?> fooUtf8BigEndian =
      StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
  StateTag<?> fooVarIntBigEndian =
      StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
  StateTag<?> barUtf8VarInt = StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());

  // Same id, same coders.
  assertEquals(fooUtf8VarIntFirst, fooUtf8VarIntSecond);
  // Same id, different key and/or value coders.
  assertEquals(fooUtf8VarIntFirst, fooUtf8BigEndian);
  assertEquals(fooUtf8BigEndian, fooVarIntBigEndian);
  assertEquals(fooUtf8VarIntFirst, fooVarIntBigEndian);
  // Different id, same coders.
  assertNotEquals(fooUtf8VarIntFirst, barUtf8VarInt);
}
示例6
/**
 * The multimap side-input handler should group values by key: the keys are "foo" and "bar", and
 * "foo" maps to both 2 and 5.
 */
@Test
public void groupsValuesByKey() {
  when(context.getSideInput(COLLECTION_ID))
      .thenReturn(
          Arrays.asList(
              WindowedValue.valueInGlobalWindow(KV.of("foo", 2)),
              WindowedValue.valueInGlobalWindow(KV.of("bar", 3)),
              WindowedValue.valueInGlobalWindow(KV.of("foo", 5))));

  MultimapSideInputHandler<String, Integer, GlobalWindow> multimapHandler =
      BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context)
          .forMultimapSideInput(
              TRANSFORM_ID,
              SIDE_INPUT_NAME,
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
              GlobalWindow.Coder.INSTANCE);

  // Distinct keys first, then every value recorded under "foo".
  assertThat(multimapHandler.get(GlobalWindow.INSTANCE), containsInAnyOrder("foo", "bar"));
  assertThat(multimapHandler.get("foo", GlobalWindow.INSTANCE), containsInAnyOrder(2, 5));
}
示例7
/**
 * Applies {@code view} to an unbounded {@code PCollection<KV<String, Integer>>} and expects an
 * {@link IllegalStateException}. Its message should mention the side-input view failure, and its
 * cause should mention a "non-bounded PCollection".
 */
private void testViewUnbounded(
    Pipeline pipeline,
    PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("Unable to create a side-input view from input");
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));

  // A primitive source whose output is declared UNBOUNDED.
  PTransform<PBegin, PCollection<KV<String, Integer>>> unboundedSource =
      new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
        @Override
        public PCollection<KV<String, Integer>> expand(PBegin input) {
          return PCollection.createPrimitiveOutputInternal(
              input.getPipeline(),
              WindowingStrategy.globalDefault(),
              PCollection.IsBounded.UNBOUNDED,
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        }
      };

  // Applying the view to the unbounded collection is where the failure is expected.
  pipeline.apply(unboundedSource).apply(view);
}
示例8
/**
 * Traverses a pipeline of one empty {@link CreateStream} followed by a single ParDo. Afterwards,
 * {@code StreamingSourceTracker.numAssertions} is expected to be exactly 1.
 */
@Test
public void testTrackSingle() {
  options.setRunner(SparkRunner.class);
  JavaStreamingContext streamingContext =
      new JavaStreamingContext(
          SparkContextFactory.getSparkContext(options),
          new org.apache.spark.streaming.Duration(options.getBatchIntervalMillis()));

  Pipeline trackedPipeline = Pipeline.create(options);
  CreateStream<Integer> emptyStream =
      CreateStream.of(VarIntCoder.of(), Duration.millis(options.getBatchIntervalMillis()))
          .emptyBatch();
  trackedPipeline.apply(emptyStream).apply(ParDo.of(new PassthroughFn<>()));

  StreamingSourceTracker tracker =
      new StreamingSourceTracker(streamingContext, trackedPipeline, ParDo.MultiOutput.class, 0);
  trackedPipeline.traverseTopologically(tracker);
  assertThat(StreamingSourceTracker.numAssertions, equalTo(1));
}
示例9
/**
 * {@link Reshuffle} applied after fixed ten-minute windowing should keep every element and leave
 * the windowing strategy unchanged.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterFixedWindows() {
  KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

  PCollection<KV<String, Integer>> windowed =
      pipeline
          .apply(Create.of(ARBITRARY_KVS).withCoder(kvCoder))
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));
  PCollection<KV<String, Integer>> reshuffled = windowed.apply(Reshuffle.of());

  PAssert.that(reshuffled).containsInAnyOrder(ARBITRARY_KVS);
  assertEquals(windowed.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
示例10
/**
 * A {@link DoFn} that modifies its input element in place, where the element type has a
 * well-behaved equals(), should make the {@link DirectRunner} fail with an {@link
 * IllegalMutationException}.
 */
@Test
public void testMutatingInputDoFnError() throws Exception {
  // Expected failure when the pipeline runs.
  thrown.expect(IllegalMutationException.class);
  thrown.expectMessage("Input");
  thrown.expectMessage("must not be mutated");

  Pipeline pipeline = getPipeline();
  pipeline
      .apply(
          Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
              .withCoder(ListCoder.of(VarIntCoder.of())))
      .apply(
          ParDo.of(
              new DoFn<List<Integer>, Integer>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // The illegal part: overwrite the first entry of the input list.
                  c.element().set(0, 37);
                  c.output(12);
                }
              }));

  pipeline.run();
}
示例11
/**
 * Round-trips mixed (timers and elements), elements-only, and timers-only keyed work items through
 * {@link KeyedWorkItemCoder}. Each decoded value must equal the original.
 */
@Test
public void testEncodeDecodeEqual() throws Exception {
  KeyedWorkItemCoder<String, Integer> coder =
      KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);

  // One event-time timer in the global namespace.
  Iterable<TimerData> timers =
      ImmutableList.of(
          TimerData.of(
              StateNamespaces.global(),
              new Instant(500L),
              new Instant(500L),
              TimeDomain.EVENT_TIME));
  // Three elements in the global window.
  Iterable<WindowedValue<Integer>> elements =
      ImmutableList.of(
          WindowedValue.valueInGlobalWindow(1),
          WindowedValue.valueInGlobalWindow(4),
          WindowedValue.valueInGlobalWindow(8));

  String key = "foo";
  CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.workItem(key, timers, elements));
  CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.elementsWorkItem(key, elements));
  CoderProperties.coderDecodeEncodeEqual(coder, KeyedWorkItems.timersWorkItem(key, timers));
}
示例12
/**
 * Runs {@code TestNoOutputDoFn} over an empty input with a main output and two additional
 * outputs. All three outputs are expected to be empty.
 */
@Test
@Category(ValidatesRunner.class)
public void testParDoWithEmptyTaggedOutput() {
  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
  TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2") {};
  TupleTagList additionalTags = TupleTagList.of(additionalOutputTag1).and(additionalOutputTag2);

  PCollectionTuple outputs =
      pipeline
          .apply(Create.empty(VarIntCoder.of()))
          .apply(ParDo.of(new TestNoOutputDoFn()).withOutputTags(mainOutputTag, additionalTags));

  // Main output first, then each additional output, in declaration order.
  PAssert.that(outputs.get(mainOutputTag)).empty();
  PAssert.that(outputs.get(additionalOutputTag1)).empty();
  PAssert.that(outputs.get(additionalOutputTag2)).empty();

  pipeline.run();
}
示例13
/** Tests that the generated {@link DoFnInvoker} passes the state parameter that it should. */
@Test
public void testDoFnWithState() throws Exception {
  // mock(ValueState.class) yields the raw type. The conversion to ValueState<Integer> is safe
  // because the mock is only compared by identity in verify(...) below. The suppression is
  // scoped to this single declaration.
  @SuppressWarnings("unchecked")
  ValueState<Integer> mockState = mock(ValueState.class);
  final String stateId = "my-state-id-here";
  when(mockArgumentProvider.state(stateId, false)).thenReturn(mockState);

  /** DoFn declaring one value-state cell and consuming it in its process method. */
  class MockFn extends DoFn<String, String> {
    @StateId(stateId)
    private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> valueState)
        throws Exception {}
  }

  MockFn fn = mock(MockFn.class);
  // The invoker should return stop() and forward the mocked context and the mocked state.
  assertEquals(stop(), invokeProcessElement(fn));
  verify(fn).processElement(mockProcessContext, mockState);
}
示例14
/**
 * A {@code @StateId} field that is not {@code final} must make {@link
 * DoFnSignatures#getSignature} throw an {@link IllegalArgumentException}. The message should
 * name the offending field and must not be about timers.
 */
@Test
public void testStateIdNonFinal() throws Exception {
  // The exception type and message fragments expected from the signature analysis.
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("State declarations must be final");
  thrown.expectMessage("Non-final field");
  thrown.expectMessage("myfield");
  // The error is about state, so timers should not be mentioned.
  thrown.expectMessage(not(mentionsTimers()));

  DoFnSignatures.getSignature(
      new DoFn<KV<String, Integer>, Long>() {
        // Deliberately not final: this is the declaration that must be rejected.
        @StateId("my-id")
        private StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void foo(ProcessContext context) {}
      }.getClass());
}
示例15
/**
 * Builds a {@link ReduceFnTester} from a {@link WindowingStrategy} and a {@link
 * CombineFnWithContext}. The trigger state machine is derived from the strategy's trigger;
 * {@code options} and {@code sideInputReader} are forwarded unchanged to the delegated overload.
 *
 * <p>Before delegating, this checks that {@code combineFn} can be converted into an {@link
 * AppliedCombineFn} for {@code KV<String, Integer>} input. Any failure from that check
 * propagates to the caller.
 */
public static <W extends BoundedWindow, AccumT, OutputT>
    ReduceFnTester<Integer, OutputT, W> combining(
        WindowingStrategy<?, W> strategy,
        CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
        Coder<OutputT> outputCoder,
        PipelineOptions options,
        SideInputReader sideInputReader)
        throws Exception {
  // Validation only: the AppliedCombineFn produced here is intentionally discarded.
  AppliedCombineFn.withInputCoder(
      combineFn,
      CoderRegistry.createDefault(),
      KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));

  return combining(
      strategy,
      TriggerStateMachines.stateMachineForTrigger(
          TriggerTranslation.toProto(strategy.getTrigger())),
      combineFn,
      outputCoder,
      options,
      sideInputReader);
}
示例16
/**
 * {@link PTransformMatchers#emptyFlatten()} must not match a {@link Flatten} application whose
 * input map contains a PCollection.
 */
@Test
public void emptyFlattenWithNonEmptyFlatten() {
  // Use a wildcard-parameterized type instead of the raw AppliedPTransform, matching how the
  // other snippet on this page passes AppliedPTransform<?, ?, ?> to PTransformMatcher.matches.
  AppliedPTransform<?, ?, ?> application =
      AppliedPTransform.of(
          "Flatten",
          // Input: a single bounded PCollection<Integer>, so the flatten has an input.
          Collections.singletonMap(
              new TupleTag<Integer>(),
              PCollection.createPrimitiveOutputInternal(
                  p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
          // Output: a separate bounded PCollection<Integer>.
          Collections.singletonMap(
              new TupleTag<Integer>(),
              PCollection.createPrimitiveOutputInternal(
                  p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
          Flatten.pCollections(),
          p);

  assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
示例17
/**
 * A named DoFn with one {@code @StateId("foo")} field should expose exactly one state declaration.
 * That declaration is keyed by the id "foo", refers to the Java field {@code bizzle}, and has
 * state type {@code ValueState<Integer>}.
 */
@Test
public void testSimpleStateIdNamedDoFn() throws Exception {
  // Local DoFn under test; its field name ("bizzle") deliberately differs from its state id
  // ("foo").
  class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
    @StateId("foo")
    private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void foo(ProcessContext context) {}
  }

  DoFnSignature sig = DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleStateIdNamedDoFn());
  assertThat(sig.stateDeclarations().size(), equalTo(1));

  DoFnSignature.StateDeclaration decl = sig.stateDeclarations().get("foo");
  assertThat(decl.id(), equalTo("foo"));
  assertThat(
      decl.field(), equalTo(DoFnForTestSimpleStateIdNamedDoFn.class.getDeclaredField("bizzle")));
  assertThat(
      decl.stateType(),
      Matchers.<TypeDescriptor<?>>equalTo(new TypeDescriptor<ValueState<Integer>>() {}));
}
示例18
/**
 * Applies {@code GroupByKeyHashAndSortByKeyAndWindowDoFn} to re-key the input by an Integer.
 * Each value is a ((key, window), windowed value) pair. The result is then grouped with {@link
 * GroupByKeyAndSortValuesOnly}.
 */
@Override
public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>> expand(
    PCollection<KV<K, V>> input) {
  @SuppressWarnings("unchecked")
  Coder<W> windowCoder = (Coder<W>) input.getWindowingStrategy().getWindowFn().windowCoder();
  @SuppressWarnings("unchecked")
  KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();

  PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> keyedByHash =
      input.apply(ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>(coder)));

  // Value coder: ((key, window), fully windowed value), built from the input's own coders.
  Coder<KV<KV<K, W>, WindowedValue<V>>> keyWindowValueCoder =
      KvCoder.of(
          KvCoder.of(inputCoder.getKeyCoder(), windowCoder),
          FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder));
  keyedByHash.setCoder(KvCoder.of(VarIntCoder.of(), keyWindowValueCoder));

  return keyedByHash.apply(new GroupByKeyAndSortValuesOnly<>());
}
示例19
/**
 * Applies {@code view} to an unbounded {@code PCollection<KV<String, Integer>>} and expects an
 * {@link IllegalStateException}. Its message should mention the side-input view failure, and its
 * cause should mention a "non-bounded PCollection".
 */
private void testViewUnbounded(
    Pipeline pipeline,
    PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("Unable to create a side-input view from input");
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));

  // A primitive source whose output is declared UNBOUNDED.
  PTransform<PBegin, PCollection<KV<String, Integer>>> unboundedSource =
      new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
        @Override
        public PCollection<KV<String, Integer>> expand(PBegin input) {
          return PCollection.createPrimitiveOutputInternal(
              input.getPipeline(),
              WindowingStrategy.globalDefault(),
              PCollection.IsBounded.UNBOUNDED,
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        }
      };

  // Applying the view to the unbounded collection is where the failure is expected.
  pipeline.apply(unboundedSource).apply(view);
}
示例20
/**
 * A processing-time timer that fires after its window has expired must be ignored: no output is
 * extracted.
 */
@Test
public void testLateProcessingTimeTimer() throws Exception {
  // Fire repeatedly, 10ms of processing time after the first element in each pane.
  Repeatedly processingTimeTrigger =
      Repeatedly.forever(
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)));
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(processingTimeTrigger);

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  // Moving the watermark to 100 expires the window; from here on only GC may touch it.
  tester.advanceInputWatermarkNoTimers(new Instant(100));

  // The now-late processing-time timer must not produce output.
  tester.advanceProcessingTime(new Instant(6000));
  assertThat(tester.extractOutput(), emptyIterable());
}
示例21
/**
 * The display data of a {@code CountMinSketchFn} configured with {@code withAccuracy(eps, conf)}
 * should report its width, depth, eps, and conf.
 */
@Test
public void testDisplayData() {
  double eps = 0.01;
  double conf = 0.8;
  // Expected sketch dimensions, computed from eps and conf by the formulas below.
  int expectedWidth = (int) Math.ceil(2 / eps);
  int expectedDepth = (int) Math.ceil(-Math.log(1 - conf) / Math.log(2));

  final CountMinSketchFn<Integer> fn =
      CountMinSketchFn.create(VarIntCoder.of()).withAccuracy(eps, conf);
  DisplayData displayData = DisplayData.from(fn);

  assertThat(displayData, hasDisplayItem("width", expectedWidth));
  assertThat(displayData, hasDisplayItem("depth", expectedDepth));
  assertThat(displayData, hasDisplayItem("eps", eps));
  assertThat(displayData, hasDisplayItem("conf", conf));
}
示例22
/**
 * With keyed input, opening the {@code ExecutableStageDoFnOperator} should install a {@code
 * DoFnRunnerWithMetricsUpdate} whose delegate is a {@code StatefulDoFnRunner}.
 */
@Test
@SuppressWarnings("unchecked")
public void testEnsureStateCleanupWithKeyedInput() throws Exception {
  TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
  DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory =
      new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, VarIntCoder.of());

  VarIntCoder keyCoder = VarIntCoder.of();
  ExecutableStageDoFnOperator<Integer, Integer> operator =
      getOperator(
          mainOutput,
          Collections.emptyList(),
          outputManagerFactory,
          WindowingStrategy.globalDefault(),
          keyCoder,
          WindowedValue.getFullCoder(keyCoder, GlobalWindow.Coder.INSTANCE));

  // Raw harness; the key selector is the identity lambda.
  KeyedOneInputStreamOperatorTestHarness<Integer, WindowedValue<Integer>, WindowedValue<Integer>>
      testHarness =
          new KeyedOneInputStreamOperatorTestHarness(
              operator, val -> val, new CoderTypeInformation<>(keyCoder));

  // Stub a remote bundle that exposes one mocked receiver under "input".
  RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
  ImmutableMap<String, FnDataReceiver<WindowedValue>> inputReceivers =
      ImmutableMap.<String, FnDataReceiver<WindowedValue>>builder()
          .put("input", Mockito.mock(FnDataReceiver.class))
          .build();
  when(bundle.getInputReceivers()).thenReturn(inputReceivers);
  when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(bundle);

  testHarness.open();

  Object outerRunner = Whitebox.getInternalState(operator, "doFnRunner");
  assertThat(outerRunner, instanceOf(DoFnRunnerWithMetricsUpdate.class));

  // The metrics wrapper should delegate to a StatefulDoFnRunner, which takes care of clearing
  // state.
  Object innerRunner = Whitebox.getInternalState(outerRunner, "delegate");
  assertThat(innerRunner, instanceOf(StatefulDoFnRunner.class));
}
示例23
/**
 * Groups the input with a direct GroupByKey and GroupAlsoByWindow, then applies the {@code
 * ToKeyedWorkItem} ParDo. The visitor is expected to report that ParDo's output as keyed.
 */
@Test
public void keyedInputWithKeyPreserving() {
  KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

  PCollection<KV<String, WindowedValue<KV<String, Integer>>>> input =
      p.apply(
          Create.of(
                  KV.of(
                      "hello",
                      WindowedValue.of(
                          KV.of("hello", 3),
                          new Instant(0),
                          new IntervalWindow(new Instant(0), new Instant(9)),
                          PaneInfo.NO_FIRING)))
              .withCoder(
                  KvCoder.of(StringUtf8Coder.of(), WindowedValue.getValueOnlyCoder(kvCoder))));

  TupleTag<KeyedWorkItem<String, KV<String, Integer>>> keyedTag = new TupleTag<>();
  PCollection<KeyedWorkItem<String, KV<String, Integer>>> keyed =
      input
          .apply(new DirectGroupByKeyOnly<>())
          .apply(
              new DirectGroupAlsoByWindow<>(
                  WindowingStrategy.globalDefault(), WindowingStrategy.globalDefault()))
          .apply(
              ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())
                  .withOutputTags(keyedTag, TupleTagList.empty()))
          .get(keyedTag)
          .setCoder(
              KeyedWorkItemCoder.of(StringUtf8Coder.of(), kvCoder, GlobalWindow.Coder.INSTANCE));

  p.traverseTopologically(visitor);
  assertThat(visitor.getKeyedPValues(), hasItem(keyed));
}
示例24
/**
 * Streams six integers in two batches one second apart into three-second fixed windows, then
 * counts them with {@code CountingDoFn}. After the metrics push period, the pushed counter
 * {@code COUNTER_NAME} is expected to be 6.
 */
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
  Instant start = new Instant(0);
  Instant oneSecondLater = start.plus(Duration.standardSeconds(1L));

  CreateStream<Integer> source =
      CreateStream.of(VarIntCoder.of(), batchDuration())
          .emptyBatch()
          .advanceWatermarkForNextBatch(start)
          .nextBatch(
              TimestampedValue.of(1, start),
              TimestampedValue.of(2, start),
              TimestampedValue.of(3, start))
          .advanceWatermarkForNextBatch(oneSecondLater)
          .nextBatch(
              TimestampedValue.of(4, oneSecondLater),
              TimestampedValue.of(5, oneSecondLater),
              TimestampedValue.of(6, oneSecondLater))
          .advanceNextBatchWatermarkToInfinity();

  pipeline
      .apply(source)
      .apply(
          Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3L)))
              .withAllowedLateness(Duration.ZERO))
      .apply(ParDo.of(new CountingDoFn()));
  pipeline.run();

  // Give the metrics pusher time to push: one push period plus one more unit, times 1000 ms
  // (the * 1000 suggests the period is in seconds).
  long waitMillis =
      (pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod() + 1L) * 1000;
  Thread.sleep(waitMillis);

  assertThat(TestMetricsSink.getCounterValue(COUNTER_NAME), is(6L));
}
示例25
/**
 * A multimap side input whose key coder is non-deterministic should still look up values by key.
 * Each fruit is paired with every value stored under its first letter, including duplicates.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMultimapSideInputWithNonDeterministicKeyCoder() {
  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
                  .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMultimap());

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<String, KV<String, Integer>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          String fruit = c.element();
                          // Emit one pair per value stored under the fruit's first letter.
                          for (Integer value : c.sideInput(view).get(fruit.substring(0, 1))) {
                            c.output(KV.of(fruit, value));
                          }
                        }
                      })
                  .withSideInputs(view));

  PAssert.that(output)
      .containsInAnyOrder(
          KV.of("apple", 1),
          KV.of("apple", 1),
          KV.of("apple", 2),
          KV.of("banana", 3),
          KV.of("blackberry", 3));

  pipeline.run();
}
示例26
/**
 * An empty multimap side input should appear empty in every way the DoFn checks: {@code
 * isEmpty()}, an empty entry set, and an entry iterator with no elements.
 */
@Test
@Category(ValidatesRunner.class)
public void testEmptyMultimapSideInput() throws Exception {
  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(View.asMultimap());

  PCollection<Integer> passedThrough =
      pipeline
          .apply("Create1", Create.of(1))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<Integer, Integer>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          assertTrue(c.sideInput(view).isEmpty());
                          assertTrue(c.sideInput(view).entrySet().isEmpty());
                          assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                          // Forward the element so the PAssert below can observe it.
                          c.output(c.element());
                        }
                      })
                  .withSideInputs(view));

  // A single value must flow through; otherwise the DoFn's checks might never run.
  PAssert.that(passedThrough).containsInAnyOrder(1);

  pipeline.run();
}
示例27
/**
 * Same checks as the deterministic-coder case, but the empty multimap side input uses a
 * non-deterministic key coder. The side input must still appear empty in every checked way.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateEmptyView",
              Create.empty(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMultimap());

  PCollection<Integer> passedThrough =
      pipeline
          .apply("Create1", Create.of(1))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<Integer, Integer>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          assertTrue(c.sideInput(view).isEmpty());
                          assertTrue(c.sideInput(view).entrySet().isEmpty());
                          assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                          // Forward the element so the PAssert below can observe it.
                          c.output(c.element());
                        }
                      })
                  .withSideInputs(view));

  // A single value must flow through; otherwise the DoFn's checks might never run.
  PAssert.that(passedThrough).containsInAnyOrder(1);

  pipeline.run();
}
示例28
/**
 * A map side input whose key coder is non-deterministic should still look up values by key. Each
 * fruit is paired with the single value stored under its first letter.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMapSideInputWithNonDeterministicKeyCoder() {
  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", 1), KV.of("b", 3))
                  .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMap());

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<String, KV<String, Integer>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          String fruit = c.element();
                          Integer matched = c.sideInput(view).get(fruit.substring(0, 1));
                          c.output(KV.of(fruit, matched));
                        }
                      })
                  .withSideInputs(view));

  PAssert.that(output)
      .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

  pipeline.run();
}
示例29
/**
 * An empty map side input should appear empty in every way the DoFn checks: {@code isEmpty()},
 * an empty entry set, and an entry iterator with no elements.
 */
@Test
@Category(ValidatesRunner.class)
public void testEmptyMapSideInput() throws Exception {
  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(View.asMap());

  PCollection<Integer> passedThrough =
      pipeline
          .apply("Create1", Create.of(1))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<Integer, Integer>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          assertTrue(c.sideInput(view).isEmpty());
                          assertTrue(c.sideInput(view).entrySet().isEmpty());
                          assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                          // Forward the element so the PAssert below can observe it.
                          c.output(c.element());
                        }
                      })
                  .withSideInputs(view));

  // A single value must flow through; otherwise the DoFn's checks might never run.
  PAssert.that(passedThrough).containsInAnyOrder(1);

  pipeline.run();
}
示例30
/**
 * A map side input containing the key "a" twice, with null values both times, must be rejected
 * as a duplicate. The pipeline run is expected to fail with a cause mentioning "Duplicate values
 * for a".
 */
@Test
@Category(NeedsRunner.class)
public void testMapSideInputWithNullValuesCatchesDuplicates() {
  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", (Integer) null), KV.of("a", (Integer) null))
                  .withCoder(
                      KvCoder.of(StringUtf8Coder.of(), NullableCoder.of(VarIntCoder.of()))))
          .apply(View.asMap());

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<String, KV<String, Integer>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          String fruit = c.element();
                          Integer matched =
                              c.sideInput(view).getOrDefault(fruit.substring(0, 1), 0);
                          c.output(KV.of(fruit, matched));
                        }
                      })
                  .withSideInputs(view));

  // NOTE(review): these expected pairs cannot be produced from the side input above (it only
  // holds "a" -> null). The test appears to rely on pipeline.run() failing first — confirm.
  PAssert.that(output)
      .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

  // The run should fail with a PipelineExecutionException whose cause reports the duplicate.
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("Duplicate values for a")));
  pipeline.run();
}