我的Camel路由应用程序检测到其他应用程序发布的ampq消息(发布数字),但无法处理,出现“没有可用的类型转换器”错误。我该如何解决?。
"org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79"
路由器类
package aaa.bbb.ccc.qscx;
import java.io.IOException;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;
@Startup
@ApplicationScoped
public class TheRoutes extends RouteBuilder {
@Inject
TheProcessor theProcessor;
@Inject
CamelContext ctx;
@Inject
CamelReactiveStreamsService crss;
@Override
public void configure() throws IOException, InterruptedException {
from("reactive-streams:in")
.process(theProcessor)
.log(".........from reactive-streams:in - body: ${body}");
}
@Incoming("prices")
public Subscriber<String> sink() {
return crss.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
}
}
pom. xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc </groupId>
<artifactId>qscx</artifactId>
<version>1.0</version>
<properties>
<compiler-plugin.version>3.8.1</compiler-plugin.version>
<maven.compiler.parameters>true</maven.compiler.parameters>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus-plugin.version>1.0.0.CR2</quarkus-plugin.version>
<quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>1.0.0.CR2</quarkus.platform.version>
<surefire-plugin.version>2.22.1</surefire-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-artemis-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-support-common</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-camel</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemProperties>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<systemProperties>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<quarkus.package.type>native</quarkus.package.type>
</properties>
</profile>
</profiles>
<name>qscx</name>
</project>
application.properties
amqp-username=quarkus
amqp-password=quarkus
mp.messaging.incoming.prices.address=prices
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.host=localhost
mp.messaging.incoming.prices.port=5672
mp.messaging.incoming.prices.username=quarkus
mp.messaging.incoming.prices.password=quarkus
mp.messaging.incoming.prices.broadcast=true
mp.messaging.incoming.prices.containerId=my-container-id
控制台堆栈跟踪(摘录)
2019-11-22 22:31:22,930 WARN [org.apa.cam.com.rea.str.ReactiveStreamsConsumer] (Camel (camel-1) thread #1 - reactive-streams://DCE85ACAC992C3A-0000000000000000) Error processing exchange. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot store file: .\target\values.txt]: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: .\target\values.txt
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:376)
at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:300)
at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:164)
at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:75)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:67)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
at org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer.lambda$doSend$3(ReactiveStreamsConsumer.java:96)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: 79 of type: java.lang.Integer on: Message[]. Caused by: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79. Exchange[DCE85ACAC992C3A-000000000000005F]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79]
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:115)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:355)
... 14 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.Integer to the required type: java.io.InputStream with value 79
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:139)
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:113)
... 15 more
其他说明
发布应用程序基于夸克平台Ampq示例:
https://github.com/quarkusio/quarkus-quickstarts/tree/master/amqp-quickstart/src/main/java/org/acme/quarkus/sample
技术
java8
夸克
小黑麦
骆驼
maven
似乎订阅者
当前没有执行类型转换。它可能会在未来的版本中得到解决。
同时,您需要在路由中强制执行:
@Override
public void configure() throws IOException, InterruptedException {
from("direct:proc")
.convertBodyTo(String.class)
.to("file:./target?fileName=values.txt&fileExist=append");
}
@Incoming("prices")
public Subscriber<String> sink() {
return crss.subscriber("direct:proc", String.class);
}