Java源码示例:org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService
示例1
/**
 * Returns the reactive streams service held by this component, creating it on first use.
 *
 * <p>While holding {@code lock}, an engine configuration is built from the thread pool
 * getters if none is set yet, and the service is created through the factory and added
 * to the Camel context if it does not exist yet.</p>
 *
 * @return the reactive streams service held by this component
 * @throws RuntimeCamelException if adding the service to the Camel context fails
 */
@Override
public CamelReactiveStreamsService getReactiveStreamsService() {
    synchronized (this.lock) {
        // Fall back to a configuration derived from the thread pool options.
        if (getReactiveStreamsEngineConfiguration() == null) {
            ReactiveStreamsEngineConfiguration defaults = new ReactiveStreamsEngineConfiguration();
            defaults.setThreadPoolMaxSize(getThreadPoolMaxSize());
            defaults.setThreadPoolMinSize(getThreadPoolMinSize());
            defaults.setThreadPoolName(getThreadPoolName());
            setReactiveStreamsEngineConfiguration(defaults);
        }

        if (this.reactiveStreamService == null) {
            this.reactiveStreamService = reactiveStreamServiceFactory
                .newInstance(getCamelContext(), getReactiveStreamsEngineConfiguration());
            try {
                // Adding the service to the context starts it and exposes its managed attributes.
                getCamelContext().addService(this.reactiveStreamService, true, true);
            } catch (Exception cause) {
                throw new RuntimeCamelException(cause);
            }
        }
    }
    return this.reactiveStreamService;
}
示例2
/**
 * Creates the {@link CamelReactiveStreamsService} bean.
 *
 * <p>If the configuration names an engine configuration bean, that bean is looked up in the
 * application context and used as-is. Otherwise a fresh engine configuration is populated
 * from the thread pool name and the (optional) min/max sizes of the configuration.</p>
 *
 * @param ac the application context used to look up a named engine configuration bean
 * @return the service resolved for the configured service type and engine configuration
 * @throws Exception if the bean lookup or the service resolution fails
 */
@Lazy
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(CamelContext.class)
public CamelReactiveStreamsService camelReactiveStreamsService(ApplicationContext ac) throws Exception {
    ReactiveStreamsEngineConfiguration engineConfiguration = new ReactiveStreamsEngineConfiguration();

    if (configuration.getReactiveStreamsEngineConfiguration() != null) {
        // An explicitly referenced bean wins over the individual thread pool options.
        engineConfiguration = ac.getBean(configuration.getReactiveStreamsEngineConfiguration(), ReactiveStreamsEngineConfiguration.class);
    } else {
        engineConfiguration.setThreadPoolName(configuration.getThreadPoolName());
        if (configuration.getThreadPoolMinSize() != null) {
            engineConfiguration.setThreadPoolMinSize(configuration.getThreadPoolMinSize());
        }
        if (configuration.getThreadPoolMaxSize() != null) {
            // Previously this value was written to the *min* size, so the max was never applied.
            engineConfiguration.setThreadPoolMaxSize(configuration.getThreadPoolMaxSize());
        }
    }

    return ReactiveStreamsHelper.resolveReactiveStreamsService(context, configuration.getServiceType(), engineConfiguration);
}
示例3
/**
 * Streams a fixed list of words through a reactive pipeline into a Camel file endpoint
 * and then prints the lines of {@code target/values.txt}.
 *
 * @param args unused
 * @throws Exception if Camel fails to start/stop, the wait is interrupted or the file cannot be read
 */
public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();
    CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
    Subscriber<String> subscriber = camel
        .subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);

    ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
        "using", "Apache", "Camel")
        .map(String::toUpperCase) // Transform the words
        .filter(s -> s.length() > 4) // Filter items
        .peek(System.out::println)
        .map(s -> s + " ")
        .to(subscriber)
        .run();

    context.start();
    try {
        // Just wait until it's done.
        Thread.sleep(1000);
        Files.readAllLines(new File("target/values.txt").toPath())
            .forEach(s -> System.out.println("File >> " + s));
    } finally {
        // Always release the Camel context, even if the wait or the read fails.
        context.stop();
    }
}
示例4
/**
 * Publishes 100 messages from an in-memory array to the "inbox" stream subscriber and
 * logs every request the subscriber makes for more data. The result is observed via logs only.
 */
@Test
public void testConsumerBackPressure() throws Exception {
    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    // build the payloads up front
    String[] inbox = new String[100];
    int idx = 0;
    while (idx < 100) {
        inbox[idx] = "Hello " + idx;
        idx++;
    }

    // publish the array and log each demand signal coming from the subscriber
    Flowable.fromArray(inbox)
        .doOnRequest(n -> log.info("Requesting {} messages", n))
        .subscribe(reactive.streamSubscriber("inbox", String.class));

    // keep the test alive for 10 seconds
    Thread.sleep(10_000L);
}
示例5
/**
 * Attaches a slow subscriber (one second per message) to the "inbox" stream and then
 * sends 200 messages to {@code seda:inbox}. The result is observed via logs only.
 */
@Test
public void testNoBackPressure() throws Exception {
    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    // publisher that receives from the inbox stream
    Publisher<String> source = reactive.fromStream("inbox", String.class);

    // subscribe with a deliberately slow consumer
    Flowable.fromPublisher(source)
        .doOnNext(msg -> {
            log.info("Processing message {}", msg);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    int sent = 0;
    while (sent < 200) {
        fluentTemplate.withBody("Hello " + sent).to("seda:inbox?waitForTaskToComplete=Never").send();
        sent++;
    }
    log.info("Sent 200 messages done");

    // keep the test alive for 250 seconds
    Thread.sleep(250_000L);
}
示例6
/**
 * Sends 200 messages to {@code seda:inbox} while a subscriber of the "inbox" stream
 * spends one second on each message. The result is observed via logs only.
 */
@Test
public void testInflightBackPressure() throws Exception {
    CamelReactiveStreamsService streams = CamelReactiveStreams.get(context);

    // slow consumer on top of the inbox stream publisher
    Flowable.fromPublisher(streams.fromStream("inbox", String.class))
        .doOnNext(body -> {
            log.info("Processing message {}", body);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    for (int n = 0; n != 200; n++) {
        fluentTemplate.withBody("Hello " + n).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // give the consumer 250 seconds to work
    Thread.sleep(250_000L);
}
示例7
/**
 * Consumes files from {@code file:target/inbox} as a reactive stream, forwards every
 * exchange to {@code direct:inbox}, and passes only those whose body contains "Camel"
 * on to {@code direct:camel}. Four files are written; the mocks expect 4 and 2 messages.
 */
@Test
public void testFiles() throws Exception {
    getMockEndpoint("mock:inbox").expectedMessageCount(4);
    getMockEndpoint("mock:camel").expectedMessageCount(2);

    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    Flowable.fromPublisher(reactive.from("file:target/inbox"))
        // call the direct:inbox Camel route for every file
        .doOnNext(exchange -> reactive.to("direct:inbox", exchange))
        // keep only the files mentioning Camel
        .filter(exchange -> exchange.getIn().getBody(String.class).contains("Camel"))
        // hand the remaining exchanges to the direct:camel endpoint
        .subscribe(reactive.subscriber("direct:camel"));

    // {body, file name} pairs of the test files, written in this order
    String[][] testFiles = {
        {"Hello World", "hello.txt"},
        {"Hello Camel", "hello2.txt"},
        {"Bye Camel", "bye.txt"},
        {"Bye World", "bye2.txt"},
    };
    for (String[] testFile : testFiles) {
        fluentTemplate.to("file:target/inbox").withBody(testFile[0]).withHeader(Exchange.FILE_NAME, testFile[1]).send();
    }

    assertMockEndpointsSatisfied();
}
示例8
/**
 * Subscribes to the "numbers" stream with RxJava and logs every number greater than 5
 * for ten seconds. The result is observed via logs only.
 */
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    // publisher that receives from the numbers stream
    Publisher<Integer> source = reactive.fromStream("numbers", Integer.class);

    // only the big numbers pass the filter and get logged
    Flowable.fromPublisher(source)
        .filter(value -> value > 5)
        .doOnNext(value -> log.info("Streaming big number {}", value))
        .subscribe();

    // keep the test alive for 10 seconds
    Thread.sleep(10_000);
}
示例9
/**
 * Pushes 200 messages into {@code seda:inbox} while the subscriber of the "inbox" stream
 * needs one second per message. The result is observed via logs only.
 */
@Test
public void testLatestBackPressure() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // publisher that receives from the inbox stream
    Publisher<String> messages = camelStreams.fromStream("inbox", String.class);

    // slow subscriber: log, then sleep one second per message
    Flowable.fromPublisher(messages)
        .doOnNext(message -> {
            log.info("Processing message {}", message);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    int counter = 0;
    do {
        fluentTemplate.withBody("Hello " + counter).to("seda:inbox?waitForTaskToComplete=Never").send();
        counter++;
    } while (counter < 200);
    log.info("Sent 200 messages done");

    // keep the test alive for 250 seconds
    Thread.sleep(250_000L);
}
示例10
/**
 * Subscribes to the "numbers" stream with Reactor and logs every number greater than 5
 * for ten seconds. The result is observed via logs only.
 */
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // publisher that receives from the numbers stream
    Publisher<Integer> source = camelStreams.fromStream("numbers", Integer.class);

    // only the big numbers pass the filter and get logged
    Flux.from(source)
        .filter(value -> value > 5)
        .doOnNext(value -> log.info("Streaming big number {}", value))
        .subscribe();

    // keep the test alive for 10 seconds
    Thread.sleep(10_000);
}
示例11
/**
 * Sends an exchange into the "reactive" stream, whose route sets the body to "123",
 * and verifies the first exchange published back carries that body.
 */
@Test
public void testToStream() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        public void configure() {
            from("reactive-streams:reactive")
                .setBody().constant("123");
        }
    };
    camelctx.addRoutes(routes);
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Publisher<Exchange> result = crs.toStream("reactive", new DefaultExchange(camelctx));

        // block until the first (and only expected) exchange arrives
        Exchange first = Flux.from(result).blockFirst();
        Assert.assertNotNull(first);

        String body = first.getIn().getBody(String.class);
        Assert.assertNotNull(body);
        Assert.assertEquals("123", body);
    } finally {
        camelctx.close();
    }
}
示例12
/**
 * Maps the values 1..3 through {@code bean:hello} with a typed result and verifies that
 * all three greetings arrive within two seconds.
 */
@Test
public void testTo() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // sorted set, so the comparison below does not depend on arrival order
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(number -> crs.to("bean:hello", number, String.class))
            .doOnNext(greeting -> received.add(greeting))
            .doOnNext(greeting -> done.countDown())
            .subscribe();

        boolean completed = done.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.close();
    }
}
示例13
/**
 * Maps the values 1..3 through {@code bean:hello}, reads the String body of each
 * resulting exchange's out message, and verifies all three greetings arrive within two seconds.
 */
@Test
public void testToWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // sorted set, so the comparison below does not depend on arrival order
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(number -> crs.to("bean:hello", number))
            .map(exchange -> exchange.getOut())
            .map(message -> message.getBody(String.class))
            .doOnNext(greeting -> received.add(greeting))
            .doOnNext(greeting -> done.countDown())
            .subscribe();

        boolean completed = done.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.close();
    }
}
示例14
/**
 * Obtains a typed function for {@code bean:hello}, flat-maps the values 1..3 through it
 * and verifies all three greetings arrive within two seconds.
 */
@Test
public void testToFunction() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // The TreeSet orders the messages alphabetically regardless of insertion order.
        // That matters because the emissions of the Flux returned by flatMap may interleave.
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Function<Object, Publisher<String>> hello = crs.to("bean:hello", String.class);
        Flux.just(1, 2, 3)
            .flatMap(hello)
            .doOnNext(greeting -> received.add(greeting))
            .doOnNext(greeting -> done.countDown())
            .subscribe();

        boolean completed = done.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.close();
    }
}
示例15
/**
 * Obtains an exchange-returning function for {@code bean:hello}, flat-maps the values 1..3
 * through it, reads the String body of each out message, and verifies all three greetings
 * arrive within two seconds.
 */
@Test
public void testToFunctionWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // sorted set, so the comparison below does not depend on arrival order
        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch done = new CountDownLatch(3);

        Function<Object, Publisher<Exchange>> hello = crs.to("bean:hello");
        Flux.just(1, 2, 3)
            .flatMap(hello)
            .map(exchange -> exchange.getOut())
            .map(message -> message.getBody(String.class))
            .doOnNext(greeting -> received.add(greeting))
            .doOnNext(greeting -> done.countDown())
            .subscribe();

        boolean completed = done.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelctx.close();
    }
}
示例16
/**
 * Subscribes an empty Flowable to the "numbers" stream (no {@code forwardOnComplete}
 * option on the route) and expects that nothing reaches {@code mock:endpoint}.
 */
@Test
public void testOnCompleteHeaderNotForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers")
                .to("mock:endpoint");
        }
    };
    camelctx.addRoutes(routes);

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbersSubscriber = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        // an empty source completes without emitting any item
        Flowable.<Integer>empty().subscribe(numbersSubscriber);

        MockEndpoint mock = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        mock.expectedMessageCount(0);
        mock.assertIsSatisfied(200);
    } finally {
        camelctx.close();
    }
}
示例17
/**
 * Verifies that the resolved service is the default implementation, that it equals the
 * {@code reactiveStreamsService} field, and that the component uses the "rs-test" thread pool name.
 */
@Test
public void testConfiguration() throws Exception {
    CamelReactiveStreamsService resolved = CamelReactiveStreams.get(context);
    assertTrue(resolved instanceof DefaultCamelReactiveStreamsService);
    assertEquals(resolved, reactiveStreamsService);

    ReactiveStreamsComponent rsComponent =
        context.getComponent(ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.class);
    assertEquals("rs-test", rsComponent.getThreadPoolName());
}
示例18
/**
 * Subscribes a hand-written subscriber to the "stream" stream, sends "Hello" to
 * {@code direct:endpoint} and verifies the subscriber receives it without any error signal.
 */
@Test
public void testService() throws Exception {
    CamelReactiveStreamsService service = CamelReactiveStreams.get(context);

    // single-element arrays act as mutable holders written by the subscriber callbacks
    CountDownLatch received = new CountDownLatch(1);
    String[] lastItem = new String[1];
    Throwable[] failure = new Throwable[1];

    Publisher<String> stream = service.fromStream("stream", String.class);
    stream.subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(100);
        }

        @Override
        public void onNext(String item) {
            lastItem[0] = item;
            received.countDown();
        }

        @Override
        public void onError(Throwable throwable) {
            failure[0] = throwable;
        }

        @Override
        public void onComplete() {
        }
    });

    context.createFluentProducerTemplate().to("direct:endpoint").withBody("Hello").send();

    boolean arrived = received.await(5, TimeUnit.SECONDS);
    assertTrue(arrived);
    assertEquals("Hello", lastItem[0]);

    // short grace period before checking that no error was signalled
    Thread.sleep(100);
    assertNull(failure[0]);
}
示例19
/**
 * Adds a route consuming from {@code reactive-streams:data} to the context, then verifies
 * that the context is started and that the resolved service is the default implementation.
 */
@Test
public void testAutoConfiguration() throws Exception {
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:data")
                .log("${body}");
        }
    };
    routes.addRoutesToCamelContext(context);

    Assert.assertTrue(context.getStatus().isStarted());

    CamelReactiveStreamsService resolved = CamelReactiveStreams.get(context);
    Assert.assertTrue(resolved instanceof DefaultCamelReactiveStreamsService);
}
示例20
/**
 * Sorts five numeric strings with RxJava and feeds them to the "numbers" stream
 * subscriber. The result is observed via the route only; nothing is asserted here.
 */
@Test
public void testConsumeNumbers() throws Exception {
    CamelReactiveStreamsService reactive = CamelReactiveStreams.get(context);

    // publish five unsorted values, sort them, and send each one to the numbers stream
    Flowable.just("3", "4", "1", "5", "2")
        .sorted((left, right) -> left.compareToIgnoreCase(right))
        .subscribe(reactive.streamSubscriber("numbers", String.class));

    // keep the test alive for 2 seconds
    Thread.sleep(2_000);
}
示例21
/**
 * Starts Camel and the reactive streams service, publishes the messages arriving on
 * {@code seda:words} as a Flowable that upper-cases and logs each word, and sends five words.
 *
 * <p>The context and the service are stopped in a {@code finally} block so that a failure
 * while subscribing or sending does not leave them running.</p>
 */
@Test
public void testCamelFirst() throws Exception {
    LOG.info("Starting RX-Java2 Flowable Camel first");

    // create Camel
    CamelContext camel = new DefaultCamelContext();

    // create Reactive Camel
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);

    camel.start();
    try {
        rsCamel.start();

        // create a publisher from Camel seda:words endpoint
        Publisher<String> publisher = rsCamel.from("seda:words", String.class);

        Flowable.fromPublisher(publisher)
            // upper case the word
            .map(w -> w.toUpperCase())
            // log the upper-cased word
            .doOnNext(w -> LOG.info(w))
            .subscribe();

        // send some words to Camel
        FluentProducerTemplate template = camel.createFluentProducerTemplate();
        template.withBody("Camel").to("seda:words").send();
        template.withBody("rocks").to("seda:words").send();
        template.withBody("streams").to("seda:words").send();
        template.withBody("as").to("seda:words").send();
        template.withBody("well").to("seda:words").send();

        // sleep a bit for reactive subscriber to complete
        Thread.sleep(1000);
    } finally {
        // same shutdown order as before, but now also executed on failure
        camel.stop();
        rsCamel.stop();
    }
}
示例22
/**
 * Routes {@code direct:reactive} into the "numbers" stream, subscribes with Reactor,
 * sends 1, 2, 3 and verifies the subscriber counted three items in that order.
 */
@Test
public void testFromStreamDirect() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        public void configure() {
            from("direct:reactive")
                .to("reactive-streams:numbers");
        }
    };
    camelctx.addRoutes(routes);
    camelctx.start();
    try {
        AtomicInteger counter = new AtomicInteger(0);
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // every item must equal the running count
        Flux.from(crs.fromStream("numbers", Integer.class))
            .doOnNext(item -> Assert.assertEquals(counter.incrementAndGet(), item.intValue()))
            .subscribe();

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int body = 1; body <= 3; body++) {
            producer.sendBody("direct:reactive", body);
        }

        Assert.assertEquals(3, counter.get());
    } finally {
        camelctx.close();
    }
}
示例23
/**
 * Routes 30 timer ticks (body = timer counter header) into the "tick" stream and
 * verifies that a Reactor subscriber counts all 30 in order.
 */
@Test
public void testFromStreamTimer() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:tick?period=5&repeatCount=30")
                .setBody().header(Exchange.TIMER_COUNTER)
                .to("reactive-streams:tick");
        }
    };
    camelctx.addRoutes(routes);

    final int expectedTicks = 30;
    final CountDownLatch remaining = new CountDownLatch(expectedTicks);
    final AtomicInteger counter = new AtomicInteger(0);

    // subscribe before the context starts
    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Flux.from(crs.fromStream("tick", Integer.class))
        .doOnNext(tick -> Assert.assertEquals(counter.incrementAndGet(), tick.intValue()))
        .doOnNext(tick -> remaining.countDown())
        .subscribe();

    camelctx.start();
    try {
        // the await result is not checked; the count assertion below decides the outcome
        remaining.await(5, TimeUnit.SECONDS);
        Assert.assertEquals(expectedTicks, counter.get());
    } finally {
        camelctx.close();
    }
}
示例24
/**
 * Attaches two independent Reactor subscribers to the "direct" stream, sends two messages
 * through {@code direct:reactive} and verifies that each subscriber sees both of them.
 */
@Test
public void testFromStreamMultipleSubscriptionsWithDirect() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactive")
                .to("reactive-streams:direct");
        }
    };
    camelctx.addRoutes(routes);
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // first subscription
        CountDownLatch firstSeen = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(item -> firstSeen.countDown())
            .subscribe();

        // second subscription on the same stream
        CountDownLatch secondSeen = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(item -> secondSeen.countDown())
            .subscribe();

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int body = 1; body <= 2; body++) {
            producer.sendBody("direct:reactive", body);
        }

        Assert.assertTrue(firstSeen.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(secondSeen.await(5, TimeUnit.SECONDS));
    } finally {
        camelctx.close();
    }
}
示例25
/**
 * Publishes the exchanges of a three-shot timer endpoint and verifies that a Reactor
 * subscriber receives the timer counter values 1, 2, 3 within two seconds.
 */
@Test
public void testFrom() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Publisher<Exchange> ticks = crs.from("timer:reactive?period=250&repeatCount=3");

    AtomicInteger counter = new AtomicInteger(0);
    CountDownLatch remaining = new CountDownLatch(3);

    camelctx.start();
    try {
        // [#2936] Reactive stream has no active subscriptions
        Thread.sleep(500);

        Flux.from(ticks)
            .map(tick -> ExchangeHelper.getHeaderOrProperty(tick, Exchange.TIMER_COUNTER, Integer.class))
            .doOnNext(count -> Assert.assertEquals(counter.incrementAndGet(), count.intValue()))
            .doOnNext(count -> remaining.countDown())
            .subscribe();

        boolean completed = remaining.await(2, TimeUnit.SECONDS);
        Assert.assertTrue(completed);
    } finally {
        camelctx.close();
    }
}
示例26
/**
 * Registers a reactive processor for {@code direct:stream} that negates the Integer body
 * of each exchange, and verifies the route's reply for the inputs 1..3.
 */
@Test
public void testFromPublisher() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                .simple("after stream: ${body}");
        }
    };
    camelctx.addRoutes(routes);
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // negate the incoming body and put it on the out message
        crs.process("direct:stream", exchanges -> Flux.from(exchanges).map(exchange -> {
            int body = exchange.getIn().getBody(Integer.class);
            exchange.getOut().setBody(-body);
            return exchange;
        }));

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int input = 1; input <= 3; input++) {
            String expected = "after stream: " + (-input);
            String actual = producer.requestBody("direct:source", input, String.class);
            Assert.assertEquals(expected, actual);
        }
    } finally {
        camelctx.close();
    }
}
示例27
/**
 * Registers a typed (Integer) reactive processor for {@code direct:stream} that negates
 * each value, and verifies the route's reply for the inputs 1..3.
 */
@Test
public void testFromPublisherWithConversion() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                .simple("after stream: ${body}");
        }
    };
    camelctx.addRoutes(routes);
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        // bodies are handed to the processor as Integer
        crs.process("direct:stream", Integer.class,
            numbers -> Flux.from(numbers).map(Math::negateExact));

        ProducerTemplate producer = camelctx.createProducerTemplate();
        for (int input = 1; input <= 3; input++) {
            String expected = "after stream: " + (-input);
            String actual = producer.requestBody("direct:source", input, String.class);
            Assert.assertEquals(expected, actual);
        }
    } finally {
        camelctx.close();
    }
}
示例28
/**
 * Subscribes the {@code direct:reactor} endpoint to a Flux of 1, 2, 3 and verifies that
 * {@code mock:result} receives three exchanges whose Integer bodies are 1, 2, 3 in order.
 */
@Test
public void testSubscriber() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactor")
                .to("mock:result");
        }
    });
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Flux.just(1, 2, 3)
            .subscribe(crs.subscriber("direct:reactor", Integer.class));

        MockEndpoint mock = camelctx.getEndpoint("mock:result", MockEndpoint.class);
        mock.expectedMessageCount(3);
        mock.assertIsSatisfied();

        int idx = 1;
        for (Exchange ex : mock.getExchanges()) {
            // Integer.valueOf replaces the deprecated Integer(int) constructor
            Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
        }
    } finally {
        camelctx.close();
    }
}
示例29
/**
 * Subscribes an empty Flowable to the "numbers" stream with {@code forwardOnComplete=true}
 * and expects one message with a null body and the "onComplete" event type header.
 */
@Test
public void testOnCompleteHeaderForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers?forwardOnComplete=true")
                .to("mock:endpoint");
        }
    };
    camelctx.addRoutes(routes);

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbersSubscriber = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        // an empty source completes without emitting any item
        Flowable.<Integer>empty().subscribe(numbersSubscriber);

        MockEndpoint mock = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        mock.expectedMessageCount(1);
        mock.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onComplete");
        mock.expectedBodiesReceived(new Object[]{null});
        mock.assertIsSatisfied();
    } finally {
        camelctx.close();
    }
}
示例30
/**
 * Subscribes a single-item Flowable to the "numbers" stream and expects one message
 * with body 1 and the "onNext" event type header at {@code mock:endpoint}.
 */
@Test
public void testOnNextHeaderForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    RouteBuilder routes = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers")
                .to("mock:endpoint");
        }
    };
    camelctx.addRoutes(routes);

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbersSubscriber = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        Flowable.just(1).subscribe(numbersSubscriber);

        MockEndpoint mock = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        mock.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onNext");
        mock.expectedMessageCount(1);
        mock.assertIsSatisfied();

        // the single forwarded exchange carries the emitted value
        Exchange forwarded = mock.getExchanges().get(0);
        Assert.assertEquals(1, forwarded.getIn().getBody());
    } finally {
        camelctx.close();
    }
}