Java源码示例:org.apache.beam.sdk.coders.VarIntCoder

示例1
/**
 * Verifies that a matcher built with {@code PTransformMatchers.classEqualTo} does not match an
 * application of an anonymous subclass of the target class.
 */
@Test
public void classEqualToDoesNotMatchSubclass() {
  class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
      return PCollection.createPrimitiveOutputInternal(
          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
    }
  }

  // The trailing "{}" creates an anonymous subclass rather than a plain MyPTransform.
  MyPTransform anonymousSubclass = new MyPTransform() {};
  PTransformMatcher exactClassMatcher = PTransformMatchers.classEqualTo(MyPTransform.class);

  // Sanity-check the fixture: a different runtime class that is still a MyPTransform.
  assertThat(anonymousSubclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
  assertThat(anonymousSubclass, instanceOf(MyPTransform.class));

  AppliedPTransform<?, ?, ?> appliedSubclass = getAppliedTransform(anonymousSubclass);
  boolean matched = exactClassMatcher.matches(appliedSubclass);

  assertThat(matched, is(false));
}
 
示例2
/**
 * Builds a {@link ReduceFnTester} for the supplied {@link WindowingStrategy} and {@link
 * CombineFn}. The {@link TriggerStateMachine} is derived from the {@link Trigger} carried by the
 * {@link WindowingStrategy}.
 *
 * @param strategy windowing strategy whose trigger drives the tester
 * @param combineFn combiner applied to the {@code Integer} inputs
 * @param outputCoder coder for the combined output
 * @return the tester produced by the delegate {@code combining} overload
 * @throws Exception if validation of the combiner or construction of the tester fails
 */
public static <W extends BoundedWindow, AccumT, OutputT>
    ReduceFnTester<Integer, OutputT, W> combining(
        WindowingStrategy<?, W> strategy,
        CombineFn<Integer, AccumT, OutputT> combineFn,
        Coder<OutputT> outputCoder)
        throws Exception {

  KvCoder<String, Integer> keyedInputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
  CoderRegistry defaultRegistry = CoderRegistry.createDefault();

  // Called only for validation: confirms the CombineFn can become an AppliedCombineFn.
  // The returned value is intentionally discarded.
  AppliedCombineFn.withInputCoder(combineFn, defaultRegistry, keyedInputCoder);

  return combining(
      strategy,
      TriggerStateMachines.stateMachineForTrigger(
          TriggerTranslation.toProto(strategy.getTrigger())),
      combineFn,
      outputCoder);
}
 
示例3
/**
 * Keys each input element by a shard, groups elements per shard, and writes every group to
 * temporary files.
 *
 * <p>Uses the configured sharding function when one is set; otherwise falls back to a {@code
 * RandomShardingFunction} built from {@code destinationCoder}.
 *
 * @param input the elements to write
 * @return the file results emitted by the temp-file writing step, coded with {@code
 *     fileResultCoder}
 */
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
  // The sharding step sees the regular side inputs plus, when present, the shard-count view.
  List<PCollectionView<?>> sideInputsForSharding = Lists.newArrayList(getSideInputs());
  if (numShardsView != null) {
    sideInputsForSharding.add(numShardsView);
  }

  ShardingFunction<UserT, DestinationT> effectiveShardingFunction;
  if (getShardingFunction() != null) {
    effectiveShardingFunction = getShardingFunction();
  } else {
    effectiveShardingFunction = new RandomShardingFunction(destinationCoder);
  }

  return input
      .apply(
          "ApplyShardingKey",
          ParDo.of(new ApplyShardingFunctionFn(effectiveShardingFunction, numShardsView))
              .withSideInputs(sideInputsForSharding))
      .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
      .apply("GroupIntoShards", GroupByKey.create())
      .apply(
          "WriteShardsIntoTempFiles",
          ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
      .setCoder(fileResultCoder);
}
 
示例4
/**
 * With a single {@code (null, 3)} element stubbed as the side-input collection, the multimap
 * handler must report exactly one (null) key and the single value 3 for that key.
 */
@Test
public void singleElementForCollection() {
  when(context.getSideInput(COLLECTION_ID))
      .thenReturn(
          Arrays.asList(WindowedValue.valueInGlobalWindow(KV.<Void, Integer>of(null, 3))));

  KvCoder<Void, Integer> elementCoder = KvCoder.of(VoidCoder.of(), VarIntCoder.of());
  BatchSideInputHandlerFactory handlerFactory =
      BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
  MultimapSideInputHandler<Void, Integer, GlobalWindow> multimapHandler =
      handlerFactory.forMultimapSideInput(
          TRANSFORM_ID, SIDE_INPUT_NAME, elementCoder, GlobalWindow.Coder.INSTANCE);

  // Keys visible in the global window.
  Iterable<Void> observedKeys = multimapHandler.get(GlobalWindow.INSTANCE);
  assertThat(observedKeys, contains((Void) null));

  // Values stored under the null key.
  Iterable<Integer> observedValues = multimapHandler.get(null, GlobalWindow.INSTANCE);
  assertThat(observedValues, contains(3));
}
 
示例5
/**
 * Checks equality of map state tags created through {@code StateTags.map}.
 *
 * <p>Two tags built from the same id and the same key and value coders must be equal. A tag that
 * differs in its value coder, its key coder, or its id must not be equal to the others.
 */
@Test
public void testMapEquality() {
  StateTag<?> fooStringVarInt1 = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
  StateTag<?> fooStringVarInt2 = StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
  StateTag<?> fooStringBigEndian =
      StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
  StateTag<?> fooVarIntBigEndian =
      StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
  StateTag<?> barStringVarInt = StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());

  // Identical id and coders: equal.
  assertEquals(fooStringVarInt1, fooStringVarInt2);
  // Same id, different value coder: not equal.
  assertNotEquals(fooStringVarInt1, fooStringBigEndian);
  // Same id and value coder, different key coder: not equal.
  assertNotEquals(fooStringBigEndian, fooVarIntBigEndian);
  // Same id, both coders different: not equal.
  assertNotEquals(fooStringVarInt1, fooVarIntBigEndian);
  // Same coders, different id: not equal.
  assertNotEquals(fooStringVarInt1, barStringVarInt);
}
 
示例6
/**
 * With three stubbed elements, two of which share the key {@code "foo"}, the multimap handler
 * must expose both distinct keys and return both values for {@code "foo"}.
 */
@Test
public void groupsValuesByKey() {
  when(context.getSideInput(COLLECTION_ID))
      .thenReturn(
          Arrays.asList(
              WindowedValue.valueInGlobalWindow(KV.of("foo", 2)),
              WindowedValue.valueInGlobalWindow(KV.of("bar", 3)),
              WindowedValue.valueInGlobalWindow(KV.of("foo", 5))));

  KvCoder<String, Integer> elementCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
  BatchSideInputHandlerFactory handlerFactory =
      BatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
  MultimapSideInputHandler<String, Integer, GlobalWindow> multimapHandler =
      handlerFactory.forMultimapSideInput(
          TRANSFORM_ID, SIDE_INPUT_NAME, elementCoder, GlobalWindow.Coder.INSTANCE);

  // Each distinct key is expected exactly once, in any order.
  Iterable<String> observedKeys = multimapHandler.get(GlobalWindow.INSTANCE);
  assertThat(observedKeys, containsInAnyOrder("foo", "bar"));

  // Both values recorded under "foo" must come back.
  Iterable<Integer> fooValues = multimapHandler.get("foo", GlobalWindow.INSTANCE);
  assertThat(fooValues, containsInAnyOrder(2, 5));
}
 
示例7
/**
 * Applies {@code view} to an unbounded primitive {@code PCollection} and registers the expected
 * failure: an {@link IllegalStateException} whose cause mentions a non-bounded PCollection.
 *
 * @param pipeline pipeline in which the unbounded input is created
 * @param view the view-creating transform under test
 */
private void testViewUnbounded(
    Pipeline pipeline,
    PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("Unable to create a side-input view from input");
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));

  // Root transform that only declares an UNBOUNDED output in the global window.
  PTransform<PBegin, PCollection<KV<String, Integer>>> unboundedRoot =
      new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
        @Override
        public PCollection<KV<String, Integer>> expand(PBegin input) {
          return PCollection.createPrimitiveOutputInternal(
              input.getPipeline(),
              WindowingStrategy.globalDefault(),
              PCollection.IsBounded.UNBOUNDED,
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        }
      };

  pipeline.apply(unboundedRoot).apply(view);
}
 
示例8
/**
 * Builds a pipeline with a single empty {@code CreateStream} source followed by a pass-through
 * ParDo, traverses it with a {@code StreamingSourceTracker}, and expects exactly one assertion
 * to have been recorded by the tracker.
 */
@Test
public void testTrackSingle() {
  options.setRunner(SparkRunner.class);
  JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(options);

  // The same batch interval configures both the Spark streaming context and the test stream.
  long batchIntervalMillis = options.getBatchIntervalMillis();
  JavaStreamingContext streamingContext =
      new JavaStreamingContext(
          sparkContext, new org.apache.spark.streaming.Duration(batchIntervalMillis));

  Pipeline testPipeline = Pipeline.create(options);

  CreateStream<Integer> singleEmptyBatch =
      CreateStream.of(VarIntCoder.of(), Duration.millis(batchIntervalMillis)).emptyBatch();

  testPipeline.apply(singleEmptyBatch).apply(ParDo.of(new PassthroughFn<>()));

  testPipeline.traverseTopologically(
      new StreamingSourceTracker(streamingContext, testPipeline, ParDo.MultiOutput.class, 0));
  assertThat(StreamingSourceTracker.numAssertions, equalTo(1));
}
 
示例9
/**
 * Reshuffles a collection windowed into ten-minute fixed windows and checks that both the
 * elements and the windowing strategy are unchanged by the reshuffle.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterFixedWindows() {
  KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

  PCollection<KV<String, Integer>> windowed =
      pipeline
          .apply(Create.of(ARBITRARY_KVS).withCoder(kvCoder))
          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))));

  PCollection<KV<String, Integer>> reshuffled = windowed.apply(Reshuffle.of());

  // Contents must survive the reshuffle.
  PAssert.that(reshuffled).containsInAnyOrder(ARBITRARY_KVS);

  // The windowing strategy is compared at construction time, before the pipeline runs.
  assertEquals(windowed.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
 
示例10
/**
 * Verifies that the {@link DirectRunner} rejects a {@link DoFn} that mutates its input element
 * when the element type has a meaningful equals().
 */
@Test
public void testMutatingInputDoFnError() throws Exception {
  Pipeline p = getPipeline();

  // Overwrites the first entry of the input list in place before emitting a constant.
  DoFn<List<Integer>, Integer> inputMutatingFn =
      new DoFn<List<Integer>, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          List<Integer> element = c.element();
          element.set(0, 37);
          c.output(12);
        }
      };

  p.apply(
          Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
              .withCoder(ListCoder.of(VarIntCoder.of())))
      .apply(ParDo.of(inputMutatingFn));

  // Expectations are registered just before the run, which is where the failure should occur.
  thrown.expect(IllegalMutationException.class);
  thrown.expectMessage("Input");
  thrown.expectMessage("must not be mutated");
  p.run();
}
 
示例11
/**
 * Round-trips three kinds of keyed work items (timers plus elements, elements only, timers
 * only) through a {@code KeyedWorkItemCoder} and checks that each decodes to an equal value.
 */
@Test
public void testEncodeDecodeEqual() throws Exception {
  String key = "foo";
  Instant timerInstant = new Instant(500L);

  Iterable<TimerData> timers =
      ImmutableList.of(
          TimerData.of(
              StateNamespaces.global(), timerInstant, timerInstant, TimeDomain.EVENT_TIME));
  Iterable<WindowedValue<Integer>> elements =
      ImmutableList.of(
          WindowedValue.valueInGlobalWindow(1),
          WindowedValue.valueInGlobalWindow(4),
          WindowedValue.valueInGlobalWindow(8));

  KeyedWorkItemCoder<String, Integer> workItemCoder =
      KeyedWorkItemCoder.of(StringUtf8Coder.of(), VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);

  // Timers and elements together.
  CoderProperties.coderDecodeEncodeEqual(
      workItemCoder, KeyedWorkItems.workItem(key, timers, elements));
  // Elements only.
  CoderProperties.coderDecodeEncodeEqual(
      workItemCoder, KeyedWorkItems.elementsWorkItem(key, elements));
  // Timers only.
  CoderProperties.coderDecodeEncodeEqual(
      workItemCoder, KeyedWorkItems.timersWorkItem(key, timers));
}
 
示例12
/**
 * Applies a multi-output ParDo of {@code TestNoOutputDoFn} to an empty input and asserts that
 * the main output and both additional outputs are empty.
 */
@Test
@Category(ValidatesRunner.class)
public void testParDoWithEmptyTaggedOutput() {
  // Anonymous subclasses ("{}") are kept exactly as in the tag declarations under test.
  TupleTag<String> mainTag = new TupleTag<String>("main") {};
  TupleTag<String> firstExtraTag = new TupleTag<String>("additional1") {};
  TupleTag<String> secondExtraTag = new TupleTag<String>("additional2") {};
  TupleTagList extraTags = TupleTagList.of(firstExtraTag).and(secondExtraTag);

  PCollectionTuple taggedOutputs =
      pipeline
          .apply(Create.empty(VarIntCoder.of()))
          .apply(ParDo.of(new TestNoOutputDoFn()).withOutputTags(mainTag, extraTags));

  PAssert.that(taggedOutputs.get(mainTag)).empty();

  PAssert.that(taggedOutputs.get(firstExtraTag)).empty();
  PAssert.that(taggedOutputs.get(secondExtraTag)).empty();

  pipeline.run();
}
 
示例13
/**
 * Checks that the generated {@link DoFnInvoker} hands the declared state cell to {@code
 * processElement} as its state parameter.
 */
@Test
public void testDoFnWithState() throws Exception {
  // Must stay a compile-time constant: it is used as an annotation value below.
  final String stateId = "my-state-id-here";

  ValueState<Integer> stateMock = mock(ValueState.class);
  when(mockArgumentProvider.state(stateId, false)).thenReturn(stateMock);

  class MockFn extends DoFn<String, String> {
    @StateId(stateId)
    private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> valueState)
        throws Exception {}
  }

  MockFn fnMock = mock(MockFn.class);

  assertEquals(stop(), invokeProcessElement(fnMock));
  // The invoker must have passed the state object supplied by the argument provider.
  verify(fnMock).processElement(mockProcessContext, stateMock);
}
 
示例14
/**
 * Expects {@code DoFnSignatures.getSignature} to reject a {@code DoFn} whose {@code @StateId}
 * field is not declared {@code final}.
 *
 * <p>The expected {@link IllegalArgumentException} message must mention that state declarations
 * must be final, name the offending field, and must not mention timers.
 */
@Test
public void testStateIdNonFinal() throws Exception {
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("State declarations must be final");
  thrown.expectMessage("Non-final field");
  thrown.expectMessage("myfield");
  thrown.expectMessage(not(mentionsTimers()));
  DoFnSignatures.getSignature(
      new DoFn<KV<String, Integer>, Long>() {
        // Deliberately non-final: this is the declaration the analysis should reject.
        @StateId("my-id")
        private StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void foo(ProcessContext context) {}
      }.getClass());
}
 
示例15
/**
 * Builds a {@code ReduceFnTester} for the supplied windowing strategy and context-aware combine
 * function. The trigger state machine is derived from the strategy's trigger.
 *
 * @param strategy windowing strategy whose trigger drives the tester
 * @param combineFn context-aware combiner applied to the {@code Integer} inputs
 * @param outputCoder coder for the combined output
 * @param options pipeline options forwarded to the delegate overload
 * @param sideInputReader side-input reader forwarded to the delegate overload
 * @return the tester produced by the delegate {@code combining} overload
 * @throws Exception if validation of the combiner or construction of the tester fails
 */
public static <W extends BoundedWindow, AccumT, OutputT>
    ReduceFnTester<Integer, OutputT, W> combining(
        WindowingStrategy<?, W> strategy,
        CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
        Coder<OutputT> outputCoder,
        PipelineOptions options,
        SideInputReader sideInputReader)
        throws Exception {
  KvCoder<String, Integer> keyedInputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
  CoderRegistry defaultRegistry = CoderRegistry.createDefault();

  // Called only for validation: confirms the CombineFn can become an AppliedCombineFn.
  // The returned value is intentionally discarded.
  AppliedCombineFn.withInputCoder(combineFn, defaultRegistry, keyedInputCoder);

  return combining(
      strategy,
      TriggerStateMachines.stateMachineForTrigger(
          TriggerTranslation.toProto(strategy.getTrigger())),
      combineFn,
      outputCoder,
      options,
      sideInputReader);
}
 
示例16
/**
 * A Flatten application that has one input PCollection must not be matched by {@code
 * PTransformMatchers.emptyFlatten()}.
 */
@Test
public void emptyFlattenWithNonEmptyFlatten() {
  PCollection<Integer> flattenInput =
      PCollection.createPrimitiveOutputInternal(
          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
  PCollection<Integer> flattenOutput =
      PCollection.createPrimitiveOutputInternal(
          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());

  AppliedPTransform flattenApplication =
      AppliedPTransform.of(
          "Flatten",
          Collections.singletonMap(new TupleTag<Integer>(), flattenInput),
          Collections.singletonMap(new TupleTag<Integer>(), flattenOutput),
          Flatten.pCollections(),
          p);

  boolean matchedAsEmpty = PTransformMatchers.emptyFlatten().matches(flattenApplication);

  assertThat(matchedAsEmpty, is(false));
}
 
示例17
/**
 * Analyzes a named local {@code DoFn} with a single {@code @StateId("foo")} field and checks the
 * resulting state declaration: its id, the reflected field, and the declared state type.
 */
@Test
public void testSimpleStateIdNamedDoFn() throws Exception {
  class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
    @StateId("foo")
    private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void foo(ProcessContext context) {}
  }

  DoFnSignature signature =
      DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleStateIdNamedDoFn());

  // Exactly one state declaration is expected, keyed by the annotation's id.
  assertThat(signature.stateDeclarations().size(), equalTo(1));
  DoFnSignature.StateDeclaration fooDeclaration = signature.stateDeclarations().get("foo");

  assertThat(fooDeclaration.id(), equalTo("foo"));
  assertThat(
      fooDeclaration.field(),
      equalTo(DoFnForTestSimpleStateIdNamedDoFn.class.getDeclaredField("bizzle")));

  TypeDescriptor<?> expectedStateType = new TypeDescriptor<ValueState<Integer>>() {};
  assertThat(
      fooDeclaration.stateType(), Matchers.<TypeDescriptor<?>>equalTo(expectedStateType));
}
 
示例18
/**
 * Re-keys the input with {@code GroupByKeyHashAndSortByKeyAndWindowDoFn}, sets the coder of the
 * re-keyed collection, and then applies {@code GroupByKeyAndSortValuesOnly}.
 *
 * @param input key/value pairs; its coder is cast to a {@code KvCoder}
 * @return the grouped collection keyed by {@code Integer}
 */
@Override
public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>> expand(
    PCollection<KV<K, V>> input) {

  @SuppressWarnings("unchecked")
  Coder<W> windowCoder = (Coder<W>) input.getWindowingStrategy().getWindowFn().windowCoder();
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvInputCoder = (KvCoder<K, V>) input.getCoder();

  // Component coders of the re-keyed element: ((key, window), windowed value).
  KvCoder<K, W> keyAndWindowCoder = KvCoder.of(kvInputCoder.getKeyCoder(), windowCoder);
  FullWindowedValueCoder<V> windowedValueCoder =
      FullWindowedValueCoder.of(kvInputCoder.getValueCoder(), windowCoder);

  PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> rekeyed =
      input.apply(ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>(coder)));
  rekeyed.setCoder(
      KvCoder.of(VarIntCoder.of(), KvCoder.of(keyAndWindowCoder, windowedValueCoder)));

  return rekeyed.apply(new GroupByKeyAndSortValuesOnly<>());
}
 
示例19
/**
 * Applies {@code view} to an unbounded primitive {@code PCollection} and registers the expected
 * failure: an {@link IllegalStateException} whose cause mentions a non-bounded PCollection.
 *
 * @param pipeline pipeline in which the unbounded input is created
 * @param view the view-creating transform under test
 */
private void testViewUnbounded(
    Pipeline pipeline,
    PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("Unable to create a side-input view from input");
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));

  // Root transform that only declares an UNBOUNDED output in the global window.
  PTransform<PBegin, PCollection<KV<String, Integer>>> unboundedRoot =
      new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
        @Override
        public PCollection<KV<String, Integer>> expand(PBegin input) {
          return PCollection.createPrimitiveOutputInternal(
              input.getPipeline(),
              WindowingStrategy.globalDefault(),
              PCollection.IsBounded.UNBOUNDED,
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
        }
      };

  pipeline.apply(unboundedRoot).apply(view);
}
 
示例20
/**
 * Verifies that a processing-time timer arriving after its window has expired is ignored and
 * produces no output.
 */
@Test
public void testLateProcessingTimeTimer() throws Exception {
  Duration windowSize = Duration.millis(100);
  Duration firingDelay = Duration.millis(10);

  WindowingStrategy<?, IntervalWindow> windowing =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(windowSize))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(firingDelay)));

  ReduceFnTester<Integer, Integer, IntervalWindow> reduceTester =
      ReduceFnTester.combining(windowing, Sum.ofIntegers(), VarIntCoder.of());

  reduceTester.advanceProcessingTime(new Instant(5000));
  // First element: processing-time timer at 5000 + 10, end-of-window timer at 100.
  injectElement(reduceTester, 2);
  injectElement(reduceTester, 5);

  // Moving the watermark to the end of the window expires it (allowed lateness is zero);
  // from here on only garbage collection should touch the window.
  reduceTester.advanceInputWatermarkNoTimers(new Instant(100));

  // Passing the pending processing-time timer must not emit anything.
  reduceTester.advanceProcessingTime(new Instant(6000));

  assertThat(reduceTester.extractOutput(), emptyIterable());
}
 
示例21
/**
 * Checks the display data of a {@code CountMinSketchFn} configured with a given accuracy: it
 * must expose the width and depth computed from {@code eps} and {@code conf}, plus the two
 * accuracy parameters themselves.
 */
@Test
public void testDisplayData() {
  double eps = 0.01;
  double conf = 0.8;
  // Expected sketch dimensions for the accuracy above.
  int expectedWidth = (int) Math.ceil(2 / eps);
  int expectedDepth = (int) Math.ceil(-Math.log(1 - conf) / Math.log(2));

  final CountMinSketchFn<Integer> sketchFn =
      CountMinSketchFn.create(VarIntCoder.of()).withAccuracy(eps, conf);
  DisplayData displayData = DisplayData.from(sketchFn);

  assertThat(displayData, hasDisplayItem("width", expectedWidth));
  assertThat(displayData, hasDisplayItem("depth", expectedDepth));
  assertThat(displayData, hasDisplayItem("eps", eps));
  assertThat(displayData, hasDisplayItem("conf", conf));
}
 
示例22
/**
 * Opens an {@code ExecutableStageDoFnOperator} configured with a key coder inside a keyed test
 * harness and inspects, via reflection, the runner chain it installed: the operator's {@code
 * doFnRunner} must be a {@code DoFnRunnerWithMetricsUpdate} whose {@code delegate} is a {@code
 * StatefulDoFnRunner}.
 */
@Test
@SuppressWarnings("unchecked")
public void testEnsureStateCleanupWithKeyedInput() throws Exception {
  TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
  DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory =
      new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, VarIntCoder.of());
  // The same coder is used for the key and for the windowed input elements.
  VarIntCoder keyCoder = VarIntCoder.of();
  ExecutableStageDoFnOperator<Integer, Integer> operator =
      getOperator(
          mainOutput,
          Collections.emptyList(),
          outputManagerFactory,
          WindowingStrategy.globalDefault(),
          keyCoder,
          WindowedValue.getFullCoder(keyCoder, GlobalWindow.Coder.INSTANCE));

  // Key selector is the identity function: the harness keys each record by the record itself.
  KeyedOneInputStreamOperatorTestHarness<Integer, WindowedValue<Integer>, WindowedValue<Integer>>
      testHarness =
          new KeyedOneInputStreamOperatorTestHarness(
              operator, val -> val, new CoderTypeInformation<>(keyCoder));

  // Stub the bundle factory so that any requested bundle is a mock exposing an "input" receiver.
  RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
  when(bundle.getInputReceivers())
      .thenReturn(
          ImmutableMap.<String, FnDataReceiver<WindowedValue>>builder()
              .put("input", Mockito.mock(FnDataReceiver.class))
              .build());
  when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(bundle);

  testHarness.open();

  // Read the private runner field; the outermost runner is expected to be the metrics wrapper.
  Object doFnRunner = Whitebox.getInternalState(operator, "doFnRunner");
  assertThat(doFnRunner, instanceOf(DoFnRunnerWithMetricsUpdate.class));

  // There should be a StatefulDoFnRunner installed which takes care of clearing state
  Object statefulDoFnRunner = Whitebox.getInternalState(doFnRunner, "delegate");
  assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class));
}
 
示例23
/**
 * Builds a pipeline that groups by key, groups by window, and converts the result to keyed work
 * items, then checks that the visitor reports the final collection among its keyed values.
 */
@Test
public void keyedInputWithKeyPreserving() {
  KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

  WindowedValue<KV<String, Integer>> windowedHello =
      WindowedValue.of(
          KV.of("hello", 3),
          new Instant(0),
          new IntervalWindow(new Instant(0), new Instant(9)),
          PaneInfo.NO_FIRING);

  PCollection<KV<String, WindowedValue<KV<String, Integer>>>> keyedWindowedInput =
      p.apply(
          Create.of(KV.of("hello", windowedHello))
              .withCoder(
                  KvCoder.of(StringUtf8Coder.of(), WindowedValue.getValueOnlyCoder(kvCoder))));

  TupleTag<KeyedWorkItem<String, KV<String, Integer>>> workItemTag = new TupleTag<>();
  PCollection<KeyedWorkItem<String, KV<String, Integer>>> keyedWorkItems =
      keyedWindowedInput
          .apply(new DirectGroupByKeyOnly<>())
          .apply(
              new DirectGroupAlsoByWindow<>(
                  WindowingStrategy.globalDefault(), WindowingStrategy.globalDefault()))
          .apply(
              ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>())
                  .withOutputTags(workItemTag, TupleTagList.empty()))
          .get(workItemTag)
          .setCoder(
              KeyedWorkItemCoder.of(StringUtf8Coder.of(), kvCoder, GlobalWindow.Coder.INSTANCE));

  p.traverseTopologically(visitor);
  assertThat(visitor.getKeyedPValues(), hasItem(keyedWorkItems));
}
 
示例24
/**
 * Runs a streaming pipeline over two batches of three elements each, waits one metrics push
 * period plus one second, and expects the test sink to have received a counter value of 6.
 */
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
  Instant start = new Instant(0);
  Instant oneSecondLater = start.plus(Duration.standardSeconds(1L));

  CreateStream<Integer> twoBatchSource =
      CreateStream.of(VarIntCoder.of(), batchDuration())
          .emptyBatch()
          .advanceWatermarkForNextBatch(start)
          .nextBatch(
              TimestampedValue.of(1, start),
              TimestampedValue.of(2, start),
              TimestampedValue.of(3, start))
          .advanceWatermarkForNextBatch(oneSecondLater)
          .nextBatch(
              TimestampedValue.of(4, oneSecondLater),
              TimestampedValue.of(5, oneSecondLater),
              TimestampedValue.of(6, oneSecondLater))
          .advanceNextBatchWatermarkToInfinity();

  pipeline
      .apply(twoBatchSource)
      .apply(
          Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3L)))
              .withAllowedLateness(Duration.ZERO))
      .apply(ParDo.of(new CountingDoFn()));

  pipeline.run();

  // Give the metrics pusher time to push: one push period plus a one-second margin.
  long pushWaitMillis =
      (pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod() + 1L) * 1000;
  Thread.sleep(pushWaitMillis);

  assertThat(TestMetricsSink.getCounterValue(COUNTER_NAME), is(6L));
}
 
示例25
/**
 * Uses a multimap side input whose key coder is a {@code NonDeterministicStringCoder} and checks
 * that each main-input word is paired with every value stored under its first letter,
 * duplicates included.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMultimapSideInputWithNonDeterministicKeyCoder() {

  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
                  .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMultimap());

  // Emits (word, value) for every value found under the word's first letter.
  DoFn<String, KV<String, Integer>> pairWithFirstLetterValues =
      new DoFn<String, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          String word = c.element();
          String firstLetter = word.substring(0, 1);
          for (Integer value : c.sideInput(view).get(firstLetter)) {
            c.output(KV.of(word, value));
          }
        }
      };

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply("OutputSideInputs", ParDo.of(pairWithFirstLetterValues).withSideInputs(view));

  PAssert.that(output)
      .containsInAnyOrder(
          KV.of("apple", 1),
          KV.of("apple", 1),
          KV.of("apple", 2),
          KV.of("banana", 3),
          KV.of("blackberry", 3));

  pipeline.run();
}
 
示例26
/**
 * Creates a multimap view over an empty collection and asserts, from inside a {@code DoFn}, that
 * the side input is empty by three different observations.
 */
@Test
@Category(ValidatesRunner.class)
public void testEmptyMultimapSideInput() throws Exception {

  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(View.asMultimap());

  PCollection<Integer> results =
      pipeline
          .apply("Create1", Create.of(1))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<Integer, Integer>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          Map<String, Iterable<Integer>> emptyMultimap = c.sideInput(view);
                          assertTrue(emptyMultimap.isEmpty());
                          assertTrue(emptyMultimap.entrySet().isEmpty());
                          assertFalse(emptyMultimap.entrySet().iterator().hasNext());
                          c.output(c.element());
                        }
                      })
                  .withSideInputs(view));

  // One element is passed through so that the DoFn, and its assertions, actually execute.
  PAssert.that(results).containsInAnyOrder(1);

  pipeline.run();
}
 
示例27
/**
 * Same emptiness checks as the plain empty-multimap case, but the view's key coder is a {@code
 * NonDeterministicStringCoder}.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {

  final PCollectionView<Map<String, Iterable<Integer>>> view =
      pipeline
          .apply(
              "CreateEmptyView",
              Create.empty(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMultimap());

  // Asserts the side input is empty, then forwards the element unchanged.
  DoFn<Integer, Integer> assertEmptyAndForward =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          assertTrue(c.sideInput(view).isEmpty());
          assertTrue(c.sideInput(view).entrySet().isEmpty());
          assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
          c.output(c.element());
        }
      };

  PCollection<Integer> results =
      pipeline
          .apply("Create1", Create.of(1))
          .apply("OutputSideInputs", ParDo.of(assertEmptyAndForward).withSideInputs(view));

  // One element is passed through so that the DoFn, and its assertions, actually execute.
  PAssert.that(results).containsInAnyOrder(1);

  pipeline.run();
}
 
示例28
/**
 * Uses a map side input whose key coder is a {@code NonDeterministicStringCoder} and checks that
 * each main-input word is paired with the value stored under its first letter.
 */
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMapSideInputWithNonDeterministicKeyCoder() {

  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", 1), KV.of("b", 3))
                  .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
          .apply(View.asMap());

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<String, KV<String, Integer>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          String word = c.element();
                          // Look up by the word's first letter.
                          Integer mapped = c.sideInput(view).get(word.substring(0, 1));
                          c.output(KV.of(word, mapped));
                        }
                      })
                  .withSideInputs(view));

  PAssert.that(output)
      .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

  pipeline.run();
}
 
示例29
/**
 * Creates a map view over an empty collection and asserts, from inside a {@code DoFn}, that the
 * side input is empty by three different observations.
 */
@Test
@Category(ValidatesRunner.class)
public void testEmptyMapSideInput() throws Exception {

  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(View.asMap());

  PCollection<Integer> results =
      pipeline
          .apply("Create1", Create.of(1))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<Integer, Integer>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          Map<String, Integer> emptyMap = c.sideInput(view);
                          assertTrue(emptyMap.isEmpty());
                          assertTrue(emptyMap.entrySet().isEmpty());
                          assertFalse(emptyMap.entrySet().iterator().hasNext());
                          c.output(c.element());
                        }
                      })
                  .withSideInputs(view));

  // One element is passed through so that the DoFn, and its assertions, actually execute.
  PAssert.that(results).containsInAnyOrder(1);

  pipeline.run();
}
 
示例30
/**
 * Builds a map view from two entries that share the key {@code "a"} (both with null values,
 * encoded through a {@code NullableCoder}) and expects the pipeline run to fail with a cause
 * whose message reports duplicate values for that key.
 */
@Test
@Category(NeedsRunner.class)
public void testMapSideInputWithNullValuesCatchesDuplicates() {

  final PCollectionView<Map<String, Integer>> view =
      pipeline
          .apply(
              "CreateSideInput",
              Create.of(KV.of("a", (Integer) null), KV.of("a", (Integer) null))
                  .withCoder(
                      KvCoder.of(StringUtf8Coder.of(), NullableCoder.of(VarIntCoder.of()))))
          .apply(View.asMap());

  PCollection<KV<String, Integer>> output =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply(
              "OutputSideInputs",
              ParDo.of(
                      new DoFn<String, KV<String, Integer>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                          String word = c.element();
                          // Look up by first letter, defaulting to 0 when the key is absent.
                          Integer mapped =
                              c.sideInput(view).getOrDefault(word.substring(0, 1), 0);
                          c.output(KV.of(word, mapped));
                        }
                      })
                  .withSideInputs(view));

  PAssert.that(output)
      .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

  // The run is expected to throw; the cause's message must state that duplicate values
  // for key "a" are not allowed.
  thrown.expectCause(
      ThrowableMessageMatcher.hasMessage(Matchers.containsString("Duplicate values for a")));
  pipeline.run();
}