我有一个方法可以验证Stream中包含的一些元素。
例如,有一个序列有一系列数字(数字不重复),每个数字都比另一个大:
1, 20, 35, 39, 45, 43
... 有必要检查此流中是否存在指定范围,例如35。。。如果没有这样的范围,那么你需要抛出一个异常。
但由于这是异步处理,通常的方法在这里不起作用。毕竟,我们处理流中的元素,不知道下一个元素是什么。
该序列的元素需要折叠,添加应逐渐进行(一旦找到元素的初始范围)。
在服务期间,您需要检查生成的序列中是否存在endpoint,以及何时接收到整个元素流,但这一点不是,然后请求异常,因为未接收到指定范围的上限
此外,在找到指定范围的起点之前,不要开始计算,
虽然我们不能阻止流,否则我们将得到相同的Exstrong TextException。
如何组织这种检查?
当我使用常规线程时,它看起来是这样的:
private boolean isRangeValues() {
List<BigInteger> sequence = Arrays
.asList(
new BigInteger("11"),
new BigInteger("15"),
new BigInteger("23"),
new BigInteger("27"),
new BigInteger("30"));
BigInteger startRange = new BigInteger("15");
BigInteger finishRange = new BigInteger("27");
boolean isStartRangeMember = sequence.contains(startRange);
boolean isFinishRangeMembe = sequence.contains(finishRange);
return isStartRangeMember && isFinishRangeMember;
}
但是我有一个任务来处理以一定间隔生成的元素流。为了得到结果,在Spring中使用了Reactor栈,我在Flux中得到了结果。
只需转换为列表和进程,-它不会工作,会有异常。过滤完这些元素后,流将继续被处理。
但是如果我在过滤时看到数据验证中的错误(在这种情况下,不需要任何元素),那么我将需要请求异常,该异常将被全局处理并返回给客户端。
@GetMapping("v1/sequence/{startRange}/{endRange}")
Mono<BigInteger> getSumSequence(
@PathVariable BigInteger startRange,
@PathVariable BigInteger endRange) {
Flux<BigInteger> sequenceFlux = sequenceGenerated();
validateSequence(sequenceFlux)
return sum(sequenceFlux );
}
private Mono<BigInteger> sum (Flux<BigInteger> sequenceFlux ){
.....
}
private void validateSequence(Flux<BigInteger> sequenceFlux){
... is wrong
throw new RuntimeException();
}
}
我提出了一些解决方案(我在本主题中发表了它)。
public void validateRangeSequence(sequenceDto dto) {
Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
BigInteger startRange = dto.getStartRange();
BigInteger endRange = dto.getEndRange();
Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);
if ( !isStartRangeMember.equals(isEndRangeMember) ){
throw new RuntimeException("error");
}
但是它并没有像预期的那样工作,即使是正确的结果也会导致异常。
使现代化
public void validateRangeSeq(RangeSequenceDto dto) {
Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
BigInteger startRange = dto.getStartRange();
BigInteger endRange = dto.getEndRange();
Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);
sequenceFlux
.handle((number, sink) -> {
if (!isStartRangeMember.equals(isEndRangeMember) ){
sink.error(new RangeWrongSequenceExc("It is wrong given range!."));
} else {
sink.next(number);
}
});
}
sequenceFlux
.handle(((bigInteger, synchronousSink) -> {
if(!bigInteger.equals(startRange)){
synchronousSink.error(new RuntimeException("!!!!!!!!!!! ---- Wrong range!"));
} else {
synchronousSink.next(bigInteger);
}
}));
它的一段代码-它不起作用。(不以任何方式做出反应)
谁认为这是什么?应该这样做还是有其他方法?
我不熟悉Spring中的Reactor栈,也不知道如何处理这种情况。
也许有人对如何组织这样的过滤有想法,并且不会阻止流中元素的处理。
你可以试着这样做
Flux<Integer> yourStream = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).share();
Flux.zip(
yourStream.filter(integer -> integer.equals(4)),
yourStream.filter(integer -> integer.equals(6)),
(integer, integer2) -> Tuple2.of(integer, integer2))
.subscribe(System.out::println);