Java源码示例:org.apache.nifi.jms.processors.JMSConsumer.JMSResponse
示例1
/**
* At the moment the only two supported message types are TextMessage and
* BytesMessage which is sufficient for the type if JMS use cases NiFi is
* used. The may change to the point where all message types are supported
* at which point this test will no be longer required.
*/
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
}
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
示例2
/**
* At the moment the only two supported message types are TextMessage and
* BytesMessage which is sufficient for the type if JMS use cases NiFi is
* used. The may change to the point where all message types are supported
* at which point this test will no be longer required.
*/
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
}
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
示例3
public void validateFailOnUnsupportedMessageTypeOverJNDI() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsJndiTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
}
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
示例4
/**
* At the moment the only two supported message types are TextMessage and
* BytesMessage which is sufficient for the type if JMS use cases NiFi is
* used. The may change to the point where all message types are supported
* at which point this test will no be longer required.
*/
@Test
public void validateFailOnUnsupportedMessageType() throws Exception {
final String destinationName = "validateFailOnUnsupportedMessageType";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage();
}
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
}
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
示例5
/**
* Will construct a {@link FlowFile} containing the body of the consumed JMS
* message (if {@link GetResponse} returned by {@link JMSConsumer} is not
* null) and JMS properties that came with message which are added to a
* {@link FlowFile} as attributes, transferring {@link FlowFile} to
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
this.targetResource.consume(destinationName, new ConsumerCallback(){
@Override
public void accept(final JMSResponse response) {
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getMessageBody());
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
} else {
context.yield();
}
}
});
}
示例6
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
message.setStringProperty("foo", "foo");
message.setBooleanProperty("bar", false);
message.setJMSReplyTo(session.createQueue("fooQueue"));
return message;
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
}
});
assertTrue(callbackInvoked.get());
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
示例7
/**
* Will construct a {@link FlowFile} containing the body of the consumed JMS
* message (if {@link GetResponse} returned by {@link JMSConsumer} is not
* null) and JMS properties that came with message which are added to a
* {@link FlowFile} as attributes, transferring {@link FlowFile} to
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
this.targetResource.consume(destinationName, new ConsumerCallback(){
@Override
public void accept(final JMSResponse response) {
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getMessageBody());
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
} else {
context.yield();
}
}
});
}
示例8
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
message.setStringProperty("foo", "foo");
message.setBooleanProperty("bar", false);
message.setJMSReplyTo(session.createQueue("fooQueue"));
return message;
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
}
});
assertTrue(callbackInvoked.get());
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
示例9
@Test
public void validateConsumeWithCustomHeadersAndPropertiesOverJNDI() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsJndiTemplateForDestination(false);
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
message.setStringProperty("foo", "foo");
message.setBooleanProperty("bar", false);
message.setJMSReplyTo(session.createQueue("fooQueue"));
return message;
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
}
});
assertTrue(callbackInvoked.get());
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
示例10
/**
* Will construct a {@link FlowFile} containing the body of the consumed JMS
* message (if {@link JMSResponse} returned by {@link JMSConsumer} is not
* null) and JMS properties that came with message which are added to a
* {@link FlowFile} as attributes, transferring {@link FlowFile} to
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
final boolean durable = isDurableSubscriber(context);
final boolean shared = isShared(context);
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
try {
consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
return;
}
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
}
});
} catch(Exception e) {
consumer.setValid(false);
context.yield();
throw e; // for backward compatibility with exception handling in flows
}
}
示例11
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
final String destinationName = "validateConsumeWithCustomHeadersAndProperties";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello from the other side");
message.setStringProperty("foo", "foo");
message.setBooleanProperty("bar", false);
message.setJMSReplyTo(session.createQueue("fooQueue"));
return message;
}
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar"));
}
});
assertTrue(callbackInvoked.get());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
示例12
@Test(timeout = 20000)
public void testMultipleThreads() throws Exception {
String destinationName = "testMultipleThreads";
JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false);
final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(4);
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class));
for (int i = 0; i < 4000; i++) {
publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
final AtomicInteger msgCount = new AtomicInteger(0);
final ConsumerCallback callback = new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
msgCount.incrementAndGet();
}
};
final Thread[] threads = new Thread[4];
for (int i = 0; i < 4; i++) {
final Thread t = new Thread(() -> {
JmsTemplate consumeTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, null, false, false, null, "UTF-8", callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
consumerTemplateCloseCount.countDown();
}
});
threads[i] = t;
t.start();
}
int iterations = 0;
while (msgCount.get() < 4000) {
Thread.sleep(10L);
if (++iterations % 100 == 0) {
System.out.println(msgCount.get() + " messages received so far");
}
}
} finally {
((CachingConnectionFactory) publishTemplate.getConnectionFactory()).destroy();
consumerTemplateCloseCount.await();
}
}