在Apache NiFi,dockerize版本1.15中,创建了一个由3个NiFi节点组成的集群。当通过默认端口6342使用负载平衡时,流文件会卡在一些队列中,在启用负载平衡的队列中。但是,当尝试“列表队列”时,会发出消息“队列没有FlowFiles”。
发生问题的NiFi处理器组部分:
流文件似乎卡住的NiFi队列配置:
另一个可能不相关的问题是,发生这种情况后,一些流文件到达了后续的NiFi处理器,但在MergeContent处理器之前卡住了。这一次,可以列出队列:
出现第二个问题时的代码部分:
出现第二期时的代码部分
队列的配置:
队列中FlowFiles的列表:
MergeContent处理器配置。参数"max_num_for_merge_smxs"设置为100:
使用负载平衡是因为数据是从SFTP服务器收集的,并且该处理器仅在主节点上运行。
如果你需要更多的信息,请告诉我。
提前谢谢你!
编辑:
我将负载平衡队列放在ConsumeMQTT(仅在主节点上工作)和UpdataAt的处理器之间,但是Flow文件似乎停留在负载平衡队列中,但是当列表完成时,消息是“队列没有FlowFiles”。请检查:
更改了负载平衡队列的位置:
队列中没有流文件的消息:
请注意,在执行“列表队列”时,队列前后的处理器会停止。
编辑2:
我将nifi.properties中的配置更改为以下内容:
nifi.cluster.load.balance.connections.per.node=20
nifi.cluster.load.balance.max.thread.count=60
nifi.cluster.load.balance.comms.timeout=30 sec
我还重新启动了NiFi容器,因此我将监控行为。目前,负载平衡队列中没有卡住的Flow文件,它们会进入队列后面的处理器。
“队列没有流文件”是输入合并的队列的正常行为——流文件等待合并。
他们在合并前被“卡住”的最有可能的原因是,你已经将FlowFiles分布在许多节点上,然后你在合并上设置了最小计数。这个最小值是每个节点,并且每个节点上没有足够的FlowFiles来达到最小值,所以它们会卡住等待更多的FlowFiles来触发合并。
--编辑
"队列没有FlowFiles"也应该出现在活动的队列上-在您的流中,负载平衡队列会立即排入您的merge
PGs输入端口的输出队列-因此负载平衡队列中没有FF。如果您要停止merge
PG中的输入端口,您应该能够在LB队列中列出它们。
听起来您正在执行GetSFTP(主),然后分发文件。更好的方法是使用ListSFTP(主)-
其次,我会查看您的合并配置-您定义了一个参数#{max_num_for_merge_xmsx}
,但这设置在合并的最小条目数中-因此您告诉Merge仅在达到至少#{max_num_for_merge_xmsx}
FlowFiles数量时才进行合并。