提问者:小点点

从Bigquery导出到存储


将数据从BigQuery导出到Google Storage的最佳方式是什么?请注意,我需要对BigQuery运行查询,而不是导出所有数据。本质上,我需要对BigQuery运行自定义查询(如select*from mytable where code=foo),查询的结果需要写入csv,存储在Google Cloud上。我相信,最好的方法是通过Google数据流。让我知道是否有其他选择?此外,我正在寻找一些关于如何完成此操作的示例。有什么地方可以找到一些示例吗?

这是我到目前为止所拥有的PipelineOptions PipelineOptions=PipelineOptionsFactory. create();管道p=Pipeline.create(Pipeline Options);

    Date date = new Date();

    p.getOptions().setTempLocation("gs://mybucket/tmp"+date.getTime());

    PCollection<TableRow> rowPCollection = p.apply(BigQueryIO.Read.named("promos")
            .fromQuery("SELECT * FROM [projectid:mydataset.mytable] where id = 256 LIMIT 1000"));

    PCollection<String> stringPCollection = rowPCollection.apply(ParDo.named("Extract").of(new DoFn<TableRow, String>() {
        @Override
        public void processElement(ProcessContext c) {
            TableRow tableRow = c.element();
            try {
                String prettyString = tableRow.toPrettyString();
                c.output(prettyString);
            } catch (IOException e) {
                log.error("Exception occurred:" + e.getMessage());
            }
        }
    }));

    stringPCollection.apply(TextIO.Write.named("WriteOutput").to("gs://mybucket/avexport").withSuffix(".csv"));

    p.run();

当此运行时,在创建ParDo时会抛出异常

caused by: java.io.NotSerializableException: com.my.validation.CommonValidator
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)

共2个答案

匿名用户

我猜您的匿名DoFn正在从无法序列化的封闭类(Common Validator)中提取一些东西。如果您为DoFn实现创建一个静态类,这是否解决了问题?

有关详细信息,请参阅匿名类上的NotSerializableException。

匿名用户

撇开错误不谈,您不必使用Dataflow将BigQuery数据导出到GCS,除非您正在Dataflow管道中进行一些复杂的转换(无论如何,您几乎可以在SQL/UDF中进行这些转换,但我跑题了)。从您的代码片段和描述来看,您似乎没有对数据进行任何类型的转换。

你可以:

  1. 运行SQL并将结果保存到BigQuery表中。
  2. 按此处所述将表导出为GCS。