我用静态编程语言为Spring WebFlux编写了一个测试客户端和服务器。客户端向服务器发送一个数字(例如4)并返回这么多数字(例如0、1、2和3)。以下是服务器实现:
class NumbersWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
var index = 0
var count = 1
val publisher = Flux.generate<Int> { sink ->
if (index < count) {
sink.next(index)
index++
} else {
sink.complete()
}
}.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext {
println("About to send $it numbers")
count = it.toInt()
}
.then()
.and(session.send(publisher))
.then()
}
}
这是客户:
fun main(args: Array<String>) {
val uri = URI("ws://localhost:8080/numbers")
val client = ReactorNettyWebSocketClient()
println("How many numbers would you like?")
val input = Flux.just(readLine())
client.execute(uri) { session ->
session.send(input.map(session::textMessage))
.then(
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map { it.toInt() }
.reduce { a,b ->
println("Reduce called with $a and $b")
a + b
}
.doOnNext(::println)
.then()
)
.then()
}.block()
}
客户端成功接收数字并按如下方式调用duce:
使用0和1调用的减少
减少用1和2调用
减少用3和3调用
然而,对doOnNext的调用永远不会到达——大概是因为客户端不知道最后一项已发送。我的问题是,我需要在客户端或服务器上添加什么代码才能打印总数?
更新:在服务器端关闭会话没有帮助。我试过:
.delayElements(Duration.ofMillis(500))
.doOnComplete { session.close() }
还有:
.delayElements(Duration.ofMillis(500))
.doFinally { session.close() }
但是两者都不会对客户端的行为产生任何影响。在调用“发送”后尝试显式关闭会话也不会:
.and(session.send(publisher))
.then()
.and { session.close() }
.then()
<代码>单声道
客户端使用duce
等待来自服务器端的输入结束,但服务器使用接收()
doOnNext
然后()
永远等待接收消息。所以客户端和服务器都互相等待。在客户端添加取
有助于打破僵局:
client.execute(uri, session -> session
.send(input.map(session::textMessage))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(Integer::valueOf)
.take(3)
.reduce((a,b) -> {
logger.debug("Reduce called with " + a + " and " + b);
return a + b;
})
.doOnNext(logger::debug)
)
.then()
).block();
第二个问题是服务器组成不正确。发送与通过. and
接收并行触发。相反,发送流应该首先依赖于接收计数:
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(s -> {
logger.debug("About to send " + s + " numbers");
count.set(Integer.parseInt(s));
return session.send(publisher);
})
.then()
从duce
的大理石图中可以看出,它期望上游发布者发送一个on完成
事件来发出事件本身。我不确定,但我认为只有当连接被优雅地终止时才会发出。这就是doOnNext
永远不会执行的原因。
而不是duce
,我认为您应该使用订阅()
,并定义您自己的Subscriber
实例,您可以在其中保留状态。
编辑:您不能在此处使用订阅
方法。尽管如果在服务器上关闭连接,它应该这样工作:
.map(Int::toString)
.map(session::textMessage)
.delayElements(Duration.ofMillis(500))
.then(session::close)