csv 使用Java中的StreamingOutput将字节从gRPC onNext流到客户端

xxls0lw8  于 2023-06-03  发布在  Java
关注(0)|答案(1)|浏览(210)

我是通过InputStream/OutputStream处理文件的新手。
我有一个需求,即在CSV的内容生成时对其进行流式处理,以便端点可以节省内存。
以下是我到目前为止所做的:
1.当请求从客户端(此时为终端)传入时,服务器通过gRPC调用向另一个服务(csv服务)发送请求。

  1. csv服务处理gRPC请求并开始生成CSV文件。
    1.对于每一行,它使用onNext方法以字节的形式发送内容。
recordObserver.onNext(Response.newBuilder().setData(ByteString.copyFrom(csvRow)).build());

1.主服务器(gRPC请求的客户端)接收数据块中存在的每个onNext字节。
我遇到的问题是如何将在onNext方法中接收到的字节流回原始客户端(终端),以便整个CSV文件可以以流的方式填充?
到目前为止,我已经做到了这一点,但是一旦从终端发送请求,连接就会关闭,并且当从服务器接收到第一个数据字节时,onNext方法中的outputStream.write(bytes);也会抛出异常。

@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path(DOWNLOAD_CSV)
StreamingOutput downloadCsv(@PathParam("Id") UUID Id) {
  return outputStream -> {
            try {
               
                mainService.downloadCsv(Id, outputStream);
                outputStream.flush();
            } catch (Exception e) {
                throw new WebApplicationException("Error occurred while attempting to download");
            }
        };

}

这是处理gRPC响应的客户端

public void downloadCsv(UUID Id, OutputStream outputStream) {
    Request request = Request.newBuilder()
            .setId(Id)
            .build();

    csvServiceStub.downloadCsv(request, new StreamObserver<Response>() {
        @Override
        public void onNext(Response value) {
            try {
                byte[] bytes = value.getBytes().toByteArray();
                outputStream.write(bytes);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        @Override
        public void onError(Throwable t) {
        }
        @Override
        public void onCompleted() {
        }
    });
}

下面是我通过终端发送的请求:

curl 'https://server-address/52097f0e-5dd8-49bd-98f1-69c31a73b62a/download/csv \
  -X 'GET' \
  -H 'authority: server-address' \
  -H 'accept: application/octet-stream' \
  -H 'accept-language: en-US,en;q=0.9,ja;q=0.8' \
  -H 'authorization: OBFUSCATED' \
  -H 'content-type: application/json' \
  -H 'cookie: __zlcmid=1BzlEPGJ1UvbeK4' \
  -H 'dnt: 1' \
  -H 'origin: server-address' \
  -H 'referer: server-address' \
  --compressed

我不知道我做错了什么,但正如我提到的,我面临着:
1.当我到达这个端点时,终端中的连接关闭。此处未报告错误。我的期望是它应该能够给予二进制输出。
1.在onNext方法中,outputStream.write(bytes);抛出一个异常。

4xy9mtcn

4xy9mtcn1#

回答我自己的问题:我必须使用forEachRemaining,同时使用BlockingStub接收字节。

相关问题