提问者:小点点

NiFi-使用负载平衡时数据卡在队列中


在Apache NiFi,dockerize版本1.15中,创建了一个由3个NiFi节点组成的集群。当通过默认端口6342使用负载平衡时,流文件会卡在一些队列中,在启用负载平衡的队列中。但是,当尝试“列表队列”时,会发出消息“队列没有FlowFiles”。

发生问题的NiFi处理器组部分:

流文件似乎卡住的NiFi队列配置:

另一个可能不相关的问题是,发生这种情况后,一些流文件到达了后续的NiFi处理器,但在MergeContent处理器之前卡住了。这一次,可以列出队列:

出现第二个问题时的代码部分:

出现第二期时的代码部分

队列的配置:

队列中FlowFiles的列表:

MergeContent处理器配置。参数"max_num_for_merge_smxs"设置为100:

使用负载平衡是因为数据是从SFTP服务器收集的,并且该处理器仅在主节点上运行。

如果你需要更多的信息,请告诉我。

提前谢谢你!

编辑:

我将负载平衡队列放在ConsumeMQTT(仅在主节点上工作)和UpdataAt的处理器之间,但是Flow文件似乎停留在负载平衡队列中,但是当列表完成时,消息是“队列没有FlowFiles”。请检查:

更改了负载平衡队列的位置:

队列中没有流文件的消息:

请注意,在执行“列表队列”时,队列前后的处理器会停止。

编辑2:

我将nifi.properties中的配置更改为以下内容:

nifi.cluster.load.balance.connections.per.node=20
nifi.cluster.load.balance.max.thread.count=60
nifi.cluster.load.balance.comms.timeout=30 sec

我还重新启动了NiFi容器,因此我将监控行为。目前,负载平衡队列中没有卡住的Flow文件,它们会进入队列后面的处理器。


共1个答案

匿名用户

“队列没有流文件”是输入合并的队列的正常行为——流文件等待合并。

他们在合并前被“卡住”的最有可能的原因是,你已经将FlowFiles分布在许多节点上,然后你在合并上设置了最小计数。这个最小值是每个节点,并且每个节点上没有足够的FlowFiles来达到最小值,所以它们会卡住等待更多的FlowFiles来触发合并。

--编辑

"队列没有FlowFiles"也应该出现在活动的队列上-在您的流中,负载平衡队列会立即排入您的mergePGs输入端口的输出队列-因此负载平衡队列中没有FF。如果您要停止mergePG中的输入端口,您应该能够在LB队列中列出它们。

听起来您正在执行GetSFTP(主),然后分发文件。更好的方法是使用ListSFTP(主)-

其次,我会查看您的合并配置-您定义了一个参数#{max_num_for_merge_xmsx},但这设置在合并的最小条目数中-因此您告诉Merge仅在达到至少#{max_num_for_merge_xmsx}FlowFiles数量时才进行合并。