Java source code examples: org.apache.beam.sdk.transforms.SerializableFunctions
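
SerializableFunctions is a small utility class of factory methods for SerializableFunction instances. The examples below mostly use identity() (return the input unchanged, typically as a format function for file writes) and constant(value) (ignore the input and always return the given value, e.g. to filter everything out or to send every element to the same destination); a few use clonesOf(sink), which supplies a copy of a base sink per destination. A minimal standalone sketch of the two simplest factories (the demo class itself is made up for illustration; the Beam APIs it calls are real):

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;

public class SerializableFunctionsDemo {
  public static void main(String[] args) {
    // identity(): returns its input unchanged.
    SerializableFunction<String, String> id = SerializableFunctions.identity();
    // constant(value): ignores its input and always returns the given value.
    SerializableFunction<String, Boolean> alwaysFalse = SerializableFunctions.constant(false);

    System.out.println(id.apply("hello"));              // hello
    System.out.println(alwaysFalse.apply("anything"));  // false
  }
}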

Example 1
public static PTransform<PCollection<String>, WriteFilesResult<Void>> createWrite(
    String filenamePrefix, String filenameSuffix, Schema schema, JdbcAvroArgs jdbcAvroArgs) {
  filenamePrefix = filenamePrefix.replaceAll("/+$", "") + "/part";
  ValueProvider<ResourceId> prefixProvider =
      StaticValueProvider.of(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
  FileBasedSink.FilenamePolicy filenamePolicy =
      DefaultFilenamePolicy.fromStandardParameters(
          prefixProvider, DEFAULT_SHARD_TEMPLATE, filenameSuffix, false);

  final DynamicAvroDestinations<String, Void, String> destinations =
      AvroIO.constantDestinations(
          filenamePolicy,
          schema,
          ImmutableMap.of(),
          // since Beam does not support zstandard
          CodecFactory.nullCodec(),
          SerializableFunctions.identity());
  final FileBasedSink<String, Void, String> sink =
      new JdbcAvroSink<>(prefixProvider, destinations, jdbcAvroArgs);
  return WriteFiles.to(sink);
}
 
Example 2
@Test
public void testRemoveTemporaryFiles() throws Exception {
  int numFiles = 10;
  List<String> fileNames = Lists.newArrayList();
  String tempFilePrefix = options.getTempLocation() + "/";
  for (int i = 0; i < numFiles; ++i) {
    TableRowWriter<TableRow> writer =
        new TableRowWriter<>(tempFilePrefix, SerializableFunctions.identity());
    writer.close();
    fileNames.add(writer.getResult().resourceId.toString());
  }
  fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));

  File tempDir = new File(options.getTempLocation());
  testNumFiles(tempDir, 10);

  WriteTables.removeTemporaryFiles(fileNames);
}
 
Example 3
@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // reparallelize mimics the same behavior as in JdbcIO, used to break fusion
  PCollectionView<Iterable<KV<String, String>>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<KV<String, String>> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<KV<String, String>, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      c.output(c.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
 
Example 4
@Override
public PCollection<T> expand(PCollection<T> input) {
  // See https://issues.apache.org/jira/browse/BEAM-2803
  // We use a combined approach to "break fusion" here:
  // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
  // 1) force the data to be materialized by passing it as a side input to an identity fn,
  // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
  // and ensures that data to be shuffled can be generated in parallel, while reshuffling
  // provides perfect parallelism.
  // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
  // The current approach is necessary only to support the particular case of JdbcIO where
  // a single query may produce many gigabytes of query results.
  PCollectionView<Iterable<T>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<T> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<T, T>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      c.output(c.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
 
Example 5
private RowCoder(Schema schema) {
  super(
      schema,
      TypeDescriptors.rows(),
      SerializableFunctions.identity(),
      SerializableFunctions.identity());
}
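
The two identity() arguments here play the role of the to-Row and from-Row conversions passed to the parent coder; they are identities because the element type is already Row, so no mapping is needed. For reference, a short sketch of how such a coder is usually obtained at the user level (the schema fields are made up for illustration):

import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

Schema schema = Schema.builder().addStringField("name").addInt32Field("age").build();
RowCoder coder = RowCoder.of(schema);  // encodes Row values that conform to this schema
Row row = Row.withSchema(schema).addValues("alice", 30).build();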
 
Example 6
/** See {@link TypedWrite#to(String)}. */
public Write<T> to(String outputPrefix) {
  return new Write<>(
      inner
          .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
          .withFormatFunction(SerializableFunctions.identity()));
}
 
Example 7
/** Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. */
public <OutputT> Write<DestinationT, UserT> via(
    Contextful<Fn<UserT, OutputT>> outputFn, final Sink<OutputT> sink) {
  checkArgument(sink != null, "sink can not be null");
  checkArgument(outputFn != null, "outputFn can not be null");
  return via(outputFn, fn(SerializableFunctions.clonesOf(sink)));
}
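
A rough sketch of what clonesOf(sink) contributes here (hypothetical variables; the per-call copying behavior is an assumption based on the method name and on the requirement in the next example's javadoc that a fresh Sink be created on every call):

FileIO.Sink<String> base = TextIO.sink();
SerializableFunction<String, FileIO.Sink<String>> sinkFn = SerializableFunctions.clonesOf(base);
FileIO.Sink<String> forDestA = sinkFn.apply("dest-a");  // expected: an independent copy of base
FileIO.Sink<String> forDestB = sinkFn.apply("dest-b");  // expected: another independent copy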
 
Example 8
/**
 * Like {@link #via(Contextful, Contextful)}, but the output type of the sink is the same as the
 * type of the input collection. The sink function must create a new {@link Sink} instance every
 * time it is called.
 */
public Write<DestinationT, UserT> via(Contextful<Fn<DestinationT, Sink<UserT>>> sinkFn) {
  checkArgument(sinkFn != null, "sinkFn can not be null");
  return toBuilder()
      .setSinkFn((Contextful) sinkFn)
      .setOutputFn(fn(SerializableFunctions.<UserT>identity()))
      .build();
}
 
Example 9
/**
 * See {@link TypedWrite#to(SerializableFunction, Params)}.
 *
 * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link
 *     #sink()} instead.
 */
@Experimental(Kind.FILESYSTEM)
@Deprecated
public Write to(
    SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
  return new Write(
      inner
          .to(destinationFunction, emptyDestination)
          .withFormatFunction(SerializableFunctions.identity()));
}
 
Example 10
@Test
@Category(NeedsRunner.class)
public void testFileIoDynamicNaming() throws IOException {
  // Test for BEAM-6407.

  String outputFileName = tmpFolder.newFile().getAbsolutePath();
  PCollectionView<String> outputFileNameView =
      p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton());

  Contextful.Fn<String, FileIO.Write.FileNaming> fileNaming =
      (element, c) ->
          (window, pane, numShards, shardIndex, compression) ->
              c.sideInput(outputFileNameView) + "-" + shardIndex;

  p.apply(Create.of(""))
      .apply(
          "WriteDynamicFilename",
          FileIO.<String, String>writeDynamic()
              .by(SerializableFunctions.constant(""))
              .withDestinationCoder(StringUtf8Coder.of())
              .via(TextIO.sink())
              .withTempDirectory(tmpFolder.newFolder().getAbsolutePath())
              .withNaming(
                  Contextful.of(
                      fileNaming, Requirements.requiresSideInputs(outputFileNameView))));

  // We need to run the TestPipeline with the default options.
  p.run(PipelineOptionsFactory.create()).waitUntilFinish();
  assertTrue(
      "Output file shard 0 exists after pipeline completes",
      new File(outputFileName + "-0").exists());
}
 
Example 11
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> to(ResourceId outputPrefix) {
  return new Write<>(
      inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 12
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write<T> to(ValueProvider<String> outputPrefix) {
  return new Write<>(
      inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 13
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
  return new Write<>(
      inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 14
/** See {@link TypedWrite#to(FilenamePolicy)}. */
public Write<T> to(FilenamePolicy filenamePolicy) {
  return new Write<>(
      inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 15
/** Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. */
public Write<DestinationT, UserT> via(Sink<UserT> sink) {
  checkArgument(sink != null, "sink can not be null");
  return via(fn(SerializableFunctions.clonesOf(sink)));
}
 
Example 16
@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
  if (getDynamic()) {
    checkArgument(
        getConstantFileNaming() == null,
        "when using writeDynamic(), must use versions of .withNaming() "
            + "that take functions from DestinationT");
    checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
    checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
    checkArgument(
        getFileNamingFn() != null,
        "when using writeDynamic(), must specify "
            + ".withNaming() taking a function form DestinationT");
    return fn(
        (element, c) -> {
          FileNaming naming = getFileNamingFn().getClosure().apply(element, c);
          return getOutputDirectory() == null
              ? naming
              : relativeFileNaming(getOutputDirectory(), naming);
        },
        getFileNamingFn().getRequirements());
  } else {
    checkArgument(
        getFileNamingFn() == null,
        ".withNaming() taking a function from DestinationT requires writeDynamic()");
    FileNaming constantFileNaming;
    if (getConstantFileNaming() == null) {
      constantFileNaming =
          defaultNaming(
              MoreObjects.firstNonNull(getFilenamePrefix(), StaticValueProvider.of("output")),
              MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of("")));
    } else {
      checkArgument(
          getFilenamePrefix() == null, ".to(FileNaming) is incompatible with .withPrefix()");
      checkArgument(
          getFilenameSuffix() == null, ".to(FileNaming) is incompatible with .withSuffix()");
      constantFileNaming = getConstantFileNaming();
    }
    if (getOutputDirectory() != null) {
      constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming);
    }
    return fn(SerializableFunctions.<DestinationT, FileNaming>constant(constantFileNaming));
  }
}
 
Example 17
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
  Write.Builder<DestinationT, UserT> resolvedSpec = new AutoValue_FileIO_Write.Builder<>();

  resolvedSpec.setDynamic(getDynamic());

  checkArgument(getSinkFn() != null, ".via() is required");
  resolvedSpec.setSinkFn(getSinkFn());

  checkArgument(getOutputFn() != null, "outputFn should have been set by .via()");
  resolvedSpec.setOutputFn(getOutputFn());

  // Resolve destinationFn
  if (getDynamic()) {
    checkArgument(getDestinationFn() != null, "when using writeDynamic(), .by() is required");
    resolvedSpec.setDestinationFn(getDestinationFn());
    resolvedSpec.setDestinationCoder(resolveDestinationCoder(input));
  } else {
    checkArgument(getDestinationFn() == null, ".by() requires writeDynamic()");
    checkArgument(
        getDestinationCoder() == null, ".withDestinationCoder() requires writeDynamic()");
    resolvedSpec.setDestinationFn(fn(SerializableFunctions.constant(null)));
    resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
  }

  resolvedSpec.setFileNamingFn(resolveFileNamingFn());
  resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());
  if (getTempDirectory() == null) {
    checkArgument(
        getOutputDirectory() != null, "must specify either .withTempDirectory() or .to()");
    resolvedSpec.setTempDirectory(getOutputDirectory());
  } else {
    resolvedSpec.setTempDirectory(getTempDirectory());
  }

  resolvedSpec.setCompression(getCompression());
  resolvedSpec.setNumShards(getNumShards());
  resolvedSpec.setSharding(getSharding());
  resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
  resolvedSpec.setNoSpilling(getNoSpilling());

  Write<DestinationT, UserT> resolved = resolvedSpec.build();
  WriteFiles<UserT, DestinationT, ?> writeFiles =
      WriteFiles.to(new ViaFileBasedSink<>(resolved))
          .withSideInputs(Lists.newArrayList(resolved.getAllSideInputs()));
  if (getNumShards() != null) {
    writeFiles = writeFiles.withNumShards(getNumShards());
  } else if (getSharding() != null) {
    writeFiles = writeFiles.withSharding(getSharding());
  } else {
    writeFiles = writeFiles.withRunnerDeterminedSharding();
  }
  if (!getIgnoreWindowing()) {
    writeFiles = writeFiles.withWindowedWrites();
  }
  if (getNoSpilling()) {
    writeFiles = writeFiles.withNoSpilling();
  }
  return input.apply(writeFiles);
}
 
Example 18
/** See {@link TypedWrite#to(String)}. */
public Write to(String filenamePrefix) {
  return new Write(
      inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 19
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
  return new Write(
      inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 20
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write to(ValueProvider<String> outputPrefix) {
  return new Write(inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 21
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
  return new Write(
      inner.toResource(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
 
Example 22
/** See {@link TypedWrite#to(FilenamePolicy)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(FilenamePolicy filenamePolicy) {
  return new Write(
      inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity()));
}
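
The Write#to(...) overloads in the preceding examples all delegate to an inner TypedWrite and install identity() as the format function, so elements are written as-is without conversion. A rough user-level equivalent for the TextIO case (the output prefix is a made-up local path, and `pipeline` is assumed to be an existing Pipeline):

PCollection<String> lines = pipeline.apply(Create.of("a", "b", "c"));
lines.apply(
    TextIO.<String>writeCustomType()
        .to("/tmp/output/part")  // hypothetical output prefix
        .withFormatFunction(SerializableFunctions.identity()));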
 
Example 23
/**
 * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
 * UserT and OutputT are the same type and the format function is the identity.
 */
public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
    FilenamePolicy filenamePolicy) {
  return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
}
 
Example 24
@Test
public void testWriteTables() throws Exception {
  long numTables = 3;
  long numPartitions = 3;
  long numFilesPerPartition = 10;
  String jobIdToken = "jobId";
  final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();

  List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList();
  for (int i = 0; i < numTables; ++i) {
    String tableName = String.format("project-id:dataset-id.table%05d", i);
    TableDestination tableDestination = new TableDestination(tableName, tableName);
    for (int j = 0; j < numPartitions; ++j) {
      String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, 0);
      List<String> filesPerPartition = Lists.newArrayList();
      for (int k = 0; k < numFilesPerPartition; ++k) {
        String filename =
            Paths.get(
                    testFolder.getRoot().getAbsolutePath(),
                    String.format("files0x%08x_%05d", tempTableId.hashCode(), k))
                .toString();
        TableRowWriter<TableRow> writer =
            new TableRowWriter<>(filename, SerializableFunctions.identity());
        try (TableRowWriter ignored = writer) {
          TableRow tableRow = new TableRow().set("name", tableName);
          writer.write(tableRow);
        }
        filesPerPartition.add(writer.getResult().resourceId.toString());
      }
      partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition));

      String json =
          String.format(
              "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
              tempTableId);
      expectedTempTables.put(tableDestination, json);
    }
  }

  PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
      p.apply(Create.of(partitions));
  PCollectionView<String> jobIdTokenView =
      p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
  List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);

  fakeJobService.setNumFailuresExpected(3);
  WriteTables<String> writeTables =
      new WriteTables<>(
          true,
          fakeBqServices,
          jobIdTokenView,
          BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
          BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
          sideInputs,
          new IdentityDynamicTables(),
          null,
          4,
          false,
          null,
          "NEWLINE_DELIMITED_JSON",
          false,
          Collections.emptySet());

  PCollection<KV<TableDestination, String>> writeTablesOutput =
      writeTablesInput.apply(writeTables);

  PAssert.thatMultimap(writeTablesOutput)
      .satisfies(
          input -> {
            assertEquals(input.keySet(), expectedTempTables.keySet());
            for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
              @SuppressWarnings("unchecked")
              String[] expectedValues =
                  Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
              assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
            }
            return null;
          });
  p.run();
}
 
Example 25
DummySink() {
  super(
      StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
      DynamicFileDestinations.constant(
          new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
}
 
Example 26
TestSink(String tmpFolder) {
  super(
      ValueProvider.StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
      DynamicFileDestinations.constant(FILENAME_POLICY, SerializableFunctions.identity()));
}