streamingreponsebody只发送部分数据

j13ufse2  于 2021-07-11  发布在  Java
关注(0)|答案(0)|浏览(303)

我需要将大型数据集从服务器发送到客户端,流程如下
查询红移->对数据集进行一些转换->将其推送到客户端(通过RESTAPI,大约1GB)
目前我正在处理内存中的所有数据,所以堆已耗尽。
所以我计划使用streamingresponsebody,这样数据就可以作为块发送,这样api就可以使用最小的堆(如果我在这里错了,请纠正我!!)。
我在网上得到了一些参考,我添加了以下示例端点来测试代码
当我从浏览器测试我的端点时,我期望显示1到1000的数据,而我只得到1-写在控制台上(部分数据),我对streams是新手,有人能指出原因吗,我可以在日志中看到我迭代了1000次,我可以看到日志打印了1000次??
当我点击api时的实际数据
预期数据
这是我的控制器

import java.io.IOException;
import java.io.OutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
public class StreamingController {

private final static Logger logger = LoggerFactory.getLogger(StreamingController.class);

@RequestMapping(method = RequestMethod.GET, value = {"/streamData"})
public StreamingResponseBody handleRequest () {

    return new StreamingResponseBody() {
        @Override
        public void writeTo (OutputStream out) throws IOException {
            for (int i = 0; i < 1000; i++) {
                out.write((Integer.toString(i) + " - ")
                                    .getBytes());
                out.flush();
                logger.debug("Flushed {}",i);
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
   }
 }

并添加了如下同步配置

import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.TimeoutCallableProcessingInterceptor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration  implements AsyncConfigurer{

private final static Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Override
    @Bean (name = "taskExecutor")
    public AsyncTaskExecutor getAsyncExecutor() {
        logger.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /**Configure async support for Spring MVC. */
    @Bean
    public WebMvcConfigurer webMvcConfigurerConfigurer(AsyncTaskExecutor taskExecutor, 
      CallableProcessingInterceptor callableProcessingInterceptor) {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setDefaultTimeout(-1).setTaskExecutor(taskExecutor);
                configurer.registerCallableInterceptors(callableProcessingInterceptor);
                WebMvcConfigurer.super.configureAsyncSupport(configurer);
            }
        };
    }

    @Bean
    public CallableProcessingInterceptor callableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor() {
            @Override
            public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws 
                    Exception {
                logger.error("timeout!");
                return super.handleTimeout(request, task);
            }
         };
     }
  }

我指的是这篇使用流式响应体的文章https://www.logicbig.com/tutorials/spring-framework/spring-web-mvc/streaming-response-body.html

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题