我正在GCP上运行一个使用apacheSDK2.39创建的流式传输管道作为数据流作业。基本上,我从PubSub读取并写入PubSub,通过墙壁时钟进行一些聚合。
当我的一个ParDo操作抛出运行时异常时,我遇到了一个问题(运行时异常的原因是PubSub中的一个输入元素“无效”:属性为null,但代码假定它是非null)。我观察到的是:
我正在寻找一种通用的方法来处理这种情况:我想重试一个有问题的元素配置的次数(以覆盖一些瞬态问题,如在我的ParDo之一,我正在访问BigQuery),但经过多次重试后,我只想记录问题并忽略违规元素-我不在乎输出消息会乱序。
它可以在Apache Beam中以某种方式完成吗?我知道我可能会将所有ParDo逻辑包装在try/catch块中,但是:
最简单的方法是在DoFn的流程方法中放置一个try/catch块(可能使用for循环重试固定次数)。然后,如果您想用它做其他事情(例如计算它们或记录它们以供将来检查),您可以将持久失败的消息发送到不同的输出。这种模式(以重试为模,认为这可能是一个值得添加的功能)通常称为“死信”模式,可通过ParDo(…)在Python中使用。with_exception_handling(…)。