提问者:小点点

ActiveMQ Artemis-增加管理队列上的消息计数


我正在使用ActiveMQ Artemis管理API来使用有关正在使用的队列以及有多少消费者的信息。

由于我正在缓存ClientRequest estor,activemq.management.*队列上的消息计数会不断增加。虽然我定义了到期延迟,但队列上的消息不会被丢弃。

有人能解释一下为什么吗?

我希望消息被消费,然后消失。

private ServerLocator locator;
private ClientSessionFactory defaultFactory;
private ClientSession session;
private ClientRequestor requestor;

public ManagementHelper(String defaultURL) {
    this.locator = ActiveMQClient.createServerLocator(defaultURL);
    this.defaultFactory = locator.createSessionFactory();
    this.session = factory.createSession(this.username, this.password, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
    this.requestor = new ClientRequestor(session, "activemq.management");
}


@Scheduled(fixedRateString = "20000")
public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
    ClientMessage message = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
    session.start();

    ClientMessage replyConsumer = requestor.request(message);
    String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

    ClientMessage message2 = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);

    ClientMessage replyQueueNames = requestor.request(message2);
    Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
}

共1个答案

匿名用户

这些消息不会被删除,因为您的应用程序没有确认它们。您必须在回复消息上调用确认(),就像您可能使用核心API从队列中收到的任何其他消息一样,例如:

@Scheduled(fixedRateString = "20000")
public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
    ClientMessage message = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
    session.start();

    ClientMessage replyConsumer = requestor.request(message);
    replyConsumer.acknowledge();
    String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

    ClientMessage message2 = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);

    ClientMessage replyQueueNames = requestor.request(message2);
    replyQueueNames.acknowledge();
    Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
}

您不需要在ClientSession上调用提交(),因为您使用autoCompuAcks作为true创建它。