我正在使用数据流中的流数据从pub-sub收听数据。然后我需要上传到存储,处理数据并将其上传到bigquery。
这是我的代码:
public class BotPipline {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(MY_PROJECT);
options.setStagingLocation(MY_STAGING_LOCATION);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));
input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));
input
.apply(someDataProcessing(...)).named("update json"))
.apply(convertToTableRow(...)).named("convert json to table row"))
.apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
);
pipeline.run();
}
}
当我运行注释写入存储的代码时,代码运行良好。但是当我尝试上传到大查询时,我收到这个错误(这是预期的…):
Write can only be applied to a Bounded PCollection
我没有使用绑定,因为我需要一直运行它,并且我需要立即上传数据。有什么解决方案吗?
编辑:这是我想要的行为:
我通过pubsub接收消息。每条消息都应该存储在自己的文件中,GCS作为粗略数据,对数据执行一些处理,然后将其保存到大查询中-在数据中具有文件名。
在BQ示例中收到数据后应立即查看:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} ->
{a:11, b: 22} ->
{fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
该想法是,将处理后的数据存储在具有链接的BQ中,以将Rough数据存储在GCS
目前我们不支持在TextIO. Write
中编写无界集合。见相关问题。
你能澄清你希望无界TextIO. Write
的行为是什么吗?例如,你想有一个不断增长的文件,或者每个窗口一个文件,在窗口关闭时关闭,或者其他什么,或者只对你来说,编写的文件的全部内容最终将包含所有PubSub消息,但文件的结构并不重要,等等?
作为一种解决方法,您可以实现编写GCS为您自己的DoFn
,使用IOChannelFactory
与GCS交互(事实上,TextIO. Write
本质上只是一个复合转换,用户可以从头开始编写自己)。
您可以使用@ProcessElement
上的可选BoundedWindow
参数访问数据窗口。如果您解释所需的行为,我将能够提供更多建议。