我开始学习Dataflow,我正在使用此代码示例自动完成。我正在尝试从BigQuery读取,但我收到此错误:
ERROR:root:Error while visiting split
...
File "/usr/lib/python2.7/re.py", line 181, in findall
return _compile(pattern, flags).findall(string)
TypeError: expected string or buffer [while running 'split']
代码:
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
| 'read' >> beam.io.Read(beam.io.BigQuerySource(input_table))
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
| 'format' >> beam.Map(
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
谢谢你的反馈汤姆
默认情况下,BigQuerySource返回结构化记录,即从列到输入表中的值的映射。这意味着您不能直接对记录运行re. findall。
相反,提取您关心的特定字段,即,
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x['my_string_field']))