提问者:小点点

我的Camel路由应用程序检测到其他应用程序发布的ampq消息(?),但无法处理,出现“没有可用的类型转换器”错误。我该如何解决?


我的Camel路由应用程序检测到其他应用程序发布的ampq消息(发布数字),但无法处理,出现“没有可用的类型转换器”错误。我该如何解决?。

"org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79"

路由器类

package aaa.bbb.ccc.qscx;

import java.io.IOException;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@Startup
@ApplicationScoped
public class TheRoutes extends RouteBuilder {

    @Inject
    TheProcessor theProcessor;

    @Inject
    CamelContext ctx;

    @Inject
    CamelReactiveStreamsService crss;

    @Override
    public void configure() throws IOException, InterruptedException {
        from("reactive-streams:in")
                .process(theProcessor)
                .log(".........from reactive-streams:in - body:  ${body}");
    }

    @Incoming("prices")
    public Subscriber<String> sink() {
        return crss.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
    }
}   

pom. xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc </groupId>
    <artifactId>qscx</artifactId>
    <version>1.0</version>
    <properties>
        <compiler-plugin.version>3.8.1</compiler-plugin.version>
        <maven.compiler.parameters>true</maven.compiler.parameters>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <quarkus-plugin.version>1.0.0.CR2</quarkus-plugin.version>
        <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
        <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
        <quarkus.platform.version>1.0.0.CR2</quarkus.platform.version>
        <surefire-plugin.version>2.22.1</surefire-plugin.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>${quarkus.platform.group-id}</groupId>
                <artifactId>${quarkus.platform.artifact-id}</artifactId>
                <version>${quarkus.platform.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>  
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-resteasy</artifactId>
        </dependency>
        <dependency>
            <groupId>io.rest-assured</groupId>
            <artifactId>rest-assured</artifactId>
            <scope>test</scope>
        </dependency>                 
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
        </dependency>                
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-timer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-artemis-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-bean</artifactId>
        </dependency>    
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.quarkus</groupId>
            <artifactId>camel-quarkus-support-common</artifactId>
        </dependency>   
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-camel</artifactId>
            <version>1.0.8</version>
        </dependency>        
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.26</version>
        </dependency>                                                                                         
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>io.quarkus</groupId>
                <artifactId>quarkus-maven-plugin</artifactId>
                <version>${quarkus-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>build</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${compiler-plugin.version}</version>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${surefire-plugin.version}</version>
                <configuration>
                    <systemProperties>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                    </systemProperties>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>native</id>
            <activation>
                <property>
                    <name>native</name>
                </property>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <artifactId>maven-failsafe-plugin</artifactId>
                        <version>${surefire-plugin.version}</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>integration-test</goal>
                                    <goal>verify</goal>
                                </goals>
                                <configuration>
                                    <systemProperties>
                                        <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
                                    </systemProperties>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
            <properties>
                <quarkus.package.type>native</quarkus.package.type>
            </properties>
        </profile>
    </profiles>
    <name>qscx</name>
</project>

application.properties

amqp-username=quarkus
amqp-password=quarkus
mp.messaging.incoming.prices.address=prices
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.host=localhost
mp.messaging.incoming.prices.port=5672
mp.messaging.incoming.prices.username=quarkus
mp.messaging.incoming.prices.password=quarkus
mp.messaging.incoming.prices.broadcast=true
mp.messaging.incoming.prices.containerId=my-container-id

控制台堆栈跟踪(摘录)

2019-11-22 22:31:22,930 WARN  [org.apa.cam.com.rea.str.ReactiveStreamsConsumer] (Camel (camel-1) thread #1 - reactive-streams://DCE85ACAC992C3A-0000000000000000) Error processing exchange. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot store file: .\target\values.txt]: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: .\target\values.txt
        at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:376)
        at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:300)
        at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:164)
        at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:75)
        at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:67)
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
        at org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer.lambda$doSend$3(ReactiveStreamsConsumer.java:96)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: 79 of type: java.lang.Integer on: Message[]. Caused by: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79]
        at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:115)
        at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:355)
        ... 14 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:139)
        at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:113)
        ... 15 more

其他说明

发布应用程序基于夸克平台Ampq示例:
https://github.com/quarkusio/quarkus-quickstarts/tree/master/amqp-quickstart/src/main/java/org/acme/quarkus/sample

技术

java8

夸克

小黑麦

骆驼

maven


共1个答案

匿名用户

似乎订阅者当前没有执行类型转换。它可能会在未来的版本中得到解决。

同时,您需要在路由中强制执行:

@Override
public void configure() throws IOException, InterruptedException {
    from("direct:proc")
        .convertBodyTo(String.class)
        .to("file:./target?fileName=values.txt&fileExist=append");
}

@Incoming("prices")
public Subscriber<String> sink() {
    return crss.subscriber("direct:proc", String.class);
}