提问者:小点点

Java-Spring Boot-响应式Redis流(TEXT_EVENT_STREAM_VALUE)


我想写一个endpoint,它总是显示redis流的最新消息(反应式)。

这些实体看起来像这样{'key':'some_key','status':'some_string'}

所以我想得到以下结果:

  1. 页面被调用,内容将例如显示一个实体:
{'key' : 'abc', 'status' : 'status_A'}

页面未关闭

XADD mystream * key abc status statusB
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}
    @GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Light> liveLightMock() {
        List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"));
        return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
    }

列表的各个元素一个接一个地显示,项目之间的延迟为500ms。

然而,当我尝试访问Redis而不是模拟变体时,它不再起作用。我尝试先后测试部分功能。因此,我的想法首先有效保存(1)功能必须起作用,如果保存功能起作用,显示没有reactiv功能的旧记录必须起作用(2),最后但并非最不重要的是,如果两者都起作用,我有点需要让reactiv部分开始工作。

也许你们可以帮我让反应式零件正常工作。我已经工作了好几天,没有得到任何改进。

Ty伙计们:)

看起来它在工作。

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public Flux<Light> createTestLight() {
        String status = (++statusIdx % 2 == 0) ? "on" : "off";
        Light light = new Light(Consts.LIGHT_ID, status);
        return LightRepository.save(light).flux();
    }
    @Override
    public Mono<Light> save(Light light) {
        Map<String, String> lightMap = new HashMap<>();
        lightMap.put("key", light.getKey());
        lightMap.put("status", light.getStatus());

        return operations.opsForStream(redisSerializationContext)
                .add("mystream", lightMap)
                .map(__ -> light);
    }

似乎起作用了,但没有恢复-

我如何让getLights返回与订阅流的TEXT_EVENT_STREAM_VALUE一起工作的东西?

    @Override
    public Flux<Object> getLights() {
        ReadOffset readOffset = ReadOffset.from("0");
        StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

        Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
            Map<Object, Object> kvp = entries.getValue();
            String key = (String) kvp.get("key");
            String status = (String) kvp.get("status");
            Light light = new Light(key, status);
            return Flux.just(light);
        };

        return operations.opsForStream()
                .read(offset)
                .flatMap(mapFunc);
    }
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Object> lightLive() {
        return LightRepository.getLights();
    }

endpoint

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public Flux<Light> createTestLight() {
        String status = (++statusIdx % 2 == 0) ? "on" : "off";
        Light light = new Light(Consts.LIGHT_ID, status);
        return LightRepository.save(light).flux();
    }
    @Override
    public Mono<Light> save(Light light) {
        Map<String, String> lightMap = new HashMap<>();
        lightMap.put("key", light.getKey());
        lightMap.put("status", light.getStatus());

        return operations.opsForStream(redisSerializationContext)
                .add("mystream", lightMap)
                .map(__ -> light);
    }

验证函数i

  1. 删除流,清空它
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0

两次调用Creation Endpoint/light/create我希望Stream现在有两个Item,on with status=on,一个with off

127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1610456865517-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "off"
      2) 1) "1610456866708-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "on"

看起来保存部分正在工作。

似乎起作用了,但没有恢复-

    @Override
    public Flux<Object> getLights() {
        ReadOffset readOffset = ReadOffset.from("0");
        StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

        Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
            Map<Object, Object> kvp = entries.getValue();
            String key = (String) kvp.get("key");
            String status = (String) kvp.get("status");
            Light light = new Light(key, status);
            return Flux.just(light);
        };

        return operations.opsForStream()
                .read(offset)
                .flatMap(mapFunc);
    }
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Object> lightLive() {
        return LightRepository.getLights();
    }
  1. 调用/light/live-

显示信息有效(1),(2)的添加部分有效,按终端检查,4)不起作用

因此,显示器正在工作,但它没有反应

在我刷新浏览器(5)后,我得到了预期的N 2条目-所以(2)也有效


共1个答案

匿名用户

这里有一个误解,被动阅读Redis并不意味着您订阅了新事件。

Reactive不会为您提供实时更新,它会调用一次Redis,它会显示那里的任何内容。因此,即使您等待一两天,UI/控制台中的任何内容都不会改变,您仍然会看到N个条目。

您需要使用Redis PUB/SUB,或者您需要重复调用Redis以获取最新更新。

编辑:

一个可行的解决方案…

  private List<Light> reactiveReadToList() {
    log.info("reactiveReadToList");
    return read().collectList().block();
  }

  private Flux<Light> read() {
    StreamOffset<Object> offset = StreamOffset.fromStart("mystream");
    return redisTemplate
        .opsForStream()
        .read(offset)
        .flatMap(
            e -> {
              Map<Object, Object> kvp = e.getValue();
              String key = (String) kvp.get("key");
              String id = (String) kvp.get("id");
              String status = (String) kvp.get("status");
              Light light = new Light(id, key, status);
              log.info("{}", light);
              return Flux.just(light);
            });
  }

使用响应式模板按需从Redis读取数据并使用偏移量将其发送到客户端的阅读器,它一次只发送一个事件,我们可以发送所有事件。

  @RequiredArgsConstructor
  class DataReader {
    @NonNull FluxSink<Light> sink;
    private List<Light> readLights = null;
    private int currentOffset = 0;
    void register() {
      readLights = reactiveReadToList();
      sink.onRequest(
          e -> {
            long demand = sink.requestedFromDownstream();
            for (int i = 0; i < demand && currentOffset < readLights.size(); i++, currentOffset++) {
              sink.next(readLights.get(currentOffset));
            }
            if (currentOffset == readLights.size()) {
              readLights = reactiveReadToList();
              currentOffset = 0;
            }
          });
    }
  }

一种使用DataReader生成流量的方法

  public Flux<Light> getLights() {
    return Flux.create(e -> new DataReader(e).register());
  }

现在我们在接收器上添加了一个onRequest方法来处理客户端的需求,它根据需要从Redis流中读取数据并将其发送给客户端。

这看起来非常CPU密集,如果没有更多的新事件,也许我们应该延迟调用,如果我们看到流中没有新元素,也许应该在寄存器方法中添加一个睡眠调用。