Java源码示例:org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService

示例1
/**
 * Returns the reactive streams service of this component, creating it on first use.
 *
 * <p>All initialization happens while holding {@code this.lock}. If no engine
 * configuration has been set, one is built from the thread pool max size, min size
 * and name exposed by this object. The service is then obtained from
 * {@code reactiveStreamServiceFactory} and registered with the Camel context.
 *
 * @return the (possibly newly created) reactive streams service
 * @throws RuntimeCamelException if adding the service to the Camel context fails
 */
@Override
public CamelReactiveStreamsService getReactiveStreamsService() {
    synchronized (this.lock) {
        // Fall back to a configuration derived from the thread pool settings.
        if (getReactiveStreamsEngineConfiguration() == null) {
            ReactiveStreamsEngineConfiguration engineConfig = new ReactiveStreamsEngineConfiguration();
            engineConfig.setThreadPoolMaxSize(getThreadPoolMaxSize());
            engineConfig.setThreadPoolMinSize(getThreadPoolMinSize());
            engineConfig.setThreadPoolName(getThreadPoolName());
            setReactiveStreamsEngineConfiguration(engineConfig);
        }

        if (this.reactiveStreamService == null) {
            this.reactiveStreamService = reactiveStreamServiceFactory.newInstance(
                    getCamelContext(), getReactiveStreamsEngineConfiguration());
            try {
                // Register the service with the Camel context so it is started
                // and its managed attributes are exposed.
                getCamelContext().addService(this.reactiveStreamService, true, true);
            } catch (Exception cause) {
                throw new RuntimeCamelException(cause);
            }
        }
    }

    return this.reactiveStreamService;
}
 
示例2
/**
 * Creates the {@link CamelReactiveStreamsService} bean.
 *
 * <p>When the configuration names an engine configuration bean, that bean is looked up
 * in the application context and used as-is. Otherwise a fresh
 * {@link ReactiveStreamsEngineConfiguration} is populated from the configured thread
 * pool name and, when present, the thread pool min and max sizes.
 *
 * @param ac the application context used to look up a named engine configuration bean
 * @return the service resolved by {@code ReactiveStreamsHelper} for the configured service type
 * @throws Exception if resolving the bean or the service fails
 */
@Lazy
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(CamelContext.class)
public CamelReactiveStreamsService camelReactiveStreamsService(ApplicationContext ac) throws Exception {
    ReactiveStreamsEngineConfiguration engineConfiguration;

    if (configuration.getReactiveStreamsEngineConfiguration() != null) {
        engineConfiguration = ac.getBean(configuration.getReactiveStreamsEngineConfiguration(), ReactiveStreamsEngineConfiguration.class);
    } else {
        engineConfiguration = new ReactiveStreamsEngineConfiguration();
        engineConfiguration.setThreadPoolName(configuration.getThreadPoolName());
        if (configuration.getThreadPoolMinSize() != null) {
            engineConfiguration.setThreadPoolMinSize(configuration.getThreadPoolMinSize());
        }
        if (configuration.getThreadPoolMaxSize() != null) {
            // Apply the max size to the max setting; previously this overwrote the min size.
            engineConfiguration.setThreadPoolMaxSize(configuration.getThreadPoolMaxSize());
        }
    }

    return ReactiveStreamsHelper.resolveReactiveStreamsService(context, configuration.getServiceType(), engineConfiguration);
}
 
示例3
/**
 * Pipes a handful of words through a reactive stream into a Camel file endpoint and
 * then prints the resulting file.
 *
 * <p>The words are upper-cased, words of four characters or fewer are dropped, each
 * remaining word is printed and suffixed with a space, and the result is sent to a
 * Camel subscriber that appends to {@code ./target/values.txt}.
 *
 * @param args unused
 * @throws Exception if Camel fails to start or stop, the wait is interrupted, or the
 *         file cannot be read
 */
public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();
    try {
        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
        Subscriber<String> subscriber = camel
                .subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);

        ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
                "using", "Apache", "Camel")
                .map(String::toUpperCase) // Transform the words
                .filter(s -> s.length() > 4) // Filter items
                .peek(System.out::println)
                .map(s -> s + " ")
                .to(subscriber)
                .run();
        context.start();

        // Just wait until it's done.
        Thread.sleep(1000);

        Files.readAllLines(new File("target/values.txt").toPath())
                .forEach(s -> System.out.println("File >> " + s));
    } finally {
        // Always release the Camel context, even when the pipeline or file read fails.
        context.stop();
    }
}
 
示例4
/**
 * Publishes 100 messages to the Camel "inbox" stream subscriber and logs every
 * request the subscriber makes to the publisher.
 */
@Test
public void testConsumerBackPressure() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // prepare the messages up front
    String[] messages = new String[100];
    for (int idx = 0; idx < messages.length; idx++) {
        messages[idx] = "Hello " + idx;
    }

    // publish the array; log how many items are requested each time more data is asked for
    Flowable.fromArray(messages)
        .doOnRequest(requested -> log.info("Requesting {} messages", requested))
        .subscribe(camelStreams.streamSubscriber("inbox", String.class));

    // give the flow 10 seconds to run
    Thread.sleep(10 * 1000L);
}
 
示例5
/**
 * Subscribes a slow consumer (one second per message) to the "inbox" stream and then
 * sends 200 messages to {@code seda:inbox} without waiting for them to be processed.
 */
@Test
public void testNoBackPressure() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // publisher fed by the "inbox" stream
    Publisher<String> inboxPublisher = camelStreams.fromStream("inbox", String.class);

    // slow subscriber: logs each message, then pauses for a second
    Flowable.fromPublisher(inboxPublisher)
        .doOnNext(message -> {
            log.info("Processing message {}", message);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    int sent = 0;
    while (sent < 200) {
        fluentTemplate.withBody("Hello " + sent).to("seda:inbox?waitForTaskToComplete=Never").send();
        sent++;
    }
    log.info("Sent 200 messages done");

    // keep the test alive for 250 seconds while the subscriber works
    Thread.sleep(250 * 1000L);
}
 
示例6
/**
 * Attaches a subscriber that needs one second per message to the "inbox" stream and
 * pushes 200 messages into {@code seda:inbox} in a fire-and-forget fashion.
 */
@Test
public void testInflightBackPressure() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // obtain the publisher backed by the "inbox" stream
    Publisher<String> inboxPublisher = camelStreams.fromStream("inbox", String.class);

    // consume it slowly: log, then sleep a second per item
    Flowable.fromPublisher(inboxPublisher)
        .doOnNext(payload -> {
            log.info("Processing message {}", payload);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    final int total = 200;
    for (int n = 0; n < total; n++) {
        fluentTemplate.withBody("Hello " + n).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // wait 250 seconds so the consumer has time to run
    Thread.sleep(250 * 1000L);
}
 
示例7
/**
 * Streams files picked up from {@code file:target/inbox}: every file is sent to
 * {@code direct:inbox}, and only those whose text contains "Camel" continue to
 * {@code direct:camel}. Expects 4 messages on {@code mock:inbox} and 2 on
 * {@code mock:camel}.
 */
@Test
public void testFiles() throws Exception {
    getMockEndpoint("mock:inbox").expectedMessageCount(4);
    getMockEndpoint("mock:camel").expectedMessageCount(2);

    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    Flowable.fromPublisher(camelStreams.from("file:target/inbox"))
        // hand every exchange to the direct:inbox route from inside the flow
        .doOnNext(exchange -> camelStreams.to("direct:inbox", exchange))
        // keep only exchanges whose body mentions Camel
        .filter(exchange -> exchange.getIn().getBody(String.class).contains("Camel"))
        // and let the direct:camel endpoint subscribe to what remains
        .subscribe(camelStreams.subscriber("direct:camel"));

    // test files as {body, file name} pairs, written in this order
    String[][] testFiles = {
        {"Hello World", "hello.txt"},
        {"Hello Camel", "hello2.txt"},
        {"Bye Camel", "bye.txt"},
        {"Bye World", "bye2.txt"},
    };
    for (String[] testFile : testFiles) {
        fluentTemplate.to("file:target/inbox").withBody(testFile[0]).withHeader(Exchange.FILE_NAME, testFile[1]).send();
    }

    assertMockEndpointsSatisfied();
}
 
示例8
/**
 * Subscribes to the "numbers" stream with RxJava and logs every value greater than 5.
 */
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // publisher that receives Integers from the "numbers" stream
    Publisher<Integer> numberPublisher = camelStreams.fromStream("numbers", Integer.class);

    // drop values of 5 or less, log the rest
    Flowable.fromPublisher(numberPublisher)
        .filter(value -> value > 5)
        .doOnNext(value -> log.info("Streaming big number {}", value))
        .subscribe();

    // give the stream 10 seconds to run
    Thread.sleep(10000);
}
 
示例9
/**
 * Connects a one-second-per-message subscriber to the "inbox" stream, then sends 200
 * messages to {@code seda:inbox} without waiting for their completion.
 */
@Test
public void testLatestBackPressure() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // subscribe to the "inbox" stream; each item is logged and followed by a one-second pause
    Flowable.fromPublisher(camelStreams.fromStream("inbox", String.class))
        .doOnNext(item -> {
            log.info("Processing message {}", item);
            Thread.sleep(1000);
        })
        .subscribe();

    log.info("Sending 200 messages ...");
    for (int seq = 0; seq != 200; seq++) {
        String body = "Hello " + seq;
        fluentTemplate.withBody(body).to("seda:inbox?waitForTaskToComplete=Never").send();
    }
    log.info("Sent 200 messages done");

    // leave 250 seconds for processing
    Thread.sleep(250 * 1000L);
}
 
示例10
/**
 * Subscribes to the "numbers" stream with Reactor and logs every value greater than 5.
 */
@Test
public void testNumbers() throws Exception {
    CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(context);

    // publisher that receives Integers from the "numbers" stream
    Publisher<Integer> numberPublisher = camelStreams.fromStream("numbers", Integer.class);

    // only values above 5 pass the filter and get logged
    Flux.from(numberPublisher)
        .filter(value -> value > 5)
        .doOnNext(value -> log.info("Streaming big number {}", value))
        .subscribe();

    // give the stream 10 seconds to run
    Thread.sleep(10000);
}
 
示例11
/**
 * Sends an exchange to the "reactive" stream, whose route sets the body to the
 * constant "123", and checks that the first exchange published back carries that body.
 */
@Test
public void testToStream() throws Exception {
    CamelContext camelContext = new DefaultCamelContext();
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("reactive-streams:reactive")
                .setBody().constant("123");
        }
    });

    camelContext.start();
    try {
        CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(camelContext);
        Publisher<Exchange> replies = camelStreams.toStream("reactive", new DefaultExchange(camelContext));

        // block until the first reply arrives
        Exchange reply = Flux.from(replies).blockFirst();
        Assert.assertNotNull(reply);

        String body = reply.getIn().getBody(String.class);
        Assert.assertNotNull(body);
        Assert.assertEquals("123", body);
    } finally {
        camelContext.close();
    }
}
 
示例12
/**
 * Sends 1, 2 and 3 through {@code bean:hello} with a String result type and expects
 * the replies "Hello 1", "Hello 2" and "Hello 3" within two seconds.
 */
@Test
public void testTo() throws Exception {
    CamelContext camelContext = createWildFlyCamelContext();
    camelContext.start();
    try {
        CamelReactiveStreamsService camelStreams = CamelReactiveStreams.get(camelContext);

        Set<String> received = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch pending = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(number -> camelStreams.to("bean:hello", number, String.class))
            .doOnNext(received::add)
            .doOnNext(reply -> pending.countDown())
            .subscribe();

        Assert.assertTrue(pending.await(2, TimeUnit.SECONDS));

        Set<String> expected = new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3"));
        Assert.assertEquals(expected, received);
    } finally {
        camelContext.close();
    }
}
 
示例13
@Test
public void testToWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);

        Flux.just(1, 2, 3)
            .flatMap(e -> crs.to("bean:hello", e))
            .map(e -> e.getOut())
            .map(e -> e.getBody(String.class))
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.close();
    }
}
 
示例14
@Test
public void testToFunction() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        /* A TreeSet will order the messages alphabetically regardless of the insertion order
         * This is important because in the Flux returned by Flux.flatMap(Function<? super T, ? extends
         * Publisher<? extends R>>) the emissions may interleave */
        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);
        Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class);

        Flux.just(1, 2, 3)
            .flatMap(fun)
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.close();
    }
}
 
示例15
@Test
public void testToFunctionWithExchange() throws Exception {
    CamelContext camelctx = createWildFlyCamelContext();
    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
        CountDownLatch latch = new CountDownLatch(3);
        Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");

        Flux.just(1, 2, 3)
            .flatMap(fun)
            .map(e -> e.getOut())
            .map(e -> e.getBody(String.class))
            .doOnNext(res -> values.add(res))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
    } finally {
        camelctx.close();
    }
}
 
示例16
@Test
public void testOnCompleteHeaderNotForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers")
                .to("mock:endpoint");
        }
    });

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbers = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        Flowable.<Integer>empty().subscribe(numbers);

        MockEndpoint endpoint = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        endpoint.expectedMessageCount(0);
        endpoint.assertIsSatisfied(200);
    } finally {
        camelctx.close();
    }
}
 
示例17
@Test
public void testConfiguration() throws Exception {
    CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
    assertTrue(service instanceof DefaultCamelReactiveStreamsService);
    assertEquals(service, reactiveStreamsService);

    ReactiveStreamsComponent component = context.getComponent(ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.class);
    assertEquals("rs-test", component.getThreadPoolName());
}
 
示例18
@Test
public void testService() throws Exception {
    CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
    CountDownLatch latch = new CountDownLatch(1);
    String[] res = new String[1];
    Throwable[] error = new Throwable[1];
    Publisher<String> string = service.fromStream("stream", String.class);

    string.subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(100);
        }

        @Override
        public void onNext(String s) {
            res[0] = s;
            latch.countDown();
        }

        @Override
        public void onError(Throwable throwable) {
            error[0] = throwable;
        }

        @Override
        public void onComplete() {
        }
    });

    context.createFluentProducerTemplate().to("direct:endpoint").withBody("Hello").send();

    assertTrue(latch.await(5, TimeUnit.SECONDS));
    assertEquals("Hello", res[0]);
    Thread.sleep(100);
    assertNull(error[0]);
}
 
示例19
@Test
public void testAutoConfiguration() throws Exception {

    new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:data")
                    .log("${body}");
        }
    }.addRoutesToCamelContext(context);

    Assert.assertTrue(context.getStatus().isStarted());
    CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
    Assert.assertTrue(service instanceof DefaultCamelReactiveStreamsService);
}
 
示例20
@Test
public void testConsumeNumbers() throws Exception {
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(context);

    // use stream engine create a publisher
    // that just sends 5 numbers, which needs to be sorted
    // and then each data is send to Camel on the reactive-streams:number endpoint
    Flowable.just("3", "4", "1", "5", "2")
        .sorted(String::compareToIgnoreCase)
        .subscribe(rsCamel.streamSubscriber("numbers", String.class));

    // let it run for 2 seconds
    Thread.sleep(2000);
}
 
示例21
@Test
public void testCamelFirst() throws Exception {
    LOG.info("Starting RX-Java2 Flowable Camel first");

    // create Camel
    CamelContext camel = new DefaultCamelContext();

    // create Reative Camel
    CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);

    camel.start();
    rsCamel.start();

    // create a publisher from Camel seda:words endpoint
    Publisher<String> publisher = rsCamel.from("seda:words", String.class);

    Flowable.fromPublisher(publisher)
        // upper case the word
        .map(w -> w.toUpperCase())
        // log the big number
        .doOnNext(w -> LOG.info(w))
        .subscribe();

    // send some words to Camel
    FluentProducerTemplate template = camel.createFluentProducerTemplate();

    template.withBody("Camel").to("seda:words").send();
    template.withBody("rocks").to("seda:words").send();
    template.withBody("streams").to("seda:words").send();
    template.withBody("as").to("seda:words").send();
    template.withBody("well").to("seda:words").send();

    // sleep a bit for reactive subscriber to complete
    Thread.sleep(1000);

    camel.stop();
    rsCamel.stop();
}
 
示例22
@Test
public void testFromStreamDirect() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        public void configure() {
            from("direct:reactive")
                .to("reactive-streams:numbers");
        }
    });

    camelctx.start();
    try {
        AtomicInteger value = new AtomicInteger(0);

        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        Flux.from(crs.fromStream("numbers", Integer.class))
            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
            .subscribe();

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBody("direct:reactive", 1);
        template.sendBody("direct:reactive", 2);
        template.sendBody("direct:reactive", 3);

        Assert.assertEquals(3, value.get());
    } finally {
        camelctx.close();
    }
}
 
示例23
@Test
public void testFromStreamTimer() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:tick?period=5&repeatCount=30")
                .setBody().header(Exchange.TIMER_COUNTER)
                .to("reactive-streams:tick");
        }
    });

    final int num = 30;
    final CountDownLatch latch = new CountDownLatch(num);
    final AtomicInteger value = new AtomicInteger(0);

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Flux.from(crs.fromStream("tick", Integer.class))
        .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
        .doOnNext(n -> latch.countDown())
        .subscribe();

    camelctx.start();
    try {
        latch.await(5, TimeUnit.SECONDS);
        Assert.assertEquals(num, value.get());
    } finally {
        camelctx.close();
    }
}
 
示例24
@Test
public void testFromStreamMultipleSubscriptionsWithDirect() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactive")
                .to("reactive-streams:direct");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        CountDownLatch latch1 = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(res -> latch1.countDown())
            .subscribe();

        CountDownLatch latch2 = new CountDownLatch(2);
        Flux.from(crs.fromStream("direct", Integer.class))
            .doOnNext(res -> latch2.countDown())
            .subscribe();

        ProducerTemplate template = camelctx.createProducerTemplate();
        template.sendBody("direct:reactive", 1);
        template.sendBody("direct:reactive", 2);

        Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
        Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
    } finally {
        camelctx.close();
    }
}
 
示例25
@Test
public void testFrom() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Publisher<Exchange> timer = crs.from("timer:reactive?period=250&repeatCount=3");

    AtomicInteger value = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(3);

    camelctx.start();
    try {

        // [#2936] Reactive stream has no active subscriptions
        Thread.sleep(500);

        Flux.from(timer)
            .map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
            .doOnNext(res -> latch.countDown())
            .subscribe();

        Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        camelctx.close();
    }
}
 
示例26
@Test
public void testFromPublisher() throws Exception {

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                    .simple("after stream: ${body}");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
        crs.process("direct:stream",
            publisher ->
                Flux.from(publisher)
                    .map(e -> {
                        int i = e.getIn().getBody(Integer.class);
                        e.getOut().setBody(-i);

                        return e;
                    }
                )
        );

        ProducerTemplate template = camelctx.createProducerTemplate();
        for (int i = 1; i <= 3; i++) {
            Assert.assertEquals(
                "after stream: " + (-i),
                template.requestBody("direct:source", i, String.class)
            );
        }
    } finally {
        camelctx.close();
    }
}
 
示例27
@Test
public void testFromPublisherWithConversion() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:source")
                .to("direct:stream")
                .setBody()
                    .simple("after stream: ${body}");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        crs.process("direct:stream",
            Integer.class,
            publisher ->
                Flux.from(publisher).map(Math::negateExact)
        );

        ProducerTemplate template = camelctx.createProducerTemplate();
        for (int i = 1; i <= 3; i++) {
            Assert.assertEquals(
                "after stream: " + (-i),
                template.requestBody("direct:source", i, String.class)
            );
        }
    } finally {
        camelctx.close();
    }
}
 
示例28
@Test
public void testSubscriber() throws Exception {
    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:reactor")
                .to("mock:result");
        }
    });

    camelctx.start();
    try {
        CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);

        Flux.just(1, 2, 3)
            .subscribe(crs.subscriber("direct:reactor", Integer.class));

        MockEndpoint mock = camelctx.getEndpoint("mock:result", MockEndpoint.class);
        mock.expectedMessageCount(3);
        mock.assertIsSatisfied();

        int idx = 1;
        for (Exchange ex : mock.getExchanges()) {
            Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
        }
    } finally {
        camelctx.close();
    }
}
 
示例29
@Test
public void testOnCompleteHeaderForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers?forwardOnComplete=true")
                .to("mock:endpoint");
        }
    });

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbers = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        Flowable.<Integer>empty().subscribe(numbers);

        MockEndpoint endpoint = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        endpoint.expectedMessageCount(1);
        endpoint.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onComplete");
        endpoint.expectedBodiesReceived(new Object[]{null});
        endpoint.assertIsSatisfied();
    } finally {
        camelctx.close();
    }
}
 
示例30
@Test
public void testOnNextHeaderForwarded() throws Exception {
    CamelContext camelctx = createCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("reactive-streams:numbers")
                .to("mock:endpoint");
        }
    });

    CamelReactiveStreamsService crs = CamelReactiveStreams.get(camelctx);
    Subscriber<Integer> numbers = crs.streamSubscriber("numbers", Integer.class);

    camelctx.start();
    try {
        Flowable.just(1).subscribe(numbers);

        MockEndpoint endpoint = camelctx.getEndpoint("mock:endpoint", MockEndpoint.class);
        endpoint.expectedHeaderReceived(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onNext");
        endpoint.expectedMessageCount(1);
        endpoint.assertIsSatisfied();

        Exchange ex = endpoint.getExchanges().get(0);
        Assert.assertEquals(1, ex.getIn().getBody());
    } finally {
        camelctx.close();
    }
}