提问者:小点点

WriteToBigQuery的侧输入-大小为2的PCollection,其中多个元素作为单例视图访问


我有一个流式的apache光束管道,它对数据进行操作并写入大查询,数据的表名和模式在数据本身内,所以我使用侧输入来提供表名和模式,使用side_inputs两者。

所以我的管道代码看起来像这样-

 pipeline | "Writing to big query">>beam.io.WriteToBigQuery(
        schema=lambda row,schema:write_table_schema(row,schema),
        schema_side_inputs = (table_schema,),
        project=args['PROJECT_ID'],dataset=args['DATASET_ID'],
        table = lambda row,table_name:write_table_name(row,table_name),table_side_inputs=(table_name,) ,ignore_unknown_columns=args['ignore_unknown_columns'], 
        additional_bq_parameters=additional_bq_parameters, insert_retry_strategy=  RetryStrategy.RETRY_ON_TRANSIENT_ERROR))

为此,我需要添加窗口间隔(在写入大查询之前)

pipeline = pipeline | "To Window Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10)))

然后这个窗口数据继续成为3个管道操作的输入,WriteToBigQuery的2个侧输入是这样的-

table_name = (pipeline 
    | "Get table name" >> beam.Map(lambda record: get_table_name(record)) )
    table_name = beam.pvalue.AsSingleton(table_name)

table_schema = (pipeline 
    | "Get table schema" >> beam.Map(lambda record: get_table_schema(record)) )
    table_schema = beam.pvalue.AsSingleton(table_schema)

所有这些工作正常,直到我需要在窗口间隔之前拆分数据,例如

mapped_data = (pipeline 
    |"Converting to map ">>beam.ParDo(ConvertToMap()).with_outputs("SUCCESS","FAILURE"))
pipeline = (mapped_data['SUCCESS']
    | "To Window Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10)))

当我这样做的时候,我遇到了以下错误-

( ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "name_1", "name_1". [while running 'Writing to big query/_StreamToBigQuery/AppendDestination-ptransform-48']

我跳过了管道中的一些步骤,因为它太复杂了。

如何修复此错误?

我尝试过使用AsDmit而不是AsSingleton,但它给出了以下错误-

ValueError: dictionary update sequence element #0 has length 20; 2 is required [while running 'Writing to big query/_StreamToBigQuery/AppendDestination-ptransform-48']

我不认为这里有任何AsDmit的用法。也许这个问题不是由于标记,而是等待高数据发生,因为它是一个流式管道。

解决方案-这里的问题是每次都生成侧输入,但只有条件地生成主输入。这使得侧输入的数量比主输入多,因此出现了问题。

在修复了这个问题但使侧输入通过与主输入相同的条件生成后,我遇到了另一个问题-

 Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Writing to big query/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-124']

将以下窗口转换添加到管道

"Window into Global Intervals" >> beam.WindowInto(beam.window.FixedWindows(1)) |beam.GroupByKey()

给出了以下错误-

AbstractComponentCoderImpl.encode_to_stream ValueError: Number of components does not match number of coders. [while running 'WindowInto(WindowIntoFn)

这里的任何帮助都很感激。


共1个答案

匿名用户

出现此问题可能是因为当PCollection具有不同的窗口集时,返回GlobalWindow中元素的代码出现了一些问题。根据您的要求,我建议您在beam. WindowTo(beam.window.GlobalWindows())之间插入beam.WindowTo(NONGLOBALWINDOW)|beam.GroupByKey()和其他导致问题的P变换。