我能够创建一个数据流管道,它从pub/sub读取数据,并在处理后以流模式写入大查询。
现在,我想以批处理模式运行我的管道,以降低成本,而不是流模式。
目前我的流水线正在使用动态目的地在bigquery中进行流式插入。我想知道是否有办法使用动态目的地执行批量插入操作。
下面是
public class StarterPipeline {
public interface StarterPipelineOption extends PipelineOptions {
/**
* Set this required option to specify where to read the input.
*/
@Description("Path of the file to read from")
@Default.String(Constants.pubsub_event_pipeline_url)
String getInputFile();
void setInputFile(String value);
}
@SuppressWarnings("serial")
public static void main(String[] args) throws SocketTimeoutException {
StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(StarterPipelineOption.class);
Pipeline p = Pipeline.create(options);
PCollection<String> datastream = p.apply("Read Events From Pubsub",
PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));
PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
.withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());
// Write into Big Query
windowed_items.apply("Read and make event table row", new
ReadEventJson_bigquery())
.apply("Write_events_to_BQ",
BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
public String getDestination(ValueInSingleWindow<TableRow> element) {
String destination = EventSchemaBuilder
.fetch_destination_based_on_event(element.getValue().get("event").toString());
return destination;
}
@Override
public TableDestination getTable(String table) {
String destination =
EventSchemaBuilder.fetch_table_name_based_on_event(table);
return new TableDestination(destination, destination);
}
@Override
public TableSchema getSchema(String table) {
TableSchema table_schema =
EventSchemaBuilder.fetch_table_schema_based_on_event(table);
return table_schema;
}
}).withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
p.run().waitUntilFinish();
log.info("Events Pipeline Job Stopped");
}
}
批处理或流由PCollection决定,因此您需要将数据流PCollection从Pub/Sub转换为批处理PCollection以写入BigQuery。允许这样做的转换是GroupIntoBatches
请注意,由于此转换使用键值对,批处理将仅包含单个键的元素。对于非KV元素,请检查此相关答案。
使用此转换将PCollection作为批处理创建后,请像使用流PCollection一样应用带有动态目标的BigQuery写入。
您可以通过对流式作业使用文件加载来限制成本。插入方法部分指出,BigQueryIO. Write支持两种方法将数据插入到使用BigQueryIO.Write.与方法(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method)指定的BigQuery中。如果未提供任何方法,则将根据输入PCollection选择默认方法。有关方法的更多信息,请参阅BigQueryIO.Write.Method。
不同的插入方法提供了成本、配额和数据展示一致性的不同权衡。有关这些权衡的更多信息,请参阅BigQuery留档。