我是通过InputStream/OutputStream处理文件的新手。
我有一个需求,即在CSV的内容生成时对其进行流式处理,以便端点可以节省内存。
以下是我到目前为止所做的:
1.当请求从客户端(此时为终端)传入时,服务器通过gRPC调用向另一个服务(csv服务)发送请求。
- csv服务处理gRPC请求并开始生成CSV文件。
1.对于每一行,它使用onNext方法以字节的形式发送内容。
recordObserver.onNext(Response.newBuilder().setData(ByteString.copyFrom(csvRow)).build());
1.主服务器(gRPC请求的客户端)接收数据块中存在的每个onNext字节。
我遇到的问题是如何将在onNext方法中接收到的字节流回原始客户端(终端),以便整个CSV文件可以以流的方式填充?
到目前为止,我已经做到了这一点,但是一旦从终端发送请求,连接就会关闭,并且当从服务器接收到第一个数据字节时,onNext方法中的outputStream.write(bytes);
也会抛出异常。
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path(DOWNLOAD_CSV)
StreamingOutput downloadCsv(@PathParam("Id") UUID Id) {
return outputStream -> {
try {
mainService.downloadCsv(Id, outputStream);
outputStream.flush();
} catch (Exception e) {
throw new WebApplicationException("Error occurred while attempting to download");
}
};
}
这是处理gRPC响应的客户端
public void downloadCsv(UUID Id, OutputStream outputStream) {
Request request = Request.newBuilder()
.setId(Id)
.build();
csvServiceStub.downloadCsv(request, new StreamObserver<Response>() {
@Override
public void onNext(Response value) {
try {
byte[] bytes = value.getBytes().toByteArray();
outputStream.write(bytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
下面是我通过终端发送的请求:
curl 'https://server-address/52097f0e-5dd8-49bd-98f1-69c31a73b62a/download/csv \
-X 'GET' \
-H 'authority: server-address' \
-H 'accept: application/octet-stream' \
-H 'accept-language: en-US,en;q=0.9,ja;q=0.8' \
-H 'authorization: OBFUSCATED' \
-H 'content-type: application/json' \
-H 'cookie: __zlcmid=1BzlEPGJ1UvbeK4' \
-H 'dnt: 1' \
-H 'origin: server-address' \
-H 'referer: server-address' \
--compressed
我不知道我做错了什么,但正如我提到的,我面临着:
1.当我到达这个端点时,终端中的连接关闭。此处未报告错误。我的期望是它应该能够给予二进制输出。
1.在onNext方法中,outputStream.write(bytes);
抛出一个异常。
1条答案
按热度按时间4xy9mtcn1#
回答我自己的问题:我必须使用
forEachRemaining
,同时使用BlockingStub
接收字节。