有人知道如何使用java中的CamelDSL接收SQS消息属性吗?我收到以下错误:
msgstr"创建路由收款人路由失败:路由(批处理路由)[[from[aws-sqs://myque? amazonSQSEndpoint=…因为无法解析endpoint:aws-sqs://myQueue?amazonSQSEndpoint=sqs.us-west-1.amazonaws.com
请找到我的密码
StringBuilder QueueURI = new StringBuilder();
QueueURI(PropertyUtils.AWS_SQS)
.append(propertyUtils.queueName)
.append(PropertyUtils.AMAZON_SQS_REGION)
.append(propertyUtils.sqsRegion);
QueueURI(PropertyUtils.AWS_ACCESS_KEY).append(
propertyUtils.awsAccessKey);
QueueURI(PropertyUtils.AWS_SECRET_KEY).append(
propertyUtils.awsSecretKey);
QueueURI(PropertyUtils.MAX_MESSAGES_PER_POLL_1);
QueueURI("&messageAttributeNames=");
Collection<String> collection = new ArrayList<String>();
collection.add("userID");
//aws-sqs://myqueue?amazonSQSEndpoint=sqs.us-west-1.amazonaws.com&accessKey=*****&secretKey=****************&maxMessagesPerPoll=1&messageAttributeNames=[userID]
from(QueueURI.ToString() + collection)
.routeId("batch route")
.process(userValidator);
骆驼没有java. lang.String=
我正在使用Spring,所以我利用了Spring的转换支持:
import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.apache.camel.support.TypeConverterSupport;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
public class TypeConverterBridge extends TypeConverterSupport {
private ConversionService cs = new DefaultConversionService();
@Override
public <T> T convertTo(Class<T> type, Exchange exchange, Object value) throws TypeConversionException {
if (cs.canConvert(value.getClass(), type)) {
return cs.convert(value, type);
}
return null;
}
}
然后用我的CamelContext注册了TypeConverter:
camelContext.getTypeConverterRegistry().addFallbackTypeConverter(new TypeConverterBridge(), false);
您可以在名为CamelAwsSqsAtbantes
的标头中找到SQS消息的属性,如下所述:http://camel.apache.org/aws-sqs.html
该标头是一个Map
...
from(QueueURI.ToString() + collection)
.routeId("batch route")
.log("Attributes: ${header.CamelAwsSqsAttributes}")
.process(userValidator);