提问者:小点点

Apache Camel:如何使用来自两个或多个JMS队列的消息


从编程的角度来看,我有一个非常简单的商业案例。但是,我不知道如何使用阿帕奇骆驼实现它......好吧,我有 2 个 JMS 队列:一个用于接收命令,另一个 - 存储大量消息,这些消息应该以 1000 或更少的批次传递到外部系统。

下面是概念消息交换算法:

  1. 在第一个 JMS 队列中收到命令消息时,我准备 XML 消息
  2. 将 XML 消息发送到外部 SOAP Web 服务以获取用户令牌
  3. 使用用户令牌,准备另一条XML消息并将其发送到REST服务以获取jobToken
  4. 循环:
    4.1. 以 1000 个为批次聚合来自第二个 JMS 队列的消息,超时
    时停止聚合 4.2. 对于每个批次,将其转换为 CSV 文件
    4.3. 通过 HTTP Post 发送 CSV 到 REST 服务
    4.4. 保留分配给每个批次的批处理令牌
  5. 使用作业令牌准备 XML 消息并发送o 用于提交批处理的 REST 服务
  6. 使用 batchtoken 通过 XML 消息向 REST 服务检查每个批处理的执行状态

在查看 Camel 时,我可以创建一个示例项目,我可以在其中对交换 1-3、5 进行建模:

           from("file:src/data?noop=true")
                .setHeader("sfUsername", constant("a@fd.com"))
                .setHeader("sfPwd", constant("12345"))
            .to("velocity:com/eip/vm/bulkPreLogin.vm?contentCache=false")
                .setHeader(Exchange.CONTENT_TYPE, constant("text/xml; charset=UTF-8"))
                .setHeader("SOAPAction", constant("login"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://bulklogin") // send login
            .to("xslt:com/eip/xslt/bulkLogin.xsl")  //xslt transformation to retrieve userToken
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String body = (String) exchange.getIn().getBody();
                    String[] bodyParts = body.split(",");
                    exchange.getProperties().put("userToken", bodyParts[0]);
                    .....
                }
            })
            .to("velocity:com/eip/vm/jobInsertTeamOppStart.vm")
                .setHeader(Exchange.CONTENT_TYPE, constant("application/xml; charset=UTF-8"))
                .setHeader("X-Session", property("userToken"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://scheduleJob")       //schedule job
            .to("xslt:com//eip/xslt/jobInfoTransform.xsl")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String body = (String) exchange.getIn().getBody();
                    exchange.getProperties().put("jobToken",body.trim());
                }
            })
            //add batches in a loop ???
            .to("velocity:com/eip/vm/jobInsertTeamOppEnd.vm")
                .setHeader(Exchange.HTTP_URI, simple("https://na15.com/services/async/job/${property.jobToken}"))
                .setHeader(Exchange.CONTENT_TYPE, constant("application/xml; charset=UTF-8"))
                .setHeader("X-ID-Session", property("userToken"))
                .setHeader("CamelHttpMethod", constant("POST"))
            .to("http4://closeJob")       //schedule job
            //check batch?
            .bean(new SomeBean());

所以,我的问题是:如何从第二个 JMS 队列中读取消息?


共1个答案

匿名用户

这对我来说并不是单骆驼路线的一个很好的用例。我认为您应该在 POJO 中实现主要功能,并使用骆驼豆集成来消费和生成消息。这将导致更易于维护的代码,也更容易处理异常。

见 https://camel.apache.org/pojo-consuming.html