提问者:小点点

Apache骆驼Hazelcast队列轮询并发


我有一个要求轮询一个hazelcast(客户端模式)队列,异常时带有重试(10次尝试)选项。我原以为骆驼轮询和处理将是多线程的。但事实并非如此。在异常重试时,队列中的任何新消息都会堆积起来,只有在第一条消息完成后才会被拾取处理。是否有并行处理(并发消费)的选项。我已经添加了并发消费者和poolSize作为查询参数。但它并没有真正发挥作用。

我尝试过的是:

fromF(hazelcast-queue://FOO?concurrentConsumers=5&hazelcastInstance=#hazelcastInstance&poolSize=10&queueConsumerMode=Poll).to("direct:testPoll");

from("direct:testPoll")
     .log(LoggingLevel.DEBUG,":::>:Camel[${routeId}] consumes")
     .onException(Exception.class)
     .maximumRedeliveries(maxAttempt)
     .delayPattern(delayPattern)
     .maximumRedeliveryDelay(maxDelay)
     .handled(true)
     .logExhausted(false)
     .end()
.bean("processTestPoll").log(INFO,"${body}").end();

错误:

有1个参数无法在endpoint上设置。检查uri参数是否拼写正确并且它们是endpoint的属性。未知参数=[{concurrent t消费者=10}]

非常感谢你的帮助。事先谢谢。


共1个答案

匿名用户

由于SEDA,您可以通过两种不同的方式实现:

您可以将消息发送到SEDAendpoint并作为下一个同时使用它们:

fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&queueConsumerMode=Poll", 
      HazelcastConstants.QUEUE_PREFIX)
   .to("seda:process");
from("seda:process?concurrentConsumers=5")
   .log("Processing: ${threadName} ${body}");

在前面的示例中,Hazelcast QueueFOO由一个线程轮询,该线程将消息放入SEDA进程,SEDA进程5线程同时使用。

有关使用SEDA组件的并发消费者的更多详细信息

正如您在已删除的答案中建议的那样,您也可以使用Hazelcast的特定SEDAendpoint直接实现它,如下所示:

fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&concurrentConsumers=5", 
      HazelcastConstants.SEDA_PREFIX)
   .log("Processing: ${threadName} ${body}");

在前面的示例中,Hazelcast QueueFOO由5个线程同时使用。

有关Hazelcat SEDAendpoint的更多详细信息。