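I need to send a large dataset from the server to a client. The flow is:
query Redshift -> apply some transformations to the dataset -> push it to the client (via a REST API, roughly 1 GB)
Currently I process all of the data in memory, so the heap gets exhausted.
So I plan to use StreamingResponseBody, so that the data is sent in chunks and the API uses a minimal amount of heap (please correct me if I am wrong here!).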
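The chunked idea described above can be sketched without Spring. This is only an illustration under stated assumptions: `RowStreamer`, `streamRows`, and `transform` are hypothetical names, the rows are stand-ins for a Redshift result set, and the transformation is a placeholder. The point it tries to show is that each row is transformed and written as soon as it is read, so only the current row and the writer's buffer need to live on the heap.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.stream.IntStream;

final class RowStreamer {

    // Hypothetical helper: pulls rows one at a time and writes each one
    // immediately instead of collecting the whole dataset first.
    static long streamRows(Iterator<String> rows, OutputStream out) throws IOException {
        BufferedWriter writer =
                new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        long count = 0;
        while (rows.hasNext()) {
            writer.write(transform(rows.next()));
            writer.write('\n');
            count++;
        }
        writer.flush(); // flush but do not close: the caller owns the stream
        return count;
    }

    // Placeholder for the real transformation step (an assumption).
    static String transform(String row) {
        return row.trim().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) throws IOException {
        // Lazily generated rows stand in for a database cursor.
        Iterator<String> rows =
                IntStream.range(0, 5).mapToObj(i -> " row-" + i + " ").iterator();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long written = streamRows(rows, sink);
        System.out.println(written + " rows, " + sink.size() + " bytes");
    }
}
```

In a StreamingResponseBody, the `OutputStream` passed to `writeTo` would play the role of `sink`. Heap use stays low only if the source is also read lazily (for example a cursor rather than a fully materialized list).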
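I found some references online and added the sample endpoint below to test the code.
When I test the endpoint from a browser, I expect the numbers 1 to 1000 to be displayed, but I only get "1 -" written to the console (partial data). I am new to streams; can anyone point out the cause? I can see in the logs that the loop iterated 1000 times and that the log line was printed 1000 times.
Actual data when I hit the API:
Expected data:
Here is my controller: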
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
public class StreamingController {
    private final static Logger logger = LoggerFactory.getLogger(StreamingController.class);

    @RequestMapping(method = RequestMethod.GET, value = {"/streamData"})
    public StreamingResponseBody handleRequest () {
        return new StreamingResponseBody() {
            @Override
            public void writeTo (OutputStream out) throws IOException {
                // Note: this loop writes 0 through 999, not 1 through 1000.
                for (int i = 0; i < 1000; i++) {
                    out.write((Integer.toString(i) + " - ")
                            .getBytes(StandardCharsets.UTF_8));
                    out.flush();
                    logger.debug("Flushed {}", i);
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop writing.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
    }
}
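As a sanity check, the write loop from the controller above can be run in isolation against an in-memory stream. This is a sketch, not a diagnosis: `WriteLoopCheck` and `writeNumbers` are hypothetical names, and the logging and sleep are left out. If the full sequence shows up here, the loop itself is fine and the truncation is more likely to happen somewhere between the servlet container and the browser.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class WriteLoopCheck {

    // Same loop body as writeTo in the controller, minus logging and sleep.
    static void writeNumbers(OutputStream out, int count) throws IOException {
        for (int i = 0; i < count; i++) {
            out.write((Integer.toString(i) + " - ").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeNumbers(sink, 1000);
        String body = new String(sink.toByteArray(), StandardCharsets.UTF_8);
        // Print the size plus the head and tail of what was written.
        System.out.println("chars written: " + body.length());
        System.out.println("starts with:   " + body.substring(0, 12));
        System.out.println("ends with:     " + body.substring(body.length() - 6));
    }
}
```

Because the loop starts at 0 and stops before 1000, the output begins with "0 - " and ends with "999 - ", so a response showing 1 to 1000 would not be expected even when streaming works.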
I also added the following async configuration:
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.TimeoutCallableProcessingInterceptor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
    private final static Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Override
    @Bean (name = "taskExecutor")
    public AsyncTaskExecutor getAsyncExecutor() {
        logger.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /** Configure async support for Spring MVC. */
    @Bean
    public WebMvcConfigurer webMvcConfigurerConfigurer(AsyncTaskExecutor taskExecutor,
            CallableProcessingInterceptor callableProcessingInterceptor) {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setDefaultTimeout(-1).setTaskExecutor(taskExecutor);
                configurer.registerCallableInterceptors(callableProcessingInterceptor);
                WebMvcConfigurer.super.configureAsyncSupport(configurer);
            }
        };
    }

    @Bean
    public CallableProcessingInterceptor callableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor() {
            @Override
            public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task)
                    throws Exception {
                logger.error("timeout!");
                return super.handleTimeout(request, task);
            }
        };
    }
}
I am following this article on using StreamingResponseBody: https://www.logicbig.com/tutorials/spring-framework/spring-web-mvc/streaming-response-body.html
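One way to rule the browser in or out is to read the endpoint with a plain Java client that prints each chunk as it arrives. The sketch below is an assumption-laden probe, not part of the article: `StreamProbe` and `readIncrementally` are hypothetical names, and the default URL `http://localhost:8080/streamData` assumes the application runs locally on port 8080.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class StreamProbe {

    // Reads the stream in small buffers and hands each chunk to the callback
    // as soon as it is available. Returns the total number of bytes read.
    // Decoding per chunk is safe here only because the payload is ASCII.
    static long readIncrementally(InputStream in, int bufferSize, Consumer<String> onChunk)
            throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
            onChunk.accept(new String(buffer, 0, n, StandardCharsets.UTF_8));
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Assumed URL; pass a different one as the first argument if needed.
        String url = args.length > 0 ? args[0] : "http://localhost:8080/streamData";
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try (InputStream in = conn.getInputStream()) {
            long total = readIncrementally(in, 256, System.out::print);
            System.out.println();
            System.out.println("total bytes: " + total);
        } finally {
            conn.disconnect();
        }
    }
}
```

If this client receives the whole sequence while the browser shows only the first item, the server side is probably streaming correctly and the difference lies in how the browser renders or buffers the response.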