我有一个要求轮询一个hazelcast(客户端模式)队列,异常时带有重试(10次尝试)选项。我原以为骆驼轮询和处理将是多线程的。但事实并非如此。在异常重试时,队列中的任何新消息都会堆积起来,只有在第一条消息完成后才会被拾取处理。是否有并行处理(并发消费)的选项。我已经添加了并发消费者和poolSize作为查询参数。但它并没有真正发挥作用。
我尝试过的是:
fromF(hazelcast-queue://FOO?concurrentConsumers=5&hazelcastInstance=#hazelcastInstance&poolSize=10&queueConsumerMode=Poll).to("direct:testPoll");
from("direct:testPoll")
.log(LoggingLevel.DEBUG,":::>:Camel[${routeId}] consumes")
.onException(Exception.class)
.maximumRedeliveries(maxAttempt)
.delayPattern(delayPattern)
.maximumRedeliveryDelay(maxDelay)
.handled(true)
.logExhausted(false)
.end()
.bean("processTestPoll").log(INFO,"${body}").end();
错误:
有1个参数无法在endpoint上设置。检查uri参数是否拼写正确并且它们是endpoint的属性。未知参数=[{concurrent t消费者=10}]
非常感谢你的帮助。事先谢谢。
由于SEDA,您可以通过两种不同的方式实现:
您可以将消息发送到SEDAendpoint并作为下一个同时使用它们:
fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&queueConsumerMode=Poll",
HazelcastConstants.QUEUE_PREFIX)
.to("seda:process");
from("seda:process?concurrentConsumers=5")
.log("Processing: ${threadName} ${body}");
在前面的示例中,Hazelcast QueueFOO
由一个线程轮询,该线程将消息放入SEDA进程
,SEDA进程
由5
线程同时使用。
有关使用SEDA组件的并发消费者的更多详细信息
正如您在已删除的答案中建议的那样,您也可以使用Hazelcast的特定SEDAendpoint直接实现它,如下所示:
fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&concurrentConsumers=5",
HazelcastConstants.SEDA_PREFIX)
.log("Processing: ${threadName} ${body}");
在前面的示例中,Hazelcast QueueFOO
由5个线程同时使用。
有关Hazelcat SEDAendpoint的更多详细信息。