我正在使用NiFi设计数据摄取模式。一个进程需要停止释放流文件,直到下游的一个进程完成处理。我尝试使用等待和通知,但没有成功。我希望队列大小和背压是否可以跨几个处理器设置。
类似地,如果有一种方法可以实现逻辑:如果当前在多个处理器之间有一个处理,则不允许流文件进入。
您需要将监视器活动与执行流命令相结合(使用python“nipyapi”脚本)。
我在我的一个工作流程中也有类似的要求。
您需要先安装python lib nipyapi并在nifi框上创建此脚本。
from time import sleep
import nipyapi
nipyapi.utils.set_endpoint('http://ipaddress:port/nifi-api', ssl=False, login=False)
## Get PG ID using the PG Name
mypg = nipyapi.canvas.get_process_group('start')
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=True) ## Start
sleep(1)
nipyapi.canvas.schedule_process_group(mypg.id, scheduled=False) ## Stop
我将把模板放在下面链接中的img中,请参阅监视器活动处理器上的配置-如果10秒内没有活动发生,它将生成一个流(您可以自己玩时间)。下载模板
注意:如果您对延迟要求很高,这不是一个很好的方法。
另一个想法是监控整个流中的聚合队列,如果队列为零,则重新启动启动流。(如果您有很多连接,这将非常激烈)
我能够在NiFi中设计一个解决方案。本质上使用生成流文件作为信号(永远只运行一次)。诀窍是让新生成的流文件通过碎片整理与原始输入流合并。并且每次流完成后,成功条件将能够与下一个输入流文件合并。
溶液流动