提问者:小点点

使用骆驼dsl接收sqs消息属性?


有人知道如何使用java中的CamelDSL接收SQS消息属性吗?我收到以下错误:

msgstr"创建路由收款人路由失败:路由(批处理路由)[[from[aws-sqs://myque? amazonSQSEndpoint=…因为无法解析endpoint:aws-sqs://myQueue?amazonSQSEndpoint=sqs.us-west-1.amazonaws.com

请找到我的密码

StringBuilder QueueURI = new StringBuilder();
QueueURI(PropertyUtils.AWS_SQS)
        .append(propertyUtils.queueName)
        .append(PropertyUtils.AMAZON_SQS_REGION)
        .append(propertyUtils.sqsRegion);
QueueURI(PropertyUtils.AWS_ACCESS_KEY).append(
         propertyUtils.awsAccessKey);
QueueURI(PropertyUtils.AWS_SECRET_KEY).append(
         propertyUtils.awsSecretKey);
QueueURI(PropertyUtils.MAX_MESSAGES_PER_POLL_1);
QueueURI("&messageAttributeNames=");


Collection<String> collection = new ArrayList<String>();
collection.add("userID");

//aws-sqs://myqueue?amazonSQSEndpoint=sqs.us-west-1.amazonaws.com&accessKey=*****&secretKey=****************&maxMessagesPerPoll=1&messageAttributeNames=[userID]

from(QueueURI.ToString() + collection)
       .routeId("batch route")
       .process(userValidator);

共2个答案

匿名用户

骆驼没有java. lang.String=

我正在使用Spring,所以我利用了Spring的转换支持:

import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.apache.camel.support.TypeConverterSupport;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;

public class TypeConverterBridge extends TypeConverterSupport {
    private ConversionService cs = new DefaultConversionService();

    @Override
    public <T> T convertTo(Class<T> type, Exchange exchange, Object value) throws TypeConversionException {
        if (cs.canConvert(value.getClass(), type)) {
            return cs.convert(value, type);
        }
        return null;
    }
}

然后用我的CamelContext注册了TypeConverter:

camelContext.getTypeConverterRegistry().addFallbackTypeConverter(new TypeConverterBridge(), false);

匿名用户

您可以在名为CamelAwsSqsAtbantes的标头中找到SQS消息的属性,如下所述:http://camel.apache.org/aws-sqs.html

该标头是一个Map

...
from(QueueURI.ToString() + collection)
   .routeId("batch route")
   .log("Attributes: ${header.CamelAwsSqsAttributes}")
   .process(userValidator);