Java源码示例:io.rsocket.util.RSocketProxy

示例1
@SuppressWarnings("Duplicates")
RSocket accept(RSocket rSocket) {
	RSocket pong = new RSocketProxy(rSocket) {

		@Override
		public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
			return Flux.from(payloads).map(Payload::getDataUtf8).doOnNext(str -> {
				int received = pingsReceived.incrementAndGet();
				log.info("received " + str + "(" + received + ") in Pong");
			}).map(PingPongApp::reply).map(reply -> {
				ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
						reply);
				ByteBuf routingMetadata = getForwardingMetadata(strategies,
						"ping", 1L);
				return DefaultPayload.create(data, routingMetadata);
			});
		}
	};
	return pong;
}
 
示例2
@Before
public void startup() {
  server =
      RSocketServer.create((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler)))
          .bind(TcpServerTransport.create("localhost", 0))
          .block();
}