Java源码示例:org.apache.beam.examples.common.WriteOneFilePerWindow
示例1
/**
 * Builds and runs a streaming pipeline: strings read from a Pub/Sub topic are grouped into
 * fixed windows and each window is handed to {@code WriteOneFilePerWindow}.
 *
 * @param args command-line arguments, parsed and validated into {@code PubSubToGCSOptions}
 * @throws IOException declared by the signature of this example
 */
public static void main(String[] args) throws IOException {
  PubSubToGCSOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGCSOptions.class);
  options.setStreaming(true);

  // Upper bound on the number of shards used when writing output.
  final int shardLimit = 1;

  Pipeline pipeline = Pipeline.create(options);

  // The window length is taken from the options and interpreted as minutes.
  Duration windowLength = Duration.standardMinutes(options.getWindowSize());
  // Sink transform that receives the windowed messages.
  WriteOneFilePerWindow fileSink = new WriteOneFilePerWindow(options.getOutput(), shardLimit);

  pipeline
      // Step 1: pull string messages from the configured Pub/Sub topic.
      .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      // Step 2: bucket the messages into fixed windows of the configured length.
      .apply(Window.into(FixedWindows.of(windowLength)))
      // Step 3: pass every window of messages to the file sink.
      .apply("Write Files to GCS", fileSink);

  // Launch the pipeline and block until it has finished.
  pipeline.run().waitUntilFinish();
}
示例2
/**
 * Runs the windowed word-count example: reads text, attaches timestamps, windows the lines,
 * counts words per window, formats the counts and writes them via
 * {@code WriteOneFilePerWindow}.
 *
 * @param options pipeline options supplying input file, output location, timestamp bounds,
 *     window size (minutes) and shard count
 * @throws IOException declared by the signature of this example
 */
static void runWindowedWordCount(Options options) throws IOException {
  final String outputLocation = options.getOutput();
  final Instant earliest = new Instant(options.getMinTimestampMillis());
  final Instant latest = new Instant(options.getMaxTimestampMillis());

  Pipeline p = Pipeline.create(options);

  // Concept #1: the same pipeline shape can be used with a bounded or an unbounded source.
  // Here the lines come from the configured input file.
  PCollection<String> lines = p.apply(TextIO.read().from(options.getInputFile()));

  // Concept #2: give every element a timestamp, purely to demonstrate windowing.
  // AddTimestampFn is constructed with the two bounds read from the options above.
  PCollection<String> input = lines.apply(ParDo.of(new AddTimestampFn(earliest, latest)));

  // Concept #3: assign the elements to fixed windows whose length, in minutes, is taken
  // from the options.
  PCollection<String> windowedWords =
      input.apply(
          Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

  // Concept #4: the existing CountWords transform is applied unchanged to the windowed
  // collection.
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  // Concept #5: turn each count into text, then write the text out through
  // WriteOneFilePerWindow with the configured number of shards.
  PCollection<String> formatted =
      wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()));
  formatted.apply(new WriteOneFilePerWindow(outputLocation, options.getNumShards()));

  PipelineResult result = p.run();
  try {
    result.waitUntilFinish();
  } catch (Exception failure) {
    // Any failure while waiting leads to a cancel request on the running pipeline.
    result.cancel();
  }
}
示例3
/**
 * Builds and executes the windowed word-count pipeline described by {@code options}.
 *
 * <p>Stages, in order: read text lines, stamp them with {@code AddTimestampFn}, place them in
 * fixed windows, count words with {@code WordCount.CountWords}, format with
 * {@code WordCount.FormatAsTextFn}, and write with {@code WriteOneFilePerWindow}.
 *
 * @param options source of the input file, output location, timestamp bounds, window size and
 *     shard count
 * @throws IOException declared by the signature of this example
 */
static void runWindowedWordCount(Options options) throws IOException {
  final String destination = options.getOutput();
  final Instant lowerBound = new Instant(options.getMinTimestampMillis());
  final Instant upperBound = new Instant(options.getMaxTimestampMillis());
  Pipeline wordCountPipeline = Pipeline.create(options);

  // Concept #1: the pipeline could be fed from a bounded or an unbounded source; this one
  // reads the configured input file.
  // Concept #2: each line then receives a timestamp so that windowing has something to act
  // on (see AddTimestampFn).
  PCollection<String> timestampedLines =
      wordCountPipeline
          .apply(TextIO.read().from(options.getInputFile()))
          .apply(ParDo.of(new AddTimestampFn(lowerBound, upperBound)));

  // Concept #3: fixed windows; the window size option is interpreted as minutes.
  FixedWindows fixedWindows =
      FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));
  PCollection<String> windowedLines = timestampedLines.apply(Window.into(fixedWindows));

  // Concept #4: CountWords is reused as-is on the windowed collection.
  PCollection<KV<String, Long>> countsPerWord =
      windowedLines.apply(new WordCount.CountWords());

  // Concept #5: format the counts as text and write them out, sharded as configured.
  WriteOneFilePerWindow perWindowSink =
      new WriteOneFilePerWindow(destination, options.getNumShards());
  countsPerWord.apply(MapElements.via(new WordCount.FormatAsTextFn())).apply(perWindowSink);

  PipelineResult outcome = wordCountPipeline.run();
  try {
    outcome.waitUntilFinish();
  } catch (Exception waitFailure) {
    // If waiting fails for any reason, ask the pipeline to cancel.
    outcome.cancel();
  }
}