提问者:小点点

如何检查结果元素流中包含的2个或更多值的内容-通量(spring WebFlux)


我有一个方法可以验证Stream中包含的一些元素。

  • 任务

例如,有一个序列有一系列数字(数字不重复),每个数字都比另一个大:

1, 20, 35, 39, 45, 43

... 有必要检查此流中是否存在指定范围,例如35。。。如果没有这样的范围,那么你需要抛出一个异常。

但由于这是异步处理,通常的方法在这里不起作用。毕竟,我们处理流中的元素,不知道下一个元素是什么。

该序列的元素需要折叠,添加应逐渐进行(一旦找到元素的初始范围)。

在服务期间,您需要检查生成的序列中是否存在endpoint,以及何时接收到整个元素流,但这一点不是,然后请求异常,因为未接收到指定范围的上限

此外,在找到指定范围的起点之前,不要开始计算,

虽然我们不能阻止流,否则我们将得到相同的Exstrong TextException。

如何组织这种检查?

当我使用常规线程时,它看起来是这样的:


   private boolean isRangeValues() {

        List<BigInteger> sequence = Arrays
                .asList(
                        new BigInteger("11"),
                        new BigInteger("15"),
                        new BigInteger("23"),
                        new BigInteger("27"),
                        new BigInteger("30"));

           BigInteger startRange =   new BigInteger("15");
           BigInteger finishRange = new BigInteger("27");

        boolean isStartRangeMember = sequence.contains(startRange);
        boolean isFinishRangeMembe = sequence.contains(finishRange);


        return isStartRangeMember && isFinishRangeMember;
    }

但是我有一个任务来处理以一定间隔生成的元素流。为了得到结果,在Spring中使用了Reactor栈,我在Flux中得到了结果。

只需转换为列表和进程,-它不会工作,会有异常。过滤完这些元素后,流将继续被处理。

但是如果我在过滤时看到数据验证中的错误(在这种情况下,不需要任何元素),那么我将需要请求异常,该异常将被全局处理并返回给客户端。

    @GetMapping("v1/sequence/{startRange}/{endRange}")
    Mono<BigInteger> getSumSequence(
            @PathVariable BigInteger startRange,
            @PathVariable BigInteger endRange) {

        Flux<BigInteger> sequenceFlux = sequenceGenerated();
        
         validateSequence(sequenceFlux)
      
       return sum(sequenceFlux );
     }

     private Mono<BigInteger> sum (Flux<BigInteger> sequenceFlux ){
      .....
     }

       private void validateSequence(Flux<BigInteger> sequenceFlux){
         ... is wrong
         throw new RuntimeException();
     }
}

我提出了一些解决方案(我在本主题中发表了它)。

    public void validateRangeSequence(sequenceDto dto) {

        Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
        BigInteger startRange = dto.getStartRange();
        BigInteger endRange = dto.getEndRange();

        Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
        
        Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);

        if ( !isStartRangeMember.equals(isEndRangeMember) ){
            throw new RuntimeException("error");
        }

但是它并没有像预期的那样工作,即使是正确的结果也会导致异常。

使现代化

public void validateRangeSeq(RangeSequenceDto dto) {

        Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
        BigInteger startRange = dto.getStartRange();
        BigInteger endRange = dto.getEndRange();

        Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
        Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);

        sequenceFlux
                .handle((number, sink) -> {
                    if (!isStartRangeMember.equals(isEndRangeMember) ){
                        sink.error(new RangeWrongSequenceExc("It is wrong given range!."));
                    } else {
                        sink.next(number);
                    }
                });
    }

  • 不幸的是,这个决定也不管用。
        sequenceFlux
                .handle(((bigInteger, synchronousSink) -> {
                    if(!bigInteger.equals(startRange)){
                        synchronousSink.error(new RuntimeException("!!!!!!!!!!! ---- Wrong range!"));
                    } else {
                        synchronousSink.next(bigInteger);
                    }
                }));

它的一段代码-它不起作用。(不以任何方式做出反应)

谁认为这是什么?应该这样做还是有其他方法?

我不熟悉Spring中的Reactor栈,也不知道如何处理这种情况。

也许有人对如何组织这样的过滤有想法,并且不会阻止流中元素的处理。


共1个答案

匿名用户

你可以试着这样做

    Flux<Integer> yourStream = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).share();
Flux.zip(
        yourStream.filter(integer -> integer.equals(4)),
        yourStream.filter(integer -> integer.equals(6)),
        (integer, integer2) -> Tuple2.of(integer, integer2))
    .subscribe(System.out::println);