在NiFi中,存在一个从MQTT(ConsumeMQTT)消费并发布到HDFS路径(PutHDFS)的数据流。我需要在将消费的数据推送到HDFS路径之前引入60分钟的延迟。发现Controlrate和MergeContent处理器是可能的解决方案,但不确定。
引入时间延迟的理想解决方案是什么?
示例:在上午9:00消费的流文件应在上午10:00发布到HDFS
您可以使用ExecuteScript
处理器来运行睡眠(60*60*1000)
循环,但这会不必要地使用系统资源。
相反,我将介绍一个RouteOnAtket
处理器,它的输出关系为one_hour_elapsed
到PutHDFS
,并且不匹配的
循环回自身。RouteOnAtket
处理器应该将路由策略设置为路由到属性名称和一个名为one_hour_elapsed的动态属性(单击属性选项卡右上角的按钮)。表达式语言值应该是${now(): toNumber():gt(${entryDate:toNumber():plus(3600000)})}
。
这个表达式:
now(): toNumber()
)entryDate
属性(当它进入NiFi时)并将其转换为毫秒并添加一小时(entryDate: toNumber():plus(3600000)
[3600000==60*60*1000
])a: gt(${b})
)如果这实际上不是流的开始,您可以使用UpdateAt的
处理器在流的任何点插入任意时间戳并从那里计算。
我还建议将RouteOnAt的
处理器的收益持续时间和运行计划设置为比平时高得多,因为您不希望该处理器持续运行,因为它不起作用。我建议将其设置为1或5分钟开始,因为您已经引入了一小时的延迟。
从nifi 1.10开始,使用RetryFlowfile处理器可以更轻松地完成这项工作。使用惩罚持续时间来设置延迟时间: