Asked by: 小点点

GCloud Dataflow: recreate a BigQuery table if it is deleted while the job is running


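I have set up a GCloud Dataflow pipeline that consumes messages from a Pub/Sub subscription, converts them to table rows, and writes those rows to the corresponding BigQuery table.

The destination table is chosen based on the content of the Pub/Sub message, so occasionally the table does not exist yet and has to be created first. For this I use the create disposition CREATE_IF_NEEDED, which works well.

However, I noticed that if I manually delete a newly created table in BigQuery while the Dataflow job is still running, Dataflow gets stuck and does not recreate the table. Instead, I get this error: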

    Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)

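If I go back to BigQuery and recreate the table by hand, the Dataflow job resumes working.

Still, I would like to know whether there is a way to tell the Dataflow pipeline to recreate a table that is deleted while the job is running.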


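1 Answer

Anonymous user

This is not possible with the current BigQueryIO connector. If you look at the connector's source on GitHub, you will see that for StreamingWriteFn, which is what your pipeline uses, table creation is done in getOrCreateTable, which is called from finishBundle. The connector maintains a collection of already created tables, createdTables. In finishBundle the table is created if it does not exist yet, and once it has been created and recorded in createdTables it is never created again, as shown here: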

    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
        throws IOException {
      TableReference tableReference = parseTableSpec(tableSpec);
      if (!createdTables.contains(tableSpec)) {
        synchronized (createdTables) {
          // Another thread may have succeeded in creating the table in the meanwhile, so
          // check again. This check isn't needed for correctness, but we add it to prevent
          // every thread from attempting a create and overwhelming our BigQuery quota.
          if (!createdTables.contains(tableSpec)) {
            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableInserter inserter = new BigQueryTableInserter(client);
            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
            createdTables.add(tableSpec);
          }
        }
      }
      return tableReference;
    }
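
To make the effect of that cache concrete, here is a minimal sketch that reproduces the same check-then-cache pattern against an in-memory stand-in. `FakeBigQuery`, `CachingTableCreator` and `CacheDemo` are hypothetical names invented for this illustration and are not part of Beam or the BigQuery client; only the double-checked `createdTables` lookup mirrors the snippet above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for BigQuery: tracks which tables currently exist. */
class FakeBigQuery {
    private final Set<String> tables = new HashSet<>();
    int createCalls = 0;

    void createIfNeeded(String tableSpec) {
        createCalls++;
        tables.add(tableSpec);
    }

    void delete(String tableSpec) {
        tables.remove(tableSpec);
    }

    boolean exists(String tableSpec) {
        return tables.contains(tableSpec);
    }
}

/** Mirrors the check-then-cache logic of the getOrCreateTable snippet above. */
class CachingTableCreator {
    private final Set<String> createdTables =
        Collections.synchronizedSet(new HashSet<>());
    private final FakeBigQuery backend;

    CachingTableCreator(FakeBigQuery backend) {
        this.backend = backend;
    }

    void getOrCreateTable(String tableSpec) {
        if (!createdTables.contains(tableSpec)) {
            synchronized (createdTables) {
                // Second check so concurrent callers do not all issue a create.
                if (!createdTables.contains(tableSpec)) {
                    backend.createIfNeeded(tableSpec);
                    createdTables.add(tableSpec);
                }
            }
        }
    }
}

class CacheDemo {
    public static void main(String[] args) {
        FakeBigQuery bq = new FakeBigQuery();
        CachingTableCreator creator = new CachingTableCreator(bq);

        creator.getOrCreateTable("project:dataset.events"); // creates the table
        bq.delete("project:dataset.events");                // manual deletion mid-job
        creator.getOrCreateTable("project:dataset.events"); // cache hit, no create

        // prints "exists=false, createCalls=1"
        System.out.println("exists=" + bq.exists("project:dataset.events")
            + ", createCalls=" + bq.createCalls);
    }
}
```

The second call never reaches the backend because the table spec is still in `createdTables`, which matches the stuck behavior described in the question.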

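To get the behavior you want, you would probably have to maintain your own version of BigQueryIO in which this particular check is not performed: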

    if (!createdTables.contains(tableSpec)) {
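
Dropping the check entirely would mean a create attempt on every call, which is exactly what the comment in the original code warns about. As a rough sketch of a different variation (not what the connector does, and not literally the change suggested above), one could keep the cache but evict an entry when an insert fails because the table is missing, then recreate and retry once. Everything here is hypothetical: `InMemoryBigQuery`, `RecreatingWriter` and the use of `IllegalStateException` as the "table not found" signal are assumptions made for the illustration, not real Beam or BigQuery APIs.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for BigQuery; not a real client. */
class InMemoryBigQuery {
    private final Set<String> tables = new HashSet<>();
    int createCalls = 0;
    int rowsInserted = 0;

    void createIfNeeded(String tableSpec) {
        createCalls++;
        tables.add(tableSpec);
    }

    void delete(String tableSpec) {
        tables.remove(tableSpec);
    }

    boolean exists(String tableSpec) {
        return tables.contains(tableSpec);
    }

    /** Assumed failure mode: inserting into a missing table throws. */
    void insert(String tableSpec, String row) {
        if (!tables.contains(tableSpec)) {
            throw new IllegalStateException("Table not found: " + tableSpec);
        }
        rowsInserted++;
    }
}

/** Keeps the created-tables cache but invalidates it when the table has vanished. */
class RecreatingWriter {
    private final Set<String> createdTables = new HashSet<>();
    private final InMemoryBigQuery backend;

    RecreatingWriter(InMemoryBigQuery backend) {
        this.backend = backend;
    }

    synchronized void write(String tableSpec, String row) {
        ensureTable(tableSpec);
        try {
            backend.insert(tableSpec, row);
        } catch (IllegalStateException tableMissing) {
            // The cache was stale: forget the entry, recreate, retry once.
            createdTables.remove(tableSpec);
            ensureTable(tableSpec);
            backend.insert(tableSpec, row);
        }
    }

    private void ensureTable(String tableSpec) {
        if (!createdTables.contains(tableSpec)) {
            backend.createIfNeeded(tableSpec);
            createdTables.add(tableSpec);
        }
    }
}

class RecreateDemo {
    public static void main(String[] args) {
        InMemoryBigQuery bq = new InMemoryBigQuery();
        RecreatingWriter writer = new RecreatingWriter(bq);

        writer.write("project:dataset.events", "row-1"); // creates table, inserts
        bq.delete("project:dataset.events");             // manual deletion mid-job
        writer.write("project:dataset.events", "row-2"); // recreates, then inserts

        // prints "createCalls=2, rowsInserted=2"
        System.out.println("createCalls=" + bq.createCalls
            + ", rowsInserted=" + bq.rowsInserted);
    }
}
```

This keeps the common path as cheap as the cached version and only pays for an extra create call after a failure. Whether the real connector can be patched this way depends on how it reports a missing table, which this sketch does not model.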

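The more important question, though, is why a table is being deleted in a production system at all. That is the problem to fix, rather than trying to recreate the table from Dataflow.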