提问者:小点点

在NiFi中将流文件移动到下一个处理器之前引入时间延迟


在NiFi中,存在一个从MQTT(ConsumeMQTT)消费并发布到HDFS路径(PutHDFS)的数据流。我需要在将消费的数据推送到HDFS路径之前引入60分钟的延迟。发现Controlrate和MergeContent处理器是可能的解决方案,但不确定。

引入时间延迟的理想解决方案是什么?

示例:在上午9:00消费的流文件应在上午10:00发布到HDFS


共2个答案

匿名用户

您可以使用ExecuteScript处理器来运行睡眠(60*60*1000)循环,但这会不必要地使用系统资源。

相反,我将介绍一个RouteOnAtket处理器,它的输出关系为one_hour_elapsedPutHDFS,并且不匹配的循环回自身。RouteOnAtket处理器应该将路由策略设置为路由到属性名称和一个名为one_hour_elapsed的动态属性(单击属性选项卡右上角的按钮)。表达式语言值应该是${now(): toNumber():gt(${entryDate:toNumber():plus(3600000)})}

这个表达式:

  1. 获取当前时间并将其转换为自纪元以来的毫秒(now(): toNumber()
  2. 获取流文件的entryDate属性(当它进入NiFi时)并将其转换为毫秒并添加一小时(entryDate: toNumber():plus(3600000)[3600000==60*60*1000])
  3. 比较两个数字(a: gt(${b})

如果这实际上不是流的开始,您可以使用UpdateAt的处理器在流的任何点插入任意时间戳并从那里计算。

我还建议将RouteOnAt的处理器的收益持续时间和运行计划设置为比平时高得多,因为您不希望该处理器持续运行,因为它不起作用。我建议将其设置为1或5分钟开始,因为您已经引入了一小时的延迟。

匿名用户

从nifi 1.10开始,使用RetryFlowfile处理器可以更轻松地完成这项工作。使用惩罚持续时间来设置延迟时间: