Asked by: 小点点

How can I make Google Dataflow write to a BigQuery table whose name comes from the input data?


I'm new to Dataflow/Beam. I'm trying to write some data to BigQuery, and I want the destination table name to come from the previous stage, as a map entry keyed by "table". But I can't find how to pass this table name through the pipeline to BigQueryIO. This is where I'm stuck... any ideas on what to do next?

pipeline
// ...
//////// I guess I shouldn't output TableRow here?
.apply("ToBQRow", ParDo.of(new DoFn<Map<String, String>, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        ////////// WHAT DO I DO WITH "table"?
        String table = c.element().get("table");
        TableRow row = new TableRow();
        // ... set some records
        c.output(row);
    }
}))
.apply(BigQueryIO.writeTableRows().to(/* ///// WHAT DO I WRITE HERE?? */)
    .withSchema(schema)
    .withWriteDisposition(
        BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

1 Answer

Anonymous user

You can use DynamicDestinations for this.

For example, I created some dummy data, and I'll use the last word of each sentence as the key:

p.apply("Create Data", Create.of("this should go to table one",
                                 "I would like to go to table one",
                                 "please, table one",
                                 "I prefer table two",
                                 "Back to one",
                                 "My fave is one",
                                 "Rooting for two"))
.apply("Create Keys", ParDo.of(new DoFn<String, KV<String,String>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String[] splitBySpaces = c.element().split(" ");
      c.output(KV.of(splitBySpaces[splitBySpaces.length - 1],c.element()));
    }
  }))
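The keying step above is plain string handling, independent of Beam, so it can be checked on its own. A minimal sketch of the same logic (class and method names are illustrative):

```java
public class LastWordKey {
    // Returns the last whitespace-separated token of a sentence,
    // which the pipeline above uses as the routing key.
    static String keyFor(String sentence) {
        String[] parts = sentence.split(" ");
        return parts[parts.length - 1];
    }

    public static void main(String[] args) {
        System.out.println(keyFor("this should go to table one")); // one
        System.out.println(keyFor("I prefer table two"));          // two
    }
}
```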

Then, with getDestination we control how each element is routed to a different table based on its key, and with getTable we build the fully qualified table name (prepending the prefix). We could also use getSchema if the different tables had different schemas. Finally, withFormatFunction controls what gets written into each table:

.apply(BigQueryIO.<KV<String, String>>write()
    .to(new DynamicDestinations<KV<String, String>, String>() {
        @Override
        public String getDestination(ValueInSingleWindow<KV<String, String>> element) {
            return element.getValue().getKey();
        }
        @Override
        public TableDestination getTable(String name) {
            // "output" is the table prefix passed in via the --output pipeline option
            String tableSpec = output + name;
            return new TableDestination(tableSpec, "Table for type " + name);
        }
        @Override
        public TableSchema getSchema(String name) {
            // Same schema for every destination here; this could vary per "name"
            List<TableFieldSchema> fields = new ArrayList<>();
            fields.add(new TableFieldSchema().setName("Text").setType("STRING"));
            return new TableSchema().setFields(fields);
        }
    })
    .withFormatFunction(new SerializableFunction<KV<String, String>, TableRow>() {
        @Override
        public TableRow apply(KV<String, String> row) {
            return new TableRow().set("Text", row.getValue());
        }
    })
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
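Putting the key extraction and getTable together, the routing of the dummy data can be sanity-checked without running Beam at all. A dependency-free sketch (the prefix is illustrative, like $TABLE_PREFIX below without the project):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoutingSketch {
    // Same keying as the "Create Keys" ParDo: last word of the sentence.
    static String keyFor(String sentence) {
        String[] parts = sentence.split(" ");
        return parts[parts.length - 1];
    }

    public static void main(String[] args) {
        String prefix = "dynamic_key.dynamic_"; // illustrative table prefix
        List<String> data = Arrays.asList(
            "this should go to table one",
            "I prefer table two",
            "Rooting for two");
        // Group each element under the table spec it would be written to,
        // mirroring getDestination + getTable in the pipeline above.
        Map<String, List<String>> byTable = new TreeMap<>();
        for (String s : data) {
            byTable.computeIfAbsent(prefix + keyFor(s), k -> new ArrayList<>()).add(s);
        }
        System.out.println(byTable);
    }
}
```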

To test this end to end, I created the following dataset and tables:

bq mk dynamic_key
bq mk -f dynamic_key.dynamic_one Text:STRING
bq mk -f dynamic_key.dynamic_two Text:STRING

Then, after setting the $PROJECT, $BUCKET and $TABLE_PREFIX variables ($TABLE_PREFIX being PROJECT_ID:dynamic_key.dynamic_ in my case), I ran the job with:

mvn -Pdataflow-runner compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.DynamicTableFromKey \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/temp/ \
      --output=$TABLE_PREFIX \
      --runner=DataflowRunner"

We can verify that each element went to the correct table:

$ bq query "SELECT * FROM dynamic_key.dynamic_one"
+---------------------------------+
|              Text               |
+---------------------------------+
| please, table one               |
| Back to one                     |
| My fave is one                  |
| this should go to table one     |
| I would like to go to table one |
+---------------------------------+
$ bq query "SELECT * FROM dynamic_key.dynamic_two"
+--------------------+
|        Text        |
+--------------------+
| I prefer table two |
| Rooting for two    |
+--------------------+

Full code here.