提问者:小点点

Google Dataflow-能够并行化当前运行步骤中的工作


我尝试运行谷歌数据流模拟过程。我试图通过最大工作节点5启动数据流,自动缩放(THROUGHPUT_BASED)。

问题是数据流没有利用所有的工作节点(5)和给我消息。

自动缩放:根据并行化当前运行步骤中的工作的能力,将工作人员的数量减少到1。

请建议可能是什么问题。

日志附在下面。

更新:-这是防止融合的正确方法吗?

我正在使用bigqueryIO读取一个Bigquery表。这个输入参数给我每个记录中的产品编号。

在我进行ParDo操作之后。在流程元素函数中,我正在为我从输入中获得的每个产品做一些数据预测操作。

PCollection<TableRow> quotes3 = quotes2.apply(ParDo.of(new  DoFn<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception{

        TableRow rowData = c.element();
        rowData = c.element();
        TableRow tableRowData = ForcastOperation(rowData);
        c.output(tableRowDRP);
    }
}));

在最后一步中,我使用数据流管道将预测结果转储到bigquery中。

quotes3.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("com-dl-parts:ds_parts.output_data")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

pipelineTransform.run();

更新:-17/05/2017 13:38

我正在尝试通过下面列出的方式打破融合。它将GroupByKey应用操作扩展到308个节点。但我不确定它是否将具有Forcast操作方法的第二次pardo扩展到308个节点。

 PCollection<String> quotes1 = quotes.apply(ParDo.of(new  DoFn<TableRow, KV<String, String>>() {
    private static final long serialVersionUID = 1L;
    private Random random = new Random();

        @Override
        public void processElement(ProcessContext c) throws Exception{
            TableRow rowData = c.element();
            rowData = c.element();
            c.output(KV.of(rowData.get("APRODUCT").toString(), rowData.get("APRODUCT").toString()));
        }
        })).apply(GroupByKey.<String, String>create())
           .apply(Values.<Iterable<String>>create())
           .apply(Flatten.<String>iterables());



PCollection<TableRow> quotes3 = quotes1.apply(ParDo.of(new  DoFn<String, TableRow>() {
private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception{

        TableRow rowData = c.element();
        rowData = c.element();
        TableRow tableRowData = ForcastOperation(rowData);
        c.output(tableRowDRP);
    }
}));

在最后一步中,我使用数据流管道将预测结果转储到bigquery中。

    quotes3.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("com-dl-parts:ds_parts.output_data")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

pipelineTransform.run();

共1个答案

匿名用户

我查看了这个作业的日志,它从BigQuery作为输入读取的数据量非常小——大约1kb。这是预期的吗?

如果是,并且如果您仍然想并行处理1kb 1000路,那么我假设您的预测操作函数每个元素的计算量非常大。

在这种情况下,您需要中断从BigQuery读取和应用预测操作之间的融合。请参阅https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion