我有一个流式的apache光束管道,它对数据进行操作并写入大查询,数据的表名和模式在数据本身内,所以我使用侧输入来提供表名和模式,使用side_inputs两者。
所以我的管道代码看起来像这样-
pipeline | "Writing to big query">>beam.io.WriteToBigQuery(
schema=lambda row,schema:write_table_schema(row,schema),
schema_side_inputs = (table_schema,),
project=args['PROJECT_ID'],dataset=args['DATASET_ID'],
table = lambda row,table_name:write_table_name(row,table_name),table_side_inputs=(table_name,) ,ignore_unknown_columns=args['ignore_unknown_columns'],
additional_bq_parameters=additional_bq_parameters, insert_retry_strategy= RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
为此,我需要添加窗口间隔(在写入大查询之前)
pipeline = pipeline | "To Window Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10)))
然后这个窗口数据继续成为3个管道操作的输入,WriteToBigQuery
的2个侧输入是这样的-
table_name = (pipeline
| "Get table name" >> beam.Map(lambda record: get_table_name(record)) )
table_name = beam.pvalue.AsSingleton(table_name)
table_schema = (pipeline
| "Get table schema" >> beam.Map(lambda record: get_table_schema(record)) )
table_schema = beam.pvalue.AsSingleton(table_schema)
所有这些工作正常,直到我需要在窗口间隔之前拆分数据,例如
mapped_data = (pipeline
|"Converting to map ">>beam.ParDo(ConvertToMap()).with_outputs("SUCCESS","FAILURE"))
pipeline = (mapped_data['SUCCESS']
| "To Window Fixed Intervals" >> beam.WindowInto(beam.window.FixedWindows(10)))
当我这样做的时候,我遇到了以下错误-
( ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "name_1", "name_1". [while running 'Writing to big query/_StreamToBigQuery/AppendDestination-ptransform-48']
我跳过了管道中的一些步骤,因为它太复杂了。
如何修复此错误?
我尝试过使用AsDmit而不是AsSingleton,但它给出了以下错误-
ValueError: dictionary update sequence element #0 has length 20; 2 is required [while running 'Writing to big query/_StreamToBigQuery/AppendDestination-ptransform-48']
我不认为这里有任何AsDmit的用法。也许这个问题不是由于标记,而是等待高数据发生,因为它是一个流式管道。
解决方案-这里的问题是每次都生成侧输入,但只有条件地生成主输入。这使得侧输入的数量比主输入多,因此出现了问题。
在修复了这个问题但使侧输入通过与主输入相同的条件生成后,我遇到了另一个问题-
Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Writing to big query/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-124']
将以下窗口转换添加到管道
"Window into Global Intervals" >> beam.WindowInto(beam.window.FixedWindows(1)) |beam.GroupByKey()
给出了以下错误-
AbstractComponentCoderImpl.encode_to_stream ValueError: Number of components does not match number of coders. [while running 'WindowInto(WindowIntoFn)
这里的任何帮助都很感激。
出现此问题可能是因为当PCollection具有不同的窗口集时,返回GlobalWindow中元素的代码出现了一些问题。根据您的要求,我建议您在beam. WindowTo(beam.window.GlobalWindows())之间插入
beam.WindowTo(NONGLOBALWINDOW)|beam.GroupByKey()和其他导致问题的P变换。