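I have set up a Google Cloud Dataflow pipeline that consumes messages from a Pub/Sub subscription, converts them to table rows, and writes those rows to the corresponding BigQuery table.
The destination table is determined by the content of the Pub/Sub message, so occasionally the table does not exist yet and has to be created first. For this I use the create disposition CREATE_IF_NEEDED, which works well.
However, I noticed that if I manually delete a newly created table in BigQuery while the Dataflow job is still running, Dataflow gets stuck and does not re-create the table. Instead, I get an error: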
Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881)
    at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143)
    at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115)
    at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
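If I go back to BigQuery and manually re-create the table, the Dataflow job resumes working.
However, I would like to know whether there is a way to instruct the Dataflow pipeline to re-create a table when it is deleted while the job is running?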
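This is not possible with the current BigQueryIO connector. In the connector's source on GitHub (linked here), you can see that for StreamingWriteFn, which is what your code uses, table creation is done in getOrCreateTable, which is called from finishBundle. A collection named createdTables is maintained: in finishBundle the table is created if it does not exist, and once it has been created and recorded in createdTables it is not re-created, as shown below: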
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
    throws IOException {
  TableReference tableReference = parseTableSpec(tableSpec);
  if (!createdTables.contains(tableSpec)) {
    synchronized (createdTables) {
      // Another thread may have succeeded in creating the table in the meanwhile, so
      // check again. This check isn't needed for correctness, but we add it to prevent
      // every thread from attempting a create and overwhelming our BigQuery quota.
      if (!createdTables.contains(tableSpec)) {
        TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
        Bigquery client = Transport.newBigQueryClient(options).build();
        BigQueryTableInserter inserter = new BigQueryTableInserter(client);
        inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
            CreateDisposition.CREATE_IF_NEEDED, tableSchema);
        createdTables.add(tableSpec);
      }
    }
  }
  return tableReference;
}
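To meet your requirement, you would probably have to maintain your own version of BigQueryIO in which you do not perform this particular check: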
if (!createdTables.contains(tableSpec)) {
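To make the effect of that check concrete, here is a minimal, self-contained Java sketch. It does not use the Beam or BigQuery APIs at all: FakeBigQuery, CachedCreator, and UncachedCreator are hypothetical stand-ins invented for this illustration, and the real connector is more involved. It only shows why a table spec that is already in createdTables never triggers a second create, and what changes if the check is dropped.

```java
import java.util.HashSet;
import java.util.Set;

public class TableCacheSketch {

    /** Hypothetical in-memory stand-in for BigQuery; NOT a real client API. */
    static class FakeBigQuery {
        private final Set<String> tables = new HashSet<>();
        int createRequests = 0; // how often the backend was asked to create a table

        void createIfNeeded(String tableSpec) {
            createRequests++;
            tables.add(tableSpec); // no-op if the table is already there
        }

        void delete(String tableSpec) {
            tables.remove(tableSpec);
        }

        boolean exists(String tableSpec) {
            return tables.contains(tableSpec);
        }
    }

    /** Mirrors the cached check: a table spec seen once is never created again. */
    static class CachedCreator {
        private final Set<String> createdTables = new HashSet<>();
        private final FakeBigQuery bq;

        CachedCreator(FakeBigQuery bq) {
            this.bq = bq;
        }

        void getOrCreateTable(String tableSpec) {
            if (!createdTables.contains(tableSpec)) {
                bq.createIfNeeded(tableSpec);
                createdTables.add(tableSpec);
            }
        }
    }

    /** Assumed variant without the check: asks the backend on every call. */
    static class UncachedCreator {
        private final FakeBigQuery bq;

        UncachedCreator(FakeBigQuery bq) {
            this.bq = bq;
        }

        void getOrCreateTable(String tableSpec) {
            bq.createIfNeeded(tableSpec);
        }
    }

    public static void main(String[] args) {
        String spec = "dataset.events"; // made-up table spec

        FakeBigQuery bq1 = new FakeBigQuery();
        CachedCreator cached = new CachedCreator(bq1);
        cached.getOrCreateTable(spec);
        bq1.delete(spec);              // manual deletion while the job is running
        cached.getOrCreateTable(spec); // cache hit, so no create is attempted
        System.out.println("cached, table exists: " + bq1.exists(spec));   // false

        FakeBigQuery bq2 = new FakeBigQuery();
        UncachedCreator uncached = new UncachedCreator(bq2);
        uncached.getOrCreateTable(spec);
        bq2.delete(spec);
        uncached.getOrCreateTable(spec); // always asks, so the table comes back
        System.out.println("uncached, table exists: " + bq2.exists(spec)); // true
    }
}
```

Note the trade-off that the comment in the connector code points out: the check is there to keep every thread from attempting a create and overwhelming the BigQuery quota, so the uncached variant issues one create request per call.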
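The more important question, however, is why a table is being deleted in a production system at all. That is the problem that should be fixed, rather than trying to re-create the table from Dataflow.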