Java源码示例:org.springframework.integration.context.IntegrationContextUtils

示例1
/**
 * Declares the {@link KafkaStreamsMessageConversionDelegate} bean.
 *
 * @param compositeMessageConverter converter injected under the bean name
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}
 * @param sendToDlqAndContinue collaborator handed to the delegate
 * @param KafkaStreamsBindingInformationCatalogue binding catalogue handed to the delegate
 * @param binderConfigurationProperties properties injected under the
 * {@code "binderConfigurationProperties"} qualifier
 * @return the delegate built from the four injected collaborators
 */
@Bean
public KafkaStreamsMessageConversionDelegate messageConversionDelegate(
		@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
		CompositeMessageConverter compositeMessageConverter,
		SendToDlqAndContinue sendToDlqAndContinue,
		KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
		@Qualifier("binderConfigurationProperties")
		KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
	// NOTE(review): the catalogue parameter name starts with an upper-case letter and
	// shadows its own type name; it is kept as-is so the signature does not change.
	final KafkaStreamsMessageConversionDelegate delegate = new KafkaStreamsMessageConversionDelegate(
			compositeMessageConverter,
			sendToDlqAndContinue,
			KafkaStreamsBindingInformationCatalogue,
			binderConfigurationProperties);
	return delegate;
}
 
示例2
/**
 * Declares the {@link MessageConverterConfigurer} bean.
 *
 * @param bindingServiceProperties binding service properties passed to the configurer
 * @param compositeMessageConverter converter injected under the bean name
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}
 * @param streamFunctionProperties function properties; declared {@code @Nullable}
 * @return the configurer built from the three arguments
 */
@Bean
public MessageConverterConfigurer messageConverterConfigurer(
		BindingServiceProperties bindingServiceProperties,
		@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
		CompositeMessageConverter compositeMessageConverter,
		@Nullable StreamFunctionProperties streamFunctionProperties) {
	final MessageConverterConfigurer configurer = new MessageConverterConfigurer(
			bindingServiceProperties,
			compositeMessageConverter,
			streamFunctionProperties);
	return configurer;
}
 
示例3
/**
 * Declares the {@link MessageSourceBindingTargetFactory} bean.
 *
 * @param compositeMessageConverter converter injected under the bean name
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}
 * @param compositeMessageChannelConfigurer channel configurer passed to the factory
 * @return the binding target factory built from both arguments
 */
@Bean
public MessageSourceBindingTargetFactory messageSourceFactory(
		@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
		CompositeMessageConverter compositeMessageConverter,
		CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
	final MessageSourceBindingTargetFactory targetFactory = new MessageSourceBindingTargetFactory(
			compositeMessageConverter, compositeMessageChannelConfigurer);
	return targetFactory;
}
 
示例4
/**
 * Declares the {@link CompositeMessageConverter} bean registered under
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}.
 *
 * @param objectMapperObjectProvider provider of the {@code ObjectMapper}; a new
 * {@code ObjectMapper} is created when none is available
 * @param customMessageConverters injected converters; only those accepted by
 * {@code isConverterEligible} are forwarded to the factory
 * @return the converter returned by
 * {@code CompositeMessageConverterFactory#getMessageConverterForAllRegistered()}
 */
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public CompositeMessageConverter configurableCompositeMessageConverter(
		ObjectProvider<ObjectMapper> objectMapperObjectProvider,
		List<MessageConverter> customMessageConverters) {

	// Keep only the eligible converters; the injected list itself is left untouched.
	final List<MessageConverter> eligibleConverters = customMessageConverters.stream()
			.filter(converter -> isConverterEligible(converter))
			.collect(Collectors.toList());

	final ObjectMapper objectMapper = objectMapperObjectProvider.getIfAvailable(ObjectMapper::new);
	final CompositeMessageConverterFactory converterFactory =
			new CompositeMessageConverterFactory(eligibleConverters, objectMapper);

	return converterFactory.getMessageConverterForAllRegistered();
}
 
示例5
/**
 * Polls a source whose handler always throws, with {@code maxAttempts} set to 1,
 * and verifies that the poll reports {@code true} and the handler ran exactly once.
 */
@Test
public void testErrorsNoRetry() {
	final TestChannelBinder channelBinder = createBinder();
	final MessageConverterConfigurer converterConfigurer =
			this.context.getBean(MessageConverterConfigurer.class);

	final DefaultPollableMessageSource source =
			new DefaultPollableMessageSource(this.messageConverter);
	converterConfigurer.configurePolledMessageSource(source, "foo");

	// Interceptor that upper-cases the String payload while keeping the headers.
	final ChannelInterceptor upperCasingInterceptor = new ChannelInterceptor() {

		@Override
		public Message<?> preSend(Message<?> message, MessageChannel channel) {
			final String upperCased = ((String) message.getPayload()).toUpperCase();
			return MessageBuilder.withPayload(upperCased)
					.copyHeaders(message.getHeaders())
					.build();
		}

	};
	source.addInterceptor(upperCasingInterceptor);

	final ExtendedConsumerProperties<Object> consumerProperties =
			new ExtendedConsumerProperties<>(null);
	consumerProperties.setMaxAttempts(1);
	channelBinder.bindPollableConsumer("foo", "bar", source, consumerProperties);

	// NOTE(review): this latch is counted down by the error-channel subscriber
	// but is never awaited or asserted on in this test.
	final CountDownLatch errorLatch = new CountDownLatch(1);
	final SubscribableChannel errorChannel = this.context.getBean(
			IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
	errorChannel.subscribe(m -> errorLatch.countDown());

	final AtomicInteger invocations = new AtomicInteger();
	assertThat(source.poll(received -> {
		invocations.incrementAndGet();
		throw new RuntimeException("test recoverer");
	})).isTrue();
	assertThat(invocations.get()).isEqualTo(1);
}
 
示例6
/**
 * Declares the {@link MessageConverterDelegateSerde} bean.
 *
 * @param compositeMessageConverterFactory the {@code CompositeMessageConverter} injected
 * under the bean name
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}
 * (despite the parameter name, its declared type is the converter, not a factory)
 * @return a serde wrapping the injected converter
 */
@Bean
public MessageConverterDelegateSerde messageConverterDelegateSerde(
		@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
		CompositeMessageConverter compositeMessageConverterFactory) {
	final MessageConverterDelegateSerde serde =
			new MessageConverterDelegateSerde(compositeMessageConverterFactory);
	return serde;
}
 
示例7
/**
 * Declares the {@link CompositeNonNativeSerde} bean.
 *
 * @param compositeMessageConverterFactory the {@code CompositeMessageConverter} injected
 * under the bean name
 * {@code IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME}
 * (despite the parameter name, its declared type is the converter, not a factory)
 * @return a serde wrapping the injected converter
 */
@Bean
public CompositeNonNativeSerde compositeNonNativeSerde(
		@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
		CompositeMessageConverter compositeMessageConverterFactory) {
	final CompositeNonNativeSerde serde =
			new CompositeNonNativeSerde(compositeMessageConverterFactory);
	return serde;
}
 
示例8
/**
 * Builds and configures the {@link AmqpOutboundEndpoint} used as the producer
 * message handler for the given destination.
 *
 * <p>The endpoint publishes to the exchange named by the destination. Its routing
 * key, delay, header mapping and delivery mode come from the Rabbit producer
 * properties. When an error channel is supplied, that channel receives returns and
 * nacks, and a confirm-ack channel is configured as well.
 *
 * @param producerDestination destination whose name is used as the exchange name
 * @param producerProperties producer properties, including the Rabbit extension
 * @param errorChannel channel for returns and nacks; may be {@code null}
 * @return the configured outbound endpoint
 * @throws IllegalStateException if the header mode is {@code embeddedHeaders}
 */
@Override
protected MessageHandler createProducerMessageHandler(
		final ProducerDestination producerDestination,
		ExtendedProducerProperties<RabbitProducerProperties> producerProperties,
		MessageChannel errorChannel) {
	Assert.state(
			!HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()),
			"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");

	final String prefix = producerProperties.getExtension().getPrefix();
	final String exchangeName = producerDestination.getName();
	// The routing destination is the exchange name with the configured prefix cut off.
	final String destination;
	if (StringUtils.isEmpty(prefix)) {
		destination = exchangeName;
	}
	else {
		destination = exchangeName.substring(prefix.length());
	}

	final boolean errorChannelPresent = errorChannel != null;
	final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
			buildRabbitTemplate(producerProperties.getExtension(), errorChannelPresent));
	endpoint.setExchangeName(producerDestination.getName());

	final RabbitProducerProperties extendedProperties = producerProperties.getExtension();
	final boolean useInterceptorHeaders = expressionInterceptorNeeded(extendedProperties);

	// --- routing key -------------------------------------------------------
	final Expression routingKeyExpression = extendedProperties.getRoutingKeyExpression();
	final String routingKeyHeaderExpression = "headers['"
			+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER
			+ "']";
	if (producerProperties.isPartitioned()) {
		if (routingKeyExpression == null) {
			endpoint.setRoutingKeyExpression(
					buildPartitionRoutingExpression(destination, false));
		}
		else if (useInterceptorHeaders) {
			endpoint.setRoutingKeyExpression(
					buildPartitionRoutingExpression(routingKeyHeaderExpression, true));
		}
		else {
			endpoint.setRoutingKeyExpression(buildPartitionRoutingExpression(
					routingKeyExpression.getExpressionString(), true));
		}
	}
	else if (routingKeyExpression == null) {
		endpoint.setRoutingKey(destination);
	}
	else if (useInterceptorHeaders) {
		endpoint.setRoutingKeyExpressionString(routingKeyHeaderExpression);
	}
	else {
		endpoint.setRoutingKeyExpression(routingKeyExpression);
	}

	// --- delay -------------------------------------------------------------
	if (extendedProperties.getDelayExpression() != null) {
		if (useInterceptorHeaders) {
			endpoint.setDelayExpressionString("headers['"
					+ RabbitExpressionEvaluatingInterceptor.DELAY_HEADER + "']");
		}
		else {
			endpoint.setDelayExpression(extendedProperties.getDelayExpression());
		}
	}

	// --- header mapping ----------------------------------------------------
	// Three negated ("!") patterns are placed ahead of the configured patterns.
	final DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();
	final List<String> requestHeaderPatterns =
			new ArrayList<>(extendedProperties.getHeaderPatterns().length + 3);
	requestHeaderPatterns.add("!" + BinderHeaders.PARTITION_HEADER);
	requestHeaderPatterns.add("!" + IntegrationMessageHeaderAccessor.SOURCE_DATA);
	requestHeaderPatterns.add("!" + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
	requestHeaderPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
	headerMapper.setRequestHeaderNames(requestHeaderPatterns.toArray(new String[0]));
	endpoint.setHeaderMapper(headerMapper);

	endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
	endpoint.setBeanFactory(this.getBeanFactory());

	// --- returns / confirms (only when an error channel is supplied) -------
	if (errorChannelPresent) {
		checkConnectionFactoryIsErrorCapable();
		endpoint.setReturnChannel(errorChannel);
		endpoint.setConfirmNackChannel(errorChannel);

		// Fall back to the null channel when no confirm-ack channel name is configured.
		final String ackChannelBeanName;
		if (StringUtils.hasText(extendedProperties.getConfirmAckChannel())) {
			ackChannelBeanName = extendedProperties.getConfirmAckChannel();
		}
		else {
			ackChannelBeanName = IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME;
		}

		// Register a DirectChannel under the configured name if no such bean exists yet.
		final boolean customAckChannel =
				!ackChannelBeanName.equals(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
		if (customAckChannel && !getApplicationContext().containsBean(ackChannelBeanName)) {
			final GenericApplicationContext genericContext =
					(GenericApplicationContext) getApplicationContext();
			genericContext.registerBean(ackChannelBeanName, DirectChannel.class,
					() -> new DirectChannel());
		}
		endpoint.setConfirmAckChannelName(ackChannelBeanName);
		endpoint.setConfirmCorrelationExpressionString("#root");
		endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
	}

	endpoint.setHeadersMappedLast(true);
	return endpoint;
}
 
示例9
/**
 * Polls a source whose handler always throws, with {@code maxAttempts} set to 2
 * and {@code IllegalStateException} mapped to {@code false} in the retryable
 * exceptions.
 *
 * <p>The first poll throws a {@code RuntimeException} and is expected to invoke the
 * handler twice. The second poll throws an {@code IllegalStateException} and is
 * expected to invoke it once more. After each poll the binder's last error must
 * carry the thrown exception's message as its cause message.
 */
@Test
public void testErrors() {
	final TestChannelBinder channelBinder = createBinder();
	final MessageConverterConfigurer converterConfigurer =
			this.context.getBean(MessageConverterConfigurer.class);

	final DefaultPollableMessageSource source =
			new DefaultPollableMessageSource(this.messageConverter);
	converterConfigurer.configurePolledMessageSource(source, "foo");

	// Interceptor that upper-cases the String payload while keeping the headers.
	final ChannelInterceptor upperCasingInterceptor = new ChannelInterceptor() {

		@Override
		public Message<?> preSend(Message<?> message, MessageChannel channel) {
			final String upperCased = ((String) message.getPayload()).toUpperCase();
			return MessageBuilder.withPayload(upperCased)
					.copyHeaders(message.getHeaders())
					.build();
		}

	};
	source.addInterceptor(upperCasingInterceptor);

	final ExtendedConsumerProperties<Object> consumerProperties =
			new ExtendedConsumerProperties<>(null);
	consumerProperties.setMaxAttempts(2);
	consumerProperties.setBackOffInitialInterval(0);
	consumerProperties.getRetryableExceptions().put(IllegalStateException.class, false);
	channelBinder.bindPollableConsumer("foo", "bar", source, consumerProperties);

	// NOTE(review): this latch is counted down by the error-channel subscriber
	// but is never awaited or asserted on in this test.
	final CountDownLatch errorLatch = new CountDownLatch(1);
	final SubscribableChannel errorChannel = this.context.getBean(
			IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
	errorChannel.subscribe(m -> errorLatch.countDown());

	final AtomicInteger invocations = new AtomicInteger();

	// First poll: RuntimeException, handler expected to run twice.
	assertThat(source.poll(received -> {
		invocations.incrementAndGet();
		throw new RuntimeException("test recoverer");
	})).isTrue();
	assertThat(invocations.get()).isEqualTo(2);
	Message<?> lastError = channelBinder.getLastError();
	assertThat(lastError).isNotNull();
	assertThat(((Exception) lastError.getPayload()).getCause().getMessage())
			.isEqualTo("test recoverer");

	// Second poll: IllegalStateException, handler expected to run only once more.
	assertThat(source.poll(received -> {
		invocations.incrementAndGet();
		throw new IllegalStateException("no retries");
	})).isTrue();
	assertThat(invocations.get()).isEqualTo(3);
	lastError = channelBinder.getLastError();
	assertThat(lastError).isNotNull();
	assertThat(((Exception) lastError.getPayload()).getCause().getMessage())
			.isEqualTo("no retries");
}