提问者:小点点

使用Cloud Dataflow将数据从PubSub流式传输到Google Cloud Storage


我正在使用数据流中的流数据从pub-sub收听数据。然后我需要上传到存储,处理数据并将其上传到bigquery。

这是我的代码:

public class BotPipline {

public static void main(String[] args) {

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(MY_PROJECT);
    options.setStagingLocation(MY_STAGING_LOCATION);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));

    input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));

    input
    .apply(someDataProcessing(...)).named("update json"))
    .apply(convertToTableRow(...)).named("convert json to table row"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
    );
    pipeline.run();
}

}

当我运行注释写入存储的代码时,代码运行良好。但是当我尝试上传到大查询时,我收到这个错误(这是预期的…):

Write can only be applied to a Bounded PCollection

我没有使用绑定,因为我需要一直运行它,并且我需要立即上传数据。有什么解决方案吗?

编辑:这是我想要的行为:

我通过pubsub接收消息。每条消息都应该存储在自己的文件中,GCS作为粗略数据,对数据执行一些处理,然后将其保存到大查询中-在数据中具有文件名。

在BQ示例中收到数据后应立即查看:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

该想法是,将处理后的数据存储在具有链接的BQ中,以将Rough数据存储在GCS


共1个答案

匿名用户

目前我们不支持在TextIO. Write中编写无界集合。见相关问题。

你能澄清你希望无界TextIO. Write的行为是什么吗?例如,你想有一个不断增长的文件,或者每个窗口一个文件,在窗口关闭时关闭,或者其他什么,或者只对你来说,编写的文件的全部内容最终将包含所有PubSub消息,但文件的结构并不重要,等等?

作为一种解决方法,您可以实现编写GCS为您自己的DoFn,使用IOChannelFactory与GCS交互(事实上,TextIO. Write本质上只是一个复合转换,用户可以从头开始编写自己)。

您可以使用@ProcessElement上的可选BoundedWindow参数访问数据窗口。如果您解释所需的行为,我将能够提供更多建议。