提问者:小点点

如何使用Spring WebFlux和WebSockets确保响应式流完成


我用静态编程语言为Spring WebFlux编写了一个测试客户端和服务器。客户端向服务器发送一个数字(例如4)并返回这么多数字(例如0、1、2和3)。以下是服务器实现:

class NumbersWebSocketHandler : WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> {
        var index = 0
        var count = 1
        val publisher = Flux.generate<Int> { sink ->
            if (index < count) {
                sink.next(index)
                index++
            } else {
                sink.complete()
            }
        }.map(Int::toString)
            .map(session::textMessage)
            .delayElements(Duration.ofMillis(500))

        return session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext {
                println("About to send $it numbers")
                count = it.toInt()
            }
            .then()
            .and(session.send(publisher))
            .then()
    }
}

这是客户:

fun main(args: Array<String>) {
    val uri = URI("ws://localhost:8080/numbers")
    val client = ReactorNettyWebSocketClient()

    println("How many numbers would you like?")
    val input = Flux.just(readLine())

    client.execute(uri) { session ->
        session.send(input.map(session::textMessage))
            .then(
                session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map { it.toInt() }
                    .reduce { a,b ->
                        println("Reduce called with $a and $b")
                        a + b
                    }
                    .doOnNext(::println)
                    .then()
            )
            .then()
    }.block()
}

客户端成功接收数字并按如下方式调用duce:

使用0和1调用的减少

减少用1和2调用

减少用3和3调用

然而,对doOnNext的调用永远不会到达——大概是因为客户端不知道最后一项已发送。我的问题是,我需要在客户端或服务器上添加什么代码才能打印总数?

更新:在服务器端关闭会话没有帮助。我试过:

.delayElements(Duration.ofMillis(500))
.doOnComplete { session.close() }

还有:

.delayElements(Duration.ofMillis(500))
.doFinally { session.close() }

但是两者都不会对客户端的行为产生任何影响。在调用“发送”后尝试显式关闭会话也不会:

.and(session.send(publisher))
.then()
.and { session.close() }
.then()

共2个答案

匿名用户

<代码>单声道

客户端使用duce等待来自服务器端的输入结束,但服务器使用接收()doOnNext然后()永远等待接收消息。所以客户端和服务器都互相等待。在客户端添加有助于打破僵局:

client.execute(uri, session -> session
        .send(input.map(session::textMessage))
        .then(session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(Integer::valueOf)
                .take(3)
                .reduce((a,b) -> {
                    logger.debug("Reduce called with " + a + " and " + b);
                    return a + b;
                })
                .doOnNext(logger::debug)
        )
        .then()
).block();

第二个问题是服务器组成不正确。发送与通过. and接收并行触发。相反,发送流应该首先依赖于接收计数:

session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .flatMap(s -> {
            logger.debug("About to send " + s + " numbers");
            count.set(Integer.parseInt(s));
            return session.send(publisher);
        })
        .then()

匿名用户

duce的大理石图中可以看出,它期望上游发布者发送一个on完成事件来发出事件本身。我不确定,但我认为只有当连接被优雅地终止时才会发出。这就是doOnNext永远不会执行的原因。

而不是duce,我认为您应该使用订阅(),并定义您自己的Subscriber实例,您可以在其中保留状态。

编辑:您不能在此处使用订阅方法。尽管如果在服务器上关闭连接,它应该这样工作:

.map(Int::toString)
        .map(session::textMessage)
        .delayElements(Duration.ofMillis(500))
        .then(session::close)