尝试在Cloud Dataflow Job中启用流式传输,需要从一个BigQuery表中读取数据并将其写入另一个具有追加模式的BigQuery表。
为此,我在Java代码中启用了options. setStreaming(true);
。
应用窗口概念-固定窗口选项(如下代码),
PCollection<TableRow> fixedWindowedItems = finalRecords.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
最后用BigQueryIO将数据写入BigQuery表(如下代码),
fixedWindowedItems.apply(BigQueryIO.writeTableRows()
.withSchema(schema1)
.to(options.getTargetTable())
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
代码工作正常。没有错误。数据第一次从一个表移动到另一个表。但是,如果您在第一个表中插入新数据,则第二个表不会得到反映。作业似乎以成功状态完成,尽管作业类型为流式传输。
如果我在代码/配置级别遗漏了一些东西以启用流模式,请告诉我。
初步答复:
您正在寻找的功能是BigQuery输出一个更改流,并且该流应用于另一个BigQuery表,对吗?这不是Apache Beam/Dataflow BigQuery源提供的。
您的管道运行并完成,因为它将批处理数据从BigQuery表复制/查询到另一个表中。
为什么您希望保持两个BQ表同步?如果您解释您的场景,我们可以共同努力改进它。