我有一个用例,我与另一个通过HTTP执行长轮询的服务同步数据。为此,我使用了一个SynchronousQueue,它工作得很好,只是当我从消费服务接收到一个轮询时,我会在队列上调用take,但远程服务可能会死亡,导致与HTTP请求相关联的线程保持活动状态,而最终不会返回任何HTTP响应。尽管如此,take方法最终还是会在该线程中返回,并消耗掉随后丢失的数据。我该如何处理这个问题?
SynchronousQueue
take
1aaf6o9v1#
不知道是否有更好的解决方案(想知道),但我分享我的:
private record ContentAndFile(LongPollingDto dto, File file) { } private Queue<CompletableFuture<LongPollingDto>> consumers = new ConcurrentLinkedQueue<>(); private Queue<ContentAndFile> messages = new ConcurrentLinkedQueue<>(); public CompletableFuture<LongPollingDto> getFutureDto() throws IOException { CompletableFuture<LongPollingDto> future = new CompletableFuture<>(); consumers.add(future); match(); return future; } public void enqueue(LongPollingDto content) throws IOException { String uniqueFileName = UUID.randomUUID().toString() + ".json"; File file = new File(syncTmpDataDirectory, uniqueFileName); writer.writeValue(file, content); doEnqueue(content, file); match(); } @EventListener(ApplicationReadyEvent.class) protected void synchronizeTemporaryFiles() throws IOException { File tmpDir = new File(syncTmpDataDirectory); List<File> files = Arrays.asList(tmpDir.listFiles()); files.sort(Comparator.comparing(File::lastModified)); files.forEach(x -> { try { LongPollingDto dto = mapper.readValue(x, LongPollingDto.class); doEnqueue(dto, x); } catch (IOException e) { log.error("Error when syncing file: {}", x.getAbsolutePath(), e); } }); match(); } private void match() throws IOException { if (!consumers.isEmpty() && !messages.isEmpty()) { CompletableFuture<LongPollingDto> future = consumers.poll(); if (!future.isCancelled()) { ContentAndFile contentAndFile = messages.poll(); future.complete(contentAndFile.dto); Files.delete(contentAndFile.file.toPath()); } match(); } } private void doEnqueue(LongPollingDto dto, File file) { messages.add(new ContentAndFile(dto, file)); }
@GetMapping("long-polling") public DeferredResult<LongPollingDto> longPolling() throws IOException { DeferredResult<LongPollingDto> result = new DeferredResult<>(); CompletableFuture<LongPollingDto> futureDto = longPollingService.getFutureDto(); executor.execute(() -> { try { result.setResult(futureDto.get()); } catch (InterruptedException | ExecutionException e) { log.error("An error occurred when transferring the file: ", e); Thread.currentThread().interrupt(); } }); result.onCompletion(() -> futureDto.cancel(true)); return result; }
这个解决方案允许我有多个并行的消费者来执行长轮询,并且它允许在客户端死亡时清理消费者(以避免数据丢失)。它还将消息持久保存在磁盘上,以避免在应用程序停机时丢失数据。
1条答案
按热度按时间1aaf6o9v1#
不知道是否有更好的解决方案(想知道),但我分享我的:
服务
控制器
这个解决方案允许我有多个并行的消费者来执行长轮询,并且它允许在客户端死亡时清理消费者(以避免数据丢失)。它还将消息持久保存在磁盘上,以避免在应用程序停机时丢失数据。