java 使用Spring WebClient被动读取行

r7xajy2e  于 2023-03-16  发布在  Java
关注(0)|答案(2)|浏览(170)

TLDR:如何使用Spring WebClient逐行处理GET响应(React式)?
详情:

  • 远程服务器返回大小高达20 Gb的响应
  • 我的服务单独解析行(每一行都被编码为UTF8),并对结果进行流处理(跳过99%的行)
  • 我不想将整个响应加载到内存中,例如,我想逐行解析服务器更新。

不幸的是,我没有找到任何将Flux<ByteBuffer>转换为Flux<String>的解决方案(通过在行尾拆分)。

**问题:**是否有嵌入式转换器/解码器可实现此功能?

可能的解决方案:

  • 创建临时缓冲区(最初为空)
  • 对于每个输入缓冲器:
  • 在新缓冲区之前添加临时缓冲区,重新创建临时缓冲区。
  • 尝试从此缓冲区读取单行(例如,读取到行尾):
  • 如果有字节剩余-返回此字符串并重复行阅读
  • 如果缓冲区已完成(例如,没有行分隔符):只需将这些字节复制到临时缓冲区。
  • 最后一次缓冲后:读取临时缓冲区直到结束。

另外:您不能只将输入缓冲区转换为字符串,因为一些utf8字符可以从缓冲区N开始,并在缓冲区N+1继续。

flmtquvp

flmtquvp1#

下面的代码可以工作,但是这是完全同步的代码(可能只有预取功能)。它使用Apache Http Components

HttpClientBuilder.create().build().use { client ->
    val responseHandler = ResponseHandler { response ->
        response.entity.content.use { content ->
            content.bufferedReader().use { buffered ->
                // create class, which can process each line. 
                val processor = StreamedLinesProcessor<TResult>()

                do {
                    val nextLine = buffered.readLine()
                    val needContinue = processor.processNextLine(nextLine)
                } while (needContinue)
                processor.getResult()
            }
        }
    }

    client.execute(HttpGet(url.toString()), responseHandler)
}
kmbjn2e3

kmbjn2e32#

我还没有找到一个嵌入式的,我正在使用类似下面代码的东西。我假设你正在使用单字符\n行结束,并且\n在这个类看到的任何编码中总是被编码为单字节字符,例如UTF16将不被支持。

Mono<ResponseEntity<Flux<String>>>  = webclient
                .post()
                .uri(...)
                .contentType(...)
                .bodyValue(...)
                .retrieve()
                .toEntityFlux(new LineBodyExtractor());
import lombok.NonNull;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.util.MimeType;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Flux;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Optional.of;

/**
 * Only supports single char separators and charsets
 **/
public class LineBodyExtractor implements BodyExtractor<Flux<String>, ClientHttpResponse> {
    private static final String SEPARATOR_CHAR = "\n";

    @NonNull
    @Override
    public Flux<String> extract(ClientHttpResponse inputMessage, @NonNull Context ignored) {
        var charSet = of(inputMessage.getHeaders())
                .map(HttpHeaders::getContentType)
                .map(MimeType::getCharset)
                .orElse(UTF_8);
        var separatorBytes = SEPARATOR_CHAR.getBytes(charSet);
        if (separatorBytes.length != 1) {
            throw new IllegalStateException("Charset %s doesn't encode separator %s to a single byte and is unsupported".formatted(charSet, SEPARATOR_CHAR));
        }
        byte separator = separatorBytes[0];
        return inputMessage.getBody()
                .flatMap(buf -> this.splitBy(separator, buf))
                .windowUntil(buf -> buf.indexOf(i -> i == separator, 0) == 0)
                // need to join byte buffers before calling to string, UTF8 is multi byte
                .flatMap(DataBufferUtils::join)
                .map(buf -> buf.toString(charSet));
    }

    Flux<DataBuffer> splitBy(Byte separator, DataBuffer buf) {
        if (buf.capacity() <= 0) { // e.g. an empty trailing line should still emit an event
            return Flux.just(buf);
        }
        Flux<DataBuffer> r = Flux.empty();
        int separatorIndex;
        while ((separatorIndex = buf.indexOf(i -> i == separator, 0)) >= 0) {
            r = Flux.concat(r, Flux.just(buf.split(separatorIndex), buf.split(1)));
        }
        return Flux.concat(r, Flux.just(buf)).filter(b -> b.capacity() > 0);
    }
}

相关问题