java—在SpringBoot应用程序中使用redis流通过http长轮询阻止http响应

yx2lnoni  于 2021-06-08  发布在  Redis
关注(0)|答案(1)|浏览(531)

我有一个springbootweb应用程序,它的功能是更新一个名为 StudioLinking . 这个实体描述了两个物联网设备之间的一个临时的、可变的、描述性的逻辑链接,我的web应用就是它们的云服务。这些设备之间的联系在本质上是短暂的,但是 StudioLinking 实体保留在数据库中以进行报告。 StudioLinking 使用spring数据/hibernate以传统方式存储到基于sql的数据存储中。这个studiolinking实体将不时地用restapi中的新信息进行更新。当链接更新时,设备需要响应(改变颜色、音量等)。目前,这是通过每5秒轮询一次来处理的,但这会造成从人类用户向系统输入更新到物联网设备实际更新的延迟。可能只有一毫秒,也可能长达5秒!显然,增加投票频率是不可持续的,绝大多数时候根本没有更新!
因此,我正在尝试在同一个应用程序上开发另一个RESTAPI,该应用程序使用http长轮询,当给定的studioLink实体更新时或超时后将返回。监听器不支持websocket或类似的东西,这给我留下了很长的轮询时间。长时间的轮询可能会留下一个竞争条件,在这种情况下,您必须考虑这样一种可能性:对于连续的消息,在http请求之间有一条消息可能会“丢失”(当连接关闭和打开时,如果我使用了pub/sub,则可能会出现一个新的“更新”,而不会被“注意到”)。
需要注意的是,这个“订阅更新”api应该只返回 StudioLinking ,但仅当存在实际更新或自上次签入后发生更新时才应这样做。“订阅更新”客户机最初会发布一个api请求来设置一个新的侦听会话,并将其传递给服务器,以便服务器知道他们是谁。因为有可能多个设备需要监视同一设备的更新 StudioLinking 实体。我相信我可以通过在redisxread中使用单独命名的消费者来实现这一点(请记住这一点,以便在后面的问题中使用)
经过数小时的研究,我相信实现这一点的方法是使用redis流。
我在spring data redis中找到了关于redis流的以下两个链接:
https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/https用法:/medium.com//@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281
我也读过这个关于长轮询的链接,这两个链接在长轮询期间都有一个睡眠计时器,这是为了演示,但显然我想做一些有用的事情。
https://www.baeldung.com/spring-deferred-result
这两个链接都非常有用。现在,我没有问题弄清楚如何将更新发布到redis流-(这是未经测试的“伪代码”,但我预计实现它不会有任何问题)

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

但是,当我订阅上述流时,我就一蹶不振了:所以基本上我想在这里做的是——请原谅那些被屠宰的伪代码,它只是出于想法的目的。我很清楚,代码并不能说明语言和框架的实际行为:)

// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);

    deferredResult.onTimeout(() -> 
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));

    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        try {
            // Subscribe to Redis Stream, get any updates that happened between long-polls
            // then block until/if a new message comes over the stream
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID, updateList),
                StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
                streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult("IT WAS UPDATED!");
    });

    LOG.info("servlet thread freed");
    return output;
}

有没有一个很好的解释来解释我会怎么做?我认为答案就在https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/reactiveredistemplate.html 但我不是一个足够大的SpringPower用户,无法真正理解java文档中的术语(spring文档非常好,但是javadocs是用密集的技术语言编写的,我很欣赏这种语言,但还不太明白)。
我的实施还有两个障碍:
我对Spring的确切理解还没有达到100%。我还没有完全理解为什么这些豆子到处飘浮。我想这就是为什么我不能在这里得到东西的关键。。。redis的配置在springether中浮动,我不知道如何调用它。我真的需要继续调查这个问题(这对我来说是一个巨大的障碍)。
这些 StudioLinking 它们的寿命很短,所以我也需要清理一下。我会实施这个稍后一旦我得到了整个事情的地面,我知道这将是必要的。

htzpubme

htzpubme1#

为什么不使用阻塞轮询机制?不需要使用SpringDataRedis的花哨东西。只需使用5秒的简单阻塞读取,因此此调用可能需要6秒左右的时间。您可以减少或增加阻塞超时。

class LinkStatus {
    private final boolean updated;

    LinkStatus(boolean updated) {
      this.updated = updated;
    }
  }

// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
  // updateList is a unique token to track individual consumers in Redis
  @GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
  public LinkStatus subscribeToUpdates(
      @PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    StreamOperations<String, String, String> op = redisTemplate.opsForStream();

    Consumer consumer = Consumer.from("test-group", "test-consumer");
    // auto ack block stream read with size 1 with timeout of 5 seconds
    StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
    List<MapRecord<String, String, String>> records =
        op.read(consumer, readOptions, StreamOffset.latest("test-stream"));
    return new LinkStatus(!CollectionUtils.isEmpty(records));
  }

相关问题