Java源码示例:org.springframework.integration.context.IntegrationContextUtils
示例1
@Bean
public KafkaStreamsMessageConversionDelegate messageConversionDelegate(
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
CompositeMessageConverter compositeMessageConverter,
SendToDlqAndContinue sendToDlqAndContinue,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
return new KafkaStreamsMessageConversionDelegate(compositeMessageConverter, sendToDlqAndContinue,
KafkaStreamsBindingInformationCatalogue, binderConfigurationProperties);
}
示例2
@Bean
public MessageConverterConfigurer messageConverterConfigurer(
BindingServiceProperties bindingServiceProperties,
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME) CompositeMessageConverter compositeMessageConverter,
@Nullable StreamFunctionProperties streamFunctionProperties) {
return new MessageConverterConfigurer(bindingServiceProperties, compositeMessageConverter, streamFunctionProperties);
}
示例3
@Bean
public MessageSourceBindingTargetFactory messageSourceFactory(
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME) CompositeMessageConverter compositeMessageConverter,
CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
return new MessageSourceBindingTargetFactory(compositeMessageConverter,
compositeMessageChannelConfigurer);
}
示例4
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public CompositeMessageConverter configurableCompositeMessageConverter(
ObjectProvider<ObjectMapper> objectMapperObjectProvider,
List<MessageConverter> customMessageConverters) {
customMessageConverters = customMessageConverters.stream()
.filter(c -> isConverterEligible(c)).collect(Collectors.toList());
CompositeMessageConverterFactory factory =
new CompositeMessageConverterFactory(customMessageConverters, objectMapperObjectProvider.getIfAvailable(ObjectMapper::new));
return factory.getMessageConverterForAllRegistered();
}
示例5
@Test
public void testErrorsNoRetry() {
TestChannelBinder binder = createBinder();
MessageConverterConfigurer configurer = this.context
.getBean(MessageConverterConfigurer.class);
DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
this.messageConverter);
configurer.configurePolledMessageSource(pollableSource, "foo");
pollableSource.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder
.withPayload(((String) message.getPayload()).toUpperCase())
.copyHeaders(message.getHeaders()).build();
}
});
ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(
null);
properties.setMaxAttempts(1);
binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
final CountDownLatch latch = new CountDownLatch(1);
this.context.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
SubscribableChannel.class).subscribe(m -> {
latch.countDown();
});
final AtomicInteger count = new AtomicInteger();
assertThat(pollableSource.poll(received -> {
count.incrementAndGet();
throw new RuntimeException("test recoverer");
})).isTrue();
assertThat(count.get()).isEqualTo(1);
}
示例6
@Bean
public MessageConverterDelegateSerde messageConverterDelegateSerde(
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
CompositeMessageConverter compositeMessageConverterFactory) {
return new MessageConverterDelegateSerde(compositeMessageConverterFactory);
}
示例7
@Bean
public CompositeNonNativeSerde compositeNonNativeSerde(
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
CompositeMessageConverter compositeMessageConverterFactory) {
return new CompositeNonNativeSerde(compositeMessageConverterFactory);
}
示例8
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination producerDestination,
ExtendedProducerProperties<RabbitProducerProperties> producerProperties,
MessageChannel errorChannel) {
Assert.state(
!HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()),
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
String prefix = producerProperties.getExtension().getPrefix();
String exchangeName = producerDestination.getName();
String destination = StringUtils.isEmpty(prefix) ? exchangeName
: exchangeName.substring(prefix.length());
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
buildRabbitTemplate(producerProperties.getExtension(),
errorChannel != null));
endpoint.setExchangeName(producerDestination.getName());
RabbitProducerProperties extendedProperties = producerProperties.getExtension();
boolean expressionInterceptorNeeded = expressionInterceptorNeeded(
extendedProperties);
Expression routingKeyExpression = extendedProperties.getRoutingKeyExpression();
if (!producerProperties.isPartitioned()) {
if (routingKeyExpression == null) {
endpoint.setRoutingKey(destination);
}
else {
if (expressionInterceptorNeeded) {
endpoint.setRoutingKeyExpressionString("headers['"
+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER
+ "']");
}
else {
endpoint.setRoutingKeyExpression(routingKeyExpression);
}
}
}
else {
if (routingKeyExpression == null) {
endpoint.setRoutingKeyExpression(
buildPartitionRoutingExpression(destination, false));
}
else {
if (expressionInterceptorNeeded) {
endpoint.setRoutingKeyExpression(
buildPartitionRoutingExpression("headers['"
+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER
+ "']", true));
}
else {
endpoint.setRoutingKeyExpression(buildPartitionRoutingExpression(
routingKeyExpression.getExpressionString(), true));
}
}
}
if (extendedProperties.getDelayExpression() != null) {
if (expressionInterceptorNeeded) {
endpoint.setDelayExpressionString("headers['"
+ RabbitExpressionEvaluatingInterceptor.DELAY_HEADER + "']");
}
else {
endpoint.setDelayExpression(extendedProperties.getDelayExpression());
}
}
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
List<String> headerPatterns = new ArrayList<>(extendedProperties.getHeaderPatterns().length + 3);
headerPatterns.add("!" + BinderHeaders.PARTITION_HEADER);
headerPatterns.add("!" + IntegrationMessageHeaderAccessor.SOURCE_DATA);
headerPatterns.add("!" + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
headerPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
mapper.setRequestHeaderNames(
headerPatterns.toArray(new String[headerPatterns.size()]));
endpoint.setHeaderMapper(mapper);
endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
endpoint.setBeanFactory(this.getBeanFactory());
if (errorChannel != null) {
checkConnectionFactoryIsErrorCapable();
endpoint.setReturnChannel(errorChannel);
endpoint.setConfirmNackChannel(errorChannel);
String ackChannelBeanName = StringUtils
.hasText(extendedProperties.getConfirmAckChannel())
? extendedProperties.getConfirmAckChannel()
: IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME;
if (!ackChannelBeanName.equals(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)
&& !getApplicationContext().containsBean(ackChannelBeanName)) {
GenericApplicationContext context = (GenericApplicationContext) getApplicationContext();
context.registerBean(ackChannelBeanName, DirectChannel.class,
() -> new DirectChannel());
}
endpoint.setConfirmAckChannelName(ackChannelBeanName);
endpoint.setConfirmCorrelationExpressionString("#root");
endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
}
endpoint.setHeadersMappedLast(true);
return endpoint;
}
示例9
@Test
public void testErrors() {
TestChannelBinder binder = createBinder();
MessageConverterConfigurer configurer = this.context
.getBean(MessageConverterConfigurer.class);
DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
this.messageConverter);
configurer.configurePolledMessageSource(pollableSource, "foo");
pollableSource.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder
.withPayload(((String) message.getPayload()).toUpperCase())
.copyHeaders(message.getHeaders()).build();
}
});
ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(
null);
properties.setMaxAttempts(2);
properties.setBackOffInitialInterval(0);
properties.getRetryableExceptions().put(IllegalStateException.class, false);
binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
final CountDownLatch latch = new CountDownLatch(1);
this.context.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
SubscribableChannel.class).subscribe(m -> {
latch.countDown();
});
final AtomicInteger count = new AtomicInteger();
assertThat(pollableSource.poll(received -> {
count.incrementAndGet();
throw new RuntimeException("test recoverer");
})).isTrue();
assertThat(count.get()).isEqualTo(2);
Message<?> lastError = binder.getLastError();
assertThat(lastError).isNotNull();
assertThat(((Exception) lastError.getPayload()).getCause().getMessage())
.isEqualTo("test recoverer");
assertThat(pollableSource.poll(received -> {
count.incrementAndGet();
throw new IllegalStateException("no retries");
})).isTrue();
assertThat(count.get()).isEqualTo(3);
lastError = binder.getLastError();
assertThat(lastError).isNotNull();
assertThat(((Exception) lastError.getPayload()).getCause().getMessage())
.isEqualTo("no retries");
}