/**
 * Builds the "pong" side of the exchange by wrapping the given socket in an
 * {@link RSocketProxy} that overrides {@code requestChannel}.
 *
 * <p>For every payload arriving on the channel the proxy reads the data as
 * UTF-8 text, increments {@code pingsReceived}, logs the text together with the
 * running count, converts the text with {@code PingPongApp.reply}, and emits a
 * new payload whose data is the UTF-8 encoded reply and whose metadata comes
 * from {@code getForwardingMetadata(strategies, "ping", 1L)}.
 *
 * @param rSocket the socket the proxy delegates to
 * @return the proxy with the overridden {@code requestChannel}
 */
@SuppressWarnings("Duplicates")
RSocket accept(RSocket rSocket) {
    return new RSocketProxy(rSocket) {
        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            // Stage 1: decode each inbound payload and record that it arrived.
            Flux<String> incoming = Flux.from(payloads)
                    .map(Payload::getDataUtf8)
                    .doOnNext(text -> {
                        int count = pingsReceived.incrementAndGet();
                        log.info("received " + text + "(" + count + ") in Pong");
                    });

            // Stage 2: turn each decoded message into its reply and wrap it,
            // together with the forwarding metadata, into an outbound payload.
            return incoming
                    .map(PingPongApp::reply)
                    .map(answer -> {
                        ByteBuf body = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, answer);
                        ByteBuf metadata = getForwardingMetadata(strategies, "ping", 1L);
                        return DefaultPayload.create(body, metadata);
                    });
        }
    };
}
/**
 * Setup hook (annotated {@code @Before}) that binds an {@link RSocketServer}
 * over TCP on {@code localhost}, port {@code 0}, and stores the result of the
 * bind in {@code server}.
 *
 * <p>The acceptor ignores both of its arguments and always answers with a new
 * {@link RSocketProxy} around {@code handler}. The call blocks until the bind
 * completes.
 */
@Before
public void startup() {
    TcpServerTransport transport = TcpServerTransport.create("localhost", 0);
    server = RSocketServer
            .create((connectionSetup, requester) -> Mono.just(new RSocketProxy(handler)))
            .bind(transport)
            .block();
}