Spring云网关ServerHttpResponseDecorator writeWith在接收响应时未调用

mm5n2pyu  于 2023-06-05  发布在  Spring
关注(0)|答案(1)|浏览(159)

我有一个Spring Cloud Gateway应用程序,我想实现一个过滤器,它在将响应传递给客户端之前读取响应体。下面是自定义过滤器的代码:

@Component
public class CreateResourceGatewayFilterFactory extends AbstractGatewayFilterFactory<CreateResourceGatewayFilterFactory.Config> implements Ordered {

    private static final Logger log = LoggerFactory.getLogger(CreateResourceGatewayFilterFactory.class);
    
    public static class Config {

        private ResourceType resourceType;
        private String requiredRoles;

        public ResourceType getResourceType() {
            return resourceType;
        }
        public void setResourceType(ResourceType resourceType) {
            this.resourceType = resourceType;
        }

        public String getRequiredRoles() {
            return requiredRoles;
        }
        public void setRequiredRoles(String requiredRoles) {
            this.requiredRoles = requiredRoles;
        }
    }

    public CreateResourceGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public List<String> shortcutFieldOrder() {
        List<String> order = new ArrayList<>(2);
        order.add("resourceType");
        order.add("requiredRoles");
        return order;
    }

    @Override
    public GatewayFilter apply(Config config) {

        return (exchange, chain) -> {
            log.debug("Received request at CreateResource filter.");
            return chain.filter(exchange.mutate().request(getDecoratedRequest(exchange.getRequest())).response(getDecoratedResponse(exchange.getResponse(), config.getResourceType())).build());
        };
    }

    private ServerHttpResponseDecorator getDecoratedResponse(ServerHttpResponse response, ResourceType resourceType) {
        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
                log.debug("Inside response decorator");
                if (body instanceof Flux) {
                    log.debug("Received response from microservice.");
                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                    return super.writeWith(fluxBody.buffer()
                        .map(dataBuffers -> {
                            DefaultDataBuffer joinedBuffers = new DefaultDataBufferFactory().join(dataBuffers);
                            byte[] content = new byte[joinedBuffers.readableByteCount()];
                            joinedBuffers.read(content);
                            String responseBody = new String(content, StandardCharsets.UTF_8);
                            log.debug("Buffered response parsed to String {}", responseBody);
                            return responseBody;
                        })
                        .flatMap(responseBody -> doSomething(resourceType, responseBody))
                        .map(responseBody -> response.bufferFactory().wrap(responseBody.getBytes())))
                    .onErrorResume(err -> {
                        log.error("Error extracting resoruce from message {}",err.getMessage());
                        return Mono.empty();
                    });
                }
                return super.writeWith(body);
            }
        };
    }

    private Flux<String> doSomething(ResourceType resourceType, String payload) {
        // DO SOMETHING WITH PAYLOAD
        return Flux.just(payload);
    }

    private ServerHttpRequest getDecoratedRequest(ServerHttpRequest request) {
        return new ServerHttpRequestDecorator(request) {
            @Override
            public Flux<DataBuffer> getBody() {
                log.debug("requestId: {}, method: {} , url: {}", request.getId(), request.getMethodValue(), request.getURI());
                return request.getBody();
            }
        };
    }

    @Override
    public int getOrder() {
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
    }

我用的是post as reference
getDecoratedResponse是创建ServerHttpResponseDecorator对象的方法。CreateResourceGatewayFilterFactory是过滤器类,Config是配置类,它指定将传递给过滤器的参数。
application.yml中的DSL是:

spring:
  cloud:
    gateway:
      default-filters:
        - TokenRelay
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
          predicates:
            - name: Path
              args:
                pattern: "'/services/'+serviceId.toLowerCase()+'/**'"
          filters:
            - name: RewritePath
              args:
                regexp: "'/services/' + serviceId.toLowerCase() + '/(?<remaining>.*)'"
                replacement: "'/${remaining}'"
      routes:
        - id: sample_route
          uri: http://localhost:8097
          predicates:
            - Method=POST
            - Path=/api/resources/**
          filters:
            - CreateResource=DEVICE,SUPER_ADMIN

请求在过滤器和请求装饰器中被捕获,因为我可以看到日志,但是没有来自响应装饰器的日志。
响应的HTTP状态代码为201
我需要帮助来理解为什么响应装饰器没有被触发。
注意:这个过滤器的目的是读取响应正文。此进程可以在differnet线程中运行,因为它在将响应体发送回客户端之前不会修改响应体。如果有比ServerHttpResponseDecorator更好的方法,请给我建议。可能是后过滤器逻辑。

g6ll5ycj

g6ll5ycj1#

package com.javainuse.config;

import lombok.extern.slf4j.Slf4j;

import org.json.JSONObject;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import com.fasterxml.jackson.core.JsonParser;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

/**
 * Project: gateway-service-filter-properties.<br/>
 * Des: <br/>
 * User: HieuTT<br/>
 * Date: 04/08/2022<br/>
 */
@Component
@Slf4j
public class LoggingWebFilter implements WebFilter {

    private Logger logger = LoggerFactory.getLogger(LoggingWebFilter.class);

    /**
     * Process the Web request and (optionally) delegate to the next
     * {@code WebFilter} through the given {@link WebFilterChain}.
     *
     * @param exchange the current server exchange
     * @param chain    provides a way to delegate to the next filter
     * @return {@code Mono<Void>} to indicate when request processing is complete
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        System.out.println("XXXXXXXXXXXXXXs347823489349");
        ServerHttpResponse response = exchange.getResponse();
        ServerHttpRequest request = exchange.getRequest();
        DataBufferFactory dataBufferFactory = response.bufferFactory();

        // log the request body
        ServerHttpRequest decoratedRequest = getDecoratedRequest(request);
        // log the response body
        ServerHttpResponseDecorator decoratedResponse = getDecoratedResponse(response, request, dataBufferFactory);
        return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build());
    }

    private ServerHttpResponseDecorator getDecoratedResponse(ServerHttpResponse response, ServerHttpRequest request,
            DataBufferFactory dataBufferFactory) {
        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {

                if (body instanceof Flux) {

                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;

                    return super.writeWith(fluxBody.buffer().map(dataBuffers -> {

                        DefaultDataBuffer joinedBuffers = new DefaultDataBufferFactory().join(dataBuffers);
                        byte[] content = new byte[joinedBuffers.readableByteCount()];
                        joinedBuffers.read(content);
                        String responseBody = new String(content, StandardCharsets.UTF_8);// MODIFY RESPONSE and Return
                                                                                            // the Modified response
                        JSONObject jsonObject = new JSONObject(responseBody);
                        System.out.println("final value red from : " + jsonObject.getString("message"));
                        jsonObject.put("message", "");
                        return dataBufferFactory.wrap(jsonObject.toString().getBytes());
                    }).switchIfEmpty(Flux.defer(() -> {

                        System.out.println("Write to database here");
                        return Flux.just();
                    }))).onErrorResume(err -> {
                        log.error("error while decorating Response: {}", err.getMessage());
                        return Mono.empty();
                    });

                } else {
                    System.out.println("2000000000");
                }
                return super.writeWith(body);
            }
        };
    }

    private ServerHttpRequest getDecoratedRequest(ServerHttpRequest request) {

        return new ServerHttpRequestDecorator(request) {
            @Override
            public Flux<DataBuffer> getBody() {

                log.info("requestId: {}, method: {} , url: {}", request.getId(), request.getMethodValue(),
                        request.getURI());
                return super.getBody().publishOn(Schedulers.elastic()).doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                        Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        String body = baos.toString(StandardCharsets.UTF_8);
                        log.info("for requestId: {}, request body :{}", request.getId(), body);
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                });
            }
        };
    }
}

***我已经解决了这个问题。修饰后的响应是通过Webfilter调用的,而不是通过Gateway filter调用的。

你可以在API gateway中引用这段代码

相关问题