java 如何处理同步队列中的消费者线程中断?

5uzkadbs  于 2023-05-21  发布在  Java
关注(0)|答案(1)|浏览(116)

我有一个用例,我与另一个通过HTTP执行长轮询的服务同步数据。
为此,我使用了一个SynchronousQueue,它工作得很好,只是当我从消费服务接收到一个轮询时,我会在队列上调用take,但远程服务可能会死亡,导致与HTTP请求相关联的线程保持活动状态,而最终不会返回任何HTTP响应。尽管如此,take方法最终还是会在该线程中返回,并消耗掉随后丢失的数据。
我该如何处理这个问题?

1aaf6o9v

1aaf6o9v1#

不知道是否有更好的解决方案(想知道),但我分享我的:

服务

private record ContentAndFile(LongPollingDto dto, File file) {
}

private Queue<CompletableFuture<LongPollingDto>> consumers = new ConcurrentLinkedQueue<>();
private Queue<ContentAndFile> messages = new ConcurrentLinkedQueue<>();

public CompletableFuture<LongPollingDto> getFutureDto() throws IOException {
    CompletableFuture<LongPollingDto> future = new CompletableFuture<>();
    consumers.add(future);
    match();

    return future;
}

public void enqueue(LongPollingDto content) throws IOException {
    String uniqueFileName = UUID.randomUUID().toString() + ".json";
    File file = new File(syncTmpDataDirectory, uniqueFileName);
    writer.writeValue(file, content);
    doEnqueue(content, file);
    match();
}

@EventListener(ApplicationReadyEvent.class)
protected void synchronizeTemporaryFiles() throws IOException {
    File tmpDir = new File(syncTmpDataDirectory);

    List<File> files = Arrays.asList(tmpDir.listFiles());
    files.sort(Comparator.comparing(File::lastModified));

    files.forEach(x -> {
        try {
            LongPollingDto dto = mapper.readValue(x, LongPollingDto.class);
            doEnqueue(dto, x);
        } catch (IOException e) {
            log.error("Error when syncing file: {}", x.getAbsolutePath(), e);
        }
    });

    match();
}

private void match() throws IOException {
    if (!consumers.isEmpty() && !messages.isEmpty()) {
        CompletableFuture<LongPollingDto> future = consumers.poll();
        if (!future.isCancelled()) {
            ContentAndFile contentAndFile = messages.poll();
            future.complete(contentAndFile.dto);
            Files.delete(contentAndFile.file.toPath());
        }

        match();
    }
}

private void doEnqueue(LongPollingDto dto, File file) {
    messages.add(new ContentAndFile(dto, file));
}

控制器

@GetMapping("long-polling")
public DeferredResult<LongPollingDto> longPolling() throws IOException {

    DeferredResult<LongPollingDto> result = new DeferredResult<>();

    CompletableFuture<LongPollingDto> futureDto = longPollingService.getFutureDto();

    executor.execute(() -> {
        try {
            result.setResult(futureDto.get());
        } catch (InterruptedException | ExecutionException e) {
            log.error("An error occurred when transferring the file: ", e);
            Thread.currentThread().interrupt();
        }
    });

    result.onCompletion(() -> futureDto.cancel(true));

    return result;
}

这个解决方案允许我有多个并行的消费者来执行长轮询,并且它允许在客户端死亡时清理消费者(以避免数据丢失)。它还将消息持久保存在磁盘上,以避免在应用程序停机时丢失数据。

相关问题