Java Source Code Examples: org.apache.beam.sdk.transforms.SerializableFunctions
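The examples below collect real-world usages of org.apache.beam.sdk.transforms.SerializableFunctions, a utility class whose static factories build commonly needed SerializableFunction instances: identity() returns its input unchanged, constant(value) ignores its input and always returns the given value, and clonesOf(base) returns a copy of a base object on each call. As a minimal, self-contained sketch of the first two (assuming only the Beam Java SDK core on the classpath; the demo class name is hypothetical):

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;

public class SerializableFunctionsDemo {
  public static void main(String[] args) {
    // identity() returns its argument unchanged; the examples below pass it as
    // a format function whenever the user type and the output type coincide.
    SerializableFunction<String, String> id = SerializableFunctions.identity();
    System.out.println(id.apply("hello")); // prints: hello

    // constant(value) ignores its argument and always returns the given value;
    // Examples 3 and 4 below use Filter.by(constant(false)) to drop all elements.
    SerializableFunction<String, Boolean> dropAll = SerializableFunctions.constant(false);
    System.out.println(dropAll.apply("anything")); // prints: false
  }
}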
Example 1
public static PTransform<PCollection<String>, WriteFilesResult<Void>> createWrite(
    String filenamePrefix, String filenameSuffix, Schema schema, JdbcAvroArgs jdbcAvroArgs) {
  filenamePrefix = filenamePrefix.replaceAll("/+$", "") + "/part";
  ValueProvider<ResourceId> prefixProvider =
      StaticValueProvider.of(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
  FileBasedSink.FilenamePolicy filenamePolicy =
      DefaultFilenamePolicy.fromStandardParameters(
          prefixProvider, DEFAULT_SHARD_TEMPLATE, filenameSuffix, false);
  final DynamicAvroDestinations<String, Void, String> destinations =
      AvroIO.constantDestinations(
          filenamePolicy,
          schema,
          ImmutableMap.of(),
          // since Beam does not support zstandard
          CodecFactory.nullCodec(),
          SerializableFunctions.identity());
  final FileBasedSink<String, Void, String> sink =
      new JdbcAvroSink<>(prefixProvider, destinations, jdbcAvroArgs);
  return WriteFiles.to(sink);
}
Example 2
@Test
public void testRemoveTemporaryFiles() throws Exception {
  int numFiles = 10;
  List<String> fileNames = Lists.newArrayList();
  String tempFilePrefix = options.getTempLocation() + "/";
  for (int i = 0; i < numFiles; ++i) {
    TableRowWriter<TableRow> writer =
        new TableRowWriter<>(tempFilePrefix, SerializableFunctions.identity());
    writer.close();
    fileNames.add(writer.getResult().resourceId.toString());
  }
  fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));
  File tempDir = new File(options.getTempLocation());
  testNumFiles(tempDir, 10);
  WriteTables.removeTemporaryFiles(fileNames);
}
Example 3
@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // reparallelize mimics the same behavior as in JdbcIO, used to break fusion
  PCollectionView<Iterable<KV<String, String>>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<KV<String, String>> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<KV<String, String>, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      c.output(c.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
Example 4
@Override
public PCollection<T> expand(PCollection<T> input) {
  // See https://issues.apache.org/jira/browse/BEAM-2803
  // We use a combined approach to "break fusion" here:
  // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
  // 1) force the data to be materialized by passing it as a side input to an identity fn,
  // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
  // and ensures that data to be shuffled can be generated in parallel, while reshuffling
  // provides perfect parallelism.
  // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
  // The current approach is necessary only to support the particular case of JdbcIO where
  // a single query may produce many gigabytes of query results.
  PCollectionView<Iterable<T>> empty =
      input
          .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
          .apply(View.asIterable());
  PCollection<T> materialized =
      input.apply(
          "Identity",
          ParDo.of(
                  new DoFn<T, T>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      c.output(c.element());
                    }
                  })
              .withSideInputs(empty));
  return materialized.apply(Reshuffle.viaRandomKey());
}
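The expand() methods in Examples 3 and 4 are the bodies of a fusion-breaking transform. As a sketch of how such a method is typically packaged and applied (the class name Reparallelize is an assumption borrowed from the comment in Example 3, not shown in the original snippets):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical packaging of the fusion-break pattern above; usage:
//   PCollection<T> reparallelized = input.apply(new Reparallelize<>());
public class Reparallelize<T> extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    // An always-false filter yields an empty side input whose only purpose is
    // to force materialization of the upstream stage.
    PCollectionView<Iterable<T>> empty =
        input
            .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
            .apply(View.asIterable());
    // The identity ParDo consumes that side input; the random-key reshuffle
    // then redistributes elements evenly across workers.
    return input
        .apply(
            "Identity",
            ParDo.of(
                    new DoFn<T, T>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        c.output(c.element());
                      }
                    })
                .withSideInputs(empty))
        .apply(Reshuffle.viaRandomKey());
  }
}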
Example 5
private RowCoder(Schema schema) {
  super(
      schema,
      TypeDescriptors.rows(),
      SerializableFunctions.identity(),
      SerializableFunctions.identity());
}
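RowCoder's constructor passes identity() for both conversion functions because the coded element type is already Row. A minimal usage sketch through the public factory (schema field names are hypothetical):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RowCoderDemo {
  public static void main(String[] args) {
    // Field names are hypothetical; any schema works.
    Schema schema = Schema.builder().addStringField("name").addInt32Field("age").build();
    // RowCoder.of(schema) is the public entry point to the private constructor
    // above, where both conversion functions are SerializableFunctions.identity().
    Coder<Row> coder = RowCoder.of(schema);
    System.out.println(coder);
  }
}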
Example 6
/** See {@link TypedWrite#to(String)}. */
public Write<T> to(String outputPrefix) {
  return new Write<>(
      inner
          .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
          .withFormatFunction(SerializableFunctions.identity()));
}
Example 7
/** Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. */
public <OutputT> Write<DestinationT, UserT> via(
    Contextful<Fn<UserT, OutputT>> outputFn, final Sink<OutputT> sink) {
  checkArgument(sink != null, "sink can not be null");
  checkArgument(outputFn != null, "outputFn can not be null");
  return via(outputFn, fn(SerializableFunctions.clonesOf(sink)));
}
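clonesOf(sink) returns a function that ignores its input and yields a fresh copy of the base object on every call, so each destination gets its own Sink instance rather than shared mutable state (see also the requirement in Example 8's javadoc). A minimal sketch with a hypothetical Serializable payload, assuming clonesOf copies via Java serialization:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;

public class ClonesOfDemo {
  // Hypothetical Serializable payload, standing in for a FileIO.Sink.
  static class Counter implements Serializable {
    int count;
  }

  public static void main(String[] args) {
    Counter base = new Counter();
    // The returned function ignores its input and hands back a fresh copy of
    // base on every call, so no two callers share mutable state.
    SerializableFunction<Void, Counter> factory = SerializableFunctions.clonesOf(base);
    Counter first = factory.apply(null);
    Counter second = factory.apply(null);
    System.out.println(first != second); // prints: true
  }
}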
Example 8
/**
 * Like {@link #via(Contextful, Contextful)}, but the output type of the sink is the same as the
 * type of the input collection. The sink function must create a new {@link Sink} instance every
 * time it is called.
 */
public Write<DestinationT, UserT> via(Contextful<Fn<DestinationT, Sink<UserT>>> sinkFn) {
  checkArgument(sinkFn != null, "sinkFn can not be null");
  return toBuilder()
      .setSinkFn((Contextful) sinkFn)
      .setOutputFn(fn(SerializableFunctions.<UserT>identity()))
      .build();
}
Example 9
/**
 * See {@link TypedWrite#to(SerializableFunction, Params)}.
 *
 * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
 *     instead.
 */
@Experimental(Kind.FILESYSTEM)
@Deprecated
public Write to(
    SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
  return new Write(
      inner
          .to(destinationFunction, emptyDestination)
          .withFormatFunction(SerializableFunctions.identity()));
}
Example 10
@Test
@Category(NeedsRunner.class)
public void testFileIoDynamicNaming() throws IOException {
  // Test for BEAM-6407.
  String outputFileName = tmpFolder.newFile().getAbsolutePath();
  PCollectionView<String> outputFileNameView =
      p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton());

  Contextful.Fn<String, FileIO.Write.FileNaming> fileNaming =
      (element, c) ->
          (window, pane, numShards, shardIndex, compression) ->
              c.sideInput(outputFileNameView) + "-" + shardIndex;

  p.apply(Create.of(""))
      .apply(
          "WriteDynamicFilename",
          FileIO.<String, String>writeDynamic()
              .by(SerializableFunctions.constant(""))
              .withDestinationCoder(StringUtf8Coder.of())
              .via(TextIO.sink())
              .withTempDirectory(tmpFolder.newFolder().getAbsolutePath())
              .withNaming(
                  Contextful.of(
                      fileNaming, Requirements.requiresSideInputs(outputFileNameView))));

  // We need to run the TestPipeline with the default options.
  p.run(PipelineOptionsFactory.create()).waitUntilFinish();
  assertTrue(
      "Output file shard 0 exists after pipeline completes",
      new File(outputFileName + "-0").exists());
}
Example 11
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> to(ResourceId outputPrefix) {
  return new Write<>(
      inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 12
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write<T> to(ValueProvider<String> outputPrefix) {
  return new Write<>(
      inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 13
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
  return new Write<>(
      inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 14
/** See {@link TypedWrite#to(FilenamePolicy)}. */
public Write<T> to(FilenamePolicy filenamePolicy) {
  return new Write<>(
      inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity()));
}
Example 15
/** Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. */
public Write<DestinationT, UserT> via(Sink<UserT> sink) {
  checkArgument(sink != null, "sink can not be null");
  return via(fn(SerializableFunctions.clonesOf(sink)));
}
Example 16
@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
  if (getDynamic()) {
    checkArgument(
        getConstantFileNaming() == null,
        "when using writeDynamic(), must use versions of .withNaming() "
            + "that take functions from DestinationT");
    checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
    checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
    checkArgument(
        getFileNamingFn() != null,
        "when using writeDynamic(), must specify "
            + ".withNaming() taking a function from DestinationT");
    return fn(
        (element, c) -> {
          FileNaming naming = getFileNamingFn().getClosure().apply(element, c);
          return getOutputDirectory() == null
              ? naming
              : relativeFileNaming(getOutputDirectory(), naming);
        },
        getFileNamingFn().getRequirements());
  } else {
    checkArgument(
        getFileNamingFn() == null,
        ".withNaming() taking a function from DestinationT requires writeDynamic()");
    FileNaming constantFileNaming;
    if (getConstantFileNaming() == null) {
      constantFileNaming =
          defaultNaming(
              MoreObjects.firstNonNull(getFilenamePrefix(), StaticValueProvider.of("output")),
              MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of("")));
    } else {
      checkArgument(
          getFilenamePrefix() == null, ".to(FileNaming) is incompatible with .withPrefix()");
      checkArgument(
          getFilenameSuffix() == null, ".to(FileNaming) is incompatible with .withSuffix()");
      constantFileNaming = getConstantFileNaming();
    }
    if (getOutputDirectory() != null) {
      constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming);
    }
    return fn(SerializableFunctions.<DestinationT, FileNaming>constant(constantFileNaming));
  }
}
Example 17
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
  Write.Builder<DestinationT, UserT> resolvedSpec = new AutoValue_FileIO_Write.Builder<>();
  resolvedSpec.setDynamic(getDynamic());
  checkArgument(getSinkFn() != null, ".via() is required");
  resolvedSpec.setSinkFn(getSinkFn());
  checkArgument(getOutputFn() != null, "outputFn should have been set by .via()");
  resolvedSpec.setOutputFn(getOutputFn());
  // Resolve destinationFn
  if (getDynamic()) {
    checkArgument(getDestinationFn() != null, "when using writeDynamic(), .by() is required");
    resolvedSpec.setDestinationFn(getDestinationFn());
    resolvedSpec.setDestinationCoder(resolveDestinationCoder(input));
  } else {
    checkArgument(getDestinationFn() == null, ".by() requires writeDynamic()");
    checkArgument(
        getDestinationCoder() == null, ".withDestinationCoder() requires writeDynamic()");
    resolvedSpec.setDestinationFn(fn(SerializableFunctions.constant(null)));
    resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
  }
  resolvedSpec.setFileNamingFn(resolveFileNamingFn());
  resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());
  if (getTempDirectory() == null) {
    checkArgument(
        getOutputDirectory() != null, "must specify either .withTempDirectory() or .to()");
    resolvedSpec.setTempDirectory(getOutputDirectory());
  } else {
    resolvedSpec.setTempDirectory(getTempDirectory());
  }
  resolvedSpec.setCompression(getCompression());
  resolvedSpec.setNumShards(getNumShards());
  resolvedSpec.setSharding(getSharding());
  resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
  resolvedSpec.setNoSpilling(getNoSpilling());
  Write<DestinationT, UserT> resolved = resolvedSpec.build();
  WriteFiles<UserT, DestinationT, ?> writeFiles =
      WriteFiles.to(new ViaFileBasedSink<>(resolved))
          .withSideInputs(Lists.newArrayList(resolved.getAllSideInputs()));
  if (getNumShards() != null) {
    writeFiles = writeFiles.withNumShards(getNumShards());
  } else if (getSharding() != null) {
    writeFiles = writeFiles.withSharding(getSharding());
  } else {
    writeFiles = writeFiles.withRunnerDeterminedSharding();
  }
  if (!getIgnoreWindowing()) {
    writeFiles = writeFiles.withWindowedWrites();
  }
  if (getNoSpilling()) {
    writeFiles = writeFiles.withNoSpilling();
  }
  return input.apply(writeFiles);
}
Example 18
/** See {@link TypedWrite#to(String)}. */
public Write to(String filenamePrefix) {
  return new Write(
      inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 19
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
  return new Write(
      inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 20
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write to(ValueProvider<String> outputPrefix) {
  return new Write(inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 21
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
  return new Write(
      inner.toResource(filenamePrefix).withFormatFunction(SerializableFunctions.identity()));
}
Example 22
/** See {@link TypedWrite#to(FilenamePolicy)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(FilenamePolicy filenamePolicy) {
  return new Write(
      inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity()));
}
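Examples 6, 11-14, and 18-22 all follow the same facade pattern, which matches the shape of the TextIO and AvroIO Write wrappers (an inference, since the source files are not named): a String-typed Write delegates to TypedWrite and installs SerializableFunctions.identity() as the format function, because the input elements are already Strings. A minimal pipeline sketch exercising that facade (the output prefix is hypothetical; assumes a runner such as the direct runner on the classpath):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;

public class IdentityFormatDemo {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // TextIO.write() is the String-only facade shown above: it delegates to
    // TypedWrite and installs SerializableFunctions.identity() as the format
    // function, since the elements are already Strings.
    p.apply(Create.of("a", "b", "c"))
        .apply(TextIO.write().to("/tmp/beam-demo/part")); // output prefix is hypothetical
    p.run().waitUntilFinish();
  }
}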
Example 23
/**
 * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
 * UserT and OutputT are the same type and the format function is the identity.
 */
public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
    FilenamePolicy filenamePolicy) {
  return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
}
Example 24
@Test
public void testWriteTables() throws Exception {
  long numTables = 3;
  long numPartitions = 3;
  long numFilesPerPartition = 10;
  String jobIdToken = "jobId";
  final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();
  List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList();
  for (int i = 0; i < numTables; ++i) {
    String tableName = String.format("project-id:dataset-id.table%05d", i);
    TableDestination tableDestination = new TableDestination(tableName, tableName);
    for (int j = 0; j < numPartitions; ++j) {
      String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, 0);
      List<String> filesPerPartition = Lists.newArrayList();
      for (int k = 0; k < numFilesPerPartition; ++k) {
        String filename =
            Paths.get(
                    testFolder.getRoot().getAbsolutePath(),
                    String.format("files0x%08x_%05d", tempTableId.hashCode(), k))
                .toString();
        TableRowWriter<TableRow> writer =
            new TableRowWriter<>(filename, SerializableFunctions.identity());
        try (TableRowWriter ignored = writer) {
          TableRow tableRow = new TableRow().set("name", tableName);
          writer.write(tableRow);
        }
        filesPerPartition.add(writer.getResult().resourceId.toString());
      }
      partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition));
      String json =
          String.format(
              "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
              tempTableId);
      expectedTempTables.put(tableDestination, json);
    }
  }
  PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
      p.apply(Create.of(partitions));
  PCollectionView<String> jobIdTokenView =
      p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
  List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
  fakeJobService.setNumFailuresExpected(3);
  WriteTables<String> writeTables =
      new WriteTables<>(
          true,
          fakeBqServices,
          jobIdTokenView,
          BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
          BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
          sideInputs,
          new IdentityDynamicTables(),
          null,
          4,
          false,
          null,
          "NEWLINE_DELIMITED_JSON",
          false,
          Collections.emptySet());
  PCollection<KV<TableDestination, String>> writeTablesOutput =
      writeTablesInput.apply(writeTables);
  PAssert.thatMultimap(writeTablesOutput)
      .satisfies(
          input -> {
            assertEquals(input.keySet(), expectedTempTables.keySet());
            for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
              @SuppressWarnings("unchecked")
              String[] expectedValues =
                  Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
              assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
            }
            return null;
          });
  p.run();
}
Example 25
DummySink() {
  super(
      StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
      DynamicFileDestinations.constant(
          new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
}
Example 26
TestSink(String tmpFolder) {
  super(
      ValueProvider.StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
      DynamicFileDestinations.constant(FILENAME_POLICY, SerializableFunctions.identity()));
}