Java源码示例:org.apache.activemq.advisory.ConsumerEventSource
示例1
protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
final AtomicInteger rc = new AtomicInteger(0);
Connection connection = cf.createConnection();
connections.add(connection);
connection.start();
ConsumerEventSource source = new ConsumerEventSource(connection, destination);
source.setConsumerListener(new ConsumerListener() {
@Override
public void onConsumerEvent(ConsumerEvent event) {
rc.set(event.getConsumerCount());
}
});
source.start();
return rc;
}
示例2
@Override
protected void start() throws PublisherRegistrationFailedFault {
if (demand) {
try {
producers = new HashMap<>();
advisories = new ArrayList<>();
for (TopicExpressionType topic : this.topic) {
ConsumerEventSource advisory
= new ConsumerEventSource(connection, topicConverter.toActiveMQTopic(topic));
advisory.setConsumerListener(this);
advisory.start();
advisories.add(advisory);
}
} catch (Exception e) {
PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
throw new PublisherRegistrationFailedFault("Error starting demand-based publisher", fault, e);
}
}
}
示例3
protected void assertConsumersConnect(String brokerName,
Destination destination,
final int count,
long timeout) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
conn.start();
ConsumerEventSource ces = new ConsumerEventSource(conn, destination);
try {
final AtomicInteger actualConnected = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
ces.setConsumerListener(new ConsumerListener() {
@Override
public void onConsumerEvent(ConsumerEvent event) {
if (actualConnected.get() < count) {
actualConnected.set(event.getConsumerCount());
}
if (event.getConsumerCount() >= count) {
latch.countDown();
}
}
});
ces.start();
latch.await(timeout, TimeUnit.MILLISECONDS);
assertTrue("Expected at least " + count + " consumers to connect, but only " + actualConnected.get() + " connectect within " + timeout + " ms", actualConnected.get() >= count);
} finally {
ces.stop();
conn.close();
brokerItem.connections.remove(conn);
}
}
示例4
public MessageConsumer build() throws Exception {
MessageConsumer consumer = null;
if (waitTillStarted) {
ConsumerEventSource consumerEventSource = new ConsumerEventSource(session.getConnection(), destination);
final CountDownLatch latch = new CountDownLatch(1);
consumerEventSource.setConsumerListener(new ConsumerListener() {
@Override
public void onConsumerEvent(ConsumerEvent event) {
latch.countDown();
}
});
try {
consumerEventSource.start();
consumer = this.session.createConsumer(this.destination);
if (!latch.await(5L, TimeUnit.SECONDS)) {
throw new TimeoutException("Timed out waiting for MessageConsumer start event.");
}
} finally {
consumerEventSource.stop();
}
} else {
consumer = this.session.createConsumer(this.destination);
}
if (this.messageListener != null) {
consumer.setMessageListener(this.messageListener);
}
return consumer;
}
示例5
protected void destroy() throws ResourceNotDestroyedFault {
try {
if (advisories != null) {
for (ConsumerEventSource advisory : advisories) {
advisory.stop();
}
}
} catch (Exception e) {
ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
throw new ResourceNotDestroyedFault("Error destroying publisher", fault, e);
} finally {
super.destroy();
}
}