@Override
public Closeable adapt(MessageChannel streamListenerResult,
MessageChannel bindingTarget) {
BridgeHandler handler = new BridgeHandler();
handler.setOutputChannel(bindingTarget);
handler.afterPropertiesSet();
((SubscribableChannel) streamListenerResult).subscribe(handler);
return new NoOpCloseeable();
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ProducerProperties producerProperties, MessageChannel errorChannel)
throws Exception {
BridgeHandler handler = new BridgeHandler();
handler.setBeanFactory(this.beanFactory);
handler.setOutputChannel(
((SpringIntegrationProducerDestination) destination).getChannel());
return handler;
}
@Test
@SuppressWarnings("unchecked")
public void testEndpointLifecycle() throws Exception {
// @checkstyle:off
AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, ProvisioningProvider<ConsumerProperties, ProducerProperties>> binder = this.context
.getBean(AbstractMessageChannelBinder.class);
// @checkstyle:on
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1); // to force error infrastructure creation
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo", "fooGroup",
new DirectChannel(), consumerProperties);
DirectFieldAccessor consumerBindingAccessor = new DirectFieldAccessor(
consumerBinding);
MessageProducer messageProducer = (MessageProducer) consumerBindingAccessor
.getPropertyValue("lifecycle");
assertThat(((Lifecycle) messageProducer).isRunning()).isTrue();
assertThat(messageProducer.getOutputChannel()).isNotNull();
SubscribableChannel errorChannel = (SubscribableChannel) consumerBindingAccessor
.getPropertyValue("lifecycle.errorChannel");
assertThat(errorChannel).isNotNull();
Set<MessageHandler> handlers = TestUtils.getPropertyValue(errorChannel,
"dispatcher.handlers", Set.class);
assertThat(handlers.size()).isEqualTo(2);
Iterator<MessageHandler> iterator = handlers.iterator();
assertThat(iterator.next()).isInstanceOf(BridgeHandler.class);
assertThat(iterator.next()).isInstanceOf(LastSubscriberMessageHandler.class);
assertThat(this.context.containsBean("foo.fooGroup.errors")).isTrue();
assertThat(this.context.containsBean("foo.fooGroup.errors.recoverer")).isTrue();
assertThat(this.context.containsBean("foo.fooGroup.errors.handler")).isTrue();
assertThat(this.context.containsBean("foo.fooGroup.errors.bridge")).isTrue();
consumerBinding.unbind();
assertThat(this.context.containsBean("foo.fooGroup.errors")).isFalse();
assertThat(this.context.containsBean("foo.fooGroup.errors.recoverer")).isFalse();
assertThat(this.context.containsBean("foo.fooGroup.errors.handler")).isFalse();
assertThat(this.context.containsBean("foo.fooGroup.errors.bridge")).isFalse();
assertThat(((Lifecycle) messageProducer).isRunning()).isFalse();
ProducerProperties producerProps = new ProducerProperties();
producerProps.setErrorChannelEnabled(true);
Binding<MessageChannel> producerBinding = binder.bindProducer("bar",
new DirectChannel(), producerProps);
assertThat(this.context.containsBean("bar.errors")).isTrue();
assertThat(this.context.containsBean("bar.errors.bridge")).isTrue();
producerBinding.unbind();
assertThat(this.context.containsBean("bar.errors")).isFalse();
assertThat(this.context.containsBean("bar.errors.bridge")).isFalse();
}