提问者:小点点

使用数据流在BigQuery表之间流式更新


尝试在Cloud Dataflow Job中启用流式传输,需要从一个BigQuery表中读取数据并将其写入另一个具有追加模式的BigQuery表。

为此,我在Java代码中启用了options. setStreaming(true);

应用窗口概念-固定窗口选项(如下代码),

PCollection<TableRow> fixedWindowedItems = finalRecords.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

最后用BigQueryIO将数据写入BigQuery表(如下代码),

fixedWindowedItems.apply(BigQueryIO.writeTableRows()
                .withSchema(schema1)
                .to(options.getTargetTable())
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

代码工作正常。没有错误。数据第一次从一个表移动到另一个表。但是,如果您在第一个表中插入新数据,则第二个表不会得到反映。作业似乎以成功状态完成,尽管作业类型为流式传输。

如果我在代码/配置级别遗漏了一些东西以启用流模式,请告诉我。


共1个答案

匿名用户

初步答复:

您正在寻找的功能是BigQuery输出一个更改流,并且该流应用于另一个BigQuery表,对吗?这不是Apache Beam/Dataflow BigQuery源提供的。

您的管道运行并完成,因为它将批处理数据从BigQuery表复制/查询到另一个表中。

为什么您希望保持两个BQ表同步?如果您解释您的场景,我们可以共同努力改进它。