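I'm using the ActiveMQ Artemis management API to retrieve information about which queues are in use and how many consumers each one has.
Because I'm caching the ClientRequestor, the message count on the activemq.management.* queue keeps increasing. Although I've defined an expiry delay, the messages on the queue are not discarded.
Can someone explain why?
I would expect the messages to be consumed and then disappear.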
    private ServerLocator locator;
    private ClientSessionFactory defaultFactory;
    private ClientSession session;
    private ClientRequestor requestor;

    public ManagementHelper(String defaultURL) {
        this.locator = ActiveMQClient.createServerLocator(defaultURL);
        this.defaultFactory = locator.createSessionFactory();
        // Arguments: username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize
        this.session = defaultFactory.createSession(this.username, this.password, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
        this.requestor = new ClientRequestor(session, "activemq.management");
    }

    @Scheduled(fixedRateString = "20000")
    public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
        ClientMessage message = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
        session.start();
        ClientMessage replyConsumer = requestor.request(message);
        String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

        ClientMessage message2 = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);
        ClientMessage replyQueueNames = requestor.request(message2);
        Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
    }
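The messages aren't removed because your application never acknowledges them. You must call acknowledge() on each reply message, just as you would for any other message received from a queue through the core API, e.g.: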
    @Scheduled(fixedRateString = "20000")
    public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
        ClientMessage message = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
        session.start();
        ClientMessage replyConsumer = requestor.request(message);
        // Acknowledge the reply so the broker can remove it from the reply queue
        replyConsumer.acknowledge();
        String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

        ClientMessage message2 = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);
        ClientMessage replyQueueNames = requestor.request(message2);
        // Same for the second reply
        replyQueueNames.acknowledge();
        Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
    }
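To make the effect of a missing acknowledge() concrete, here is a small toy model in plain Java. It is only a sketch and does not use or reproduce the Artemis API: `ToyReplyQueue`, `AckModelDemo`, and `countAfter` are hypothetical names invented for illustration. The one assumption it encodes is that a delivered message still counts toward the queue's message count until it is acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model only: this is NOT the Artemis implementation or API.
class ToyReplyQueue {
    private final Deque<Integer> waiting = new ArrayDeque<>();   // not yet delivered
    private final Set<Integer> delivering = new HashSet<>();     // delivered, not yet acknowledged
    private int nextId = 0;

    // A reply arrives on the queue.
    void send() {
        waiting.addLast(nextId++);
    }

    // Hand the next message to the consumer; it stays counted until acknowledged.
    int receive() {
        int id = waiting.removeFirst();
        delivering.add(id);
        return id;
    }

    // Acknowledging lets the queue forget the message.
    void acknowledge(int id) {
        delivering.remove(id);
    }

    // Assumption of this model: the count includes delivered-but-unacknowledged messages.
    int messageCount() {
        return waiting.size() + delivering.size();
    }
}

class AckModelDemo {
    // Simulate a number of request/reply round trips, with or without acknowledging replies.
    static int countAfter(int requests, boolean acknowledge) {
        ToyReplyQueue queue = new ToyReplyQueue();
        for (int i = 0; i < requests; i++) {
            queue.send();
            int id = queue.receive();
            if (acknowledge) {
                queue.acknowledge(id);
            }
        }
        return queue.messageCount();
    }

    public static void main(String[] args) {
        System.out.println("without ack: " + countAfter(5, false)); // prints "without ack: 5"
        System.out.println("with ack: " + countAfter(5, true));     // prints "with ack: 0"
    }
}
```

In this model, every scheduled run that skips the acknowledgement leaves one more message counted on the queue, which matches the steadily growing count described in the question.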
You don't need to call commit() on the ClientSession because you created it with autoCommitAcks set to true.
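The difference the autoCommitAcks flag makes can be illustrated with another toy model in plain Java. Again, this is a simplified sketch and not Artemis code: `ToySession` and its methods are hypothetical names. It assumes only that, with auto-commit off, acknowledgements are held back until commit() is called, while with auto-commit on they take effect without an explicit commit.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT the Artemis ClientSession.
class ToySession {
    private final boolean autoCommitAcks;
    private final List<Integer> pendingAcks = new ArrayList<>(); // acks waiting for commit()
    private int committedAcks = 0;                               // acks that have taken effect

    ToySession(boolean autoCommitAcks) {
        this.autoCommitAcks = autoCommitAcks;
    }

    // With auto-commit the ack takes effect at once; otherwise it waits for commit().
    void acknowledge(int messageId) {
        if (autoCommitAcks) {
            committedAcks++;
        } else {
            pendingAcks.add(messageId);
        }
    }

    // Applies all pending acks; a no-op when nothing is pending.
    void commit() {
        committedAcks += pendingAcks.size();
        pendingAcks.clear();
    }

    int committedAcks() {
        return committedAcks;
    }
}

class AutoCommitDemo {
    public static void main(String[] args) {
        ToySession auto = new ToySession(true);
        auto.acknowledge(1);
        System.out.println("auto-commit, no commit(): " + auto.committedAcks()); // prints 1

        ToySession manual = new ToySession(false);
        manual.acknowledge(1);
        System.out.println("manual, before commit(): " + manual.committedAcks()); // prints 0
        manual.commit();
        System.out.println("manual, after commit(): " + manual.committedAcks());  // prints 1
    }
}
```

Since the session in the question passes true for autoCommitAcks, it corresponds to the first case, so acknowledge() alone is enough.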