如何解决Spring Cloud Gateway内存泄漏

qltillow  于 2024-01-06  发布在  Spring
关注(0)|答案(1)|浏览(169)

我在我的服务中使用Spring Cloud Gateway,并在我的LoggingFilter中使用下面的RequestDecorator作为包装器(wrapper)。

  1. public class RequestDecorator extends ServerHttpRequestDecorator {
  2. private final List<DataBuffer> dataBuffers = new ArrayList<>();
  3. public RequestDecorator(ServerHttpRequest delegate) {
  4. super(delegate);
  5. super.getBody()
  6. .map(
  7. dataBuffer -> {
  8. dataBuffers.add(dataBuffer);
  9. return dataBuffer;
  10. })
  11. .subscribe();
  12. }
  13. @Override
  14. public Flux<DataBuffer> getBody() {
  15. return copy();
  16. }
  17. private Flux<DataBuffer> copy() {
  18. return Flux.fromIterable(dataBuffers)
  19. .map(dataBuffer -> dataBuffer.factory().wrap(dataBuffer.asByteBuffer()));
  20. }
  21. }

字符串
当Jmeter使用该服务进行性能测试时,我在日志中发现了以下内存泄漏错误。

  1. i.n.u.ResourceLeakDetector : - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
  2. Recent access records:
  3. Created at:
  4. io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
  5. io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
  6. io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
  7. io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
  8. io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
  9. io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
  10. io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
  11. io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
  12. io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
  13. io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
  14. io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  15. io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  16. java.base/java.lang.Thread.run(Thread.java:834)


在网上查看了一些内容后,我发现了下面的评论-“如果你使用DataBuffer,你可能会得到同样的错误。Spring有DataBufferUtils库来释放资源。”
DataBufferUtils.release(dataBuffer);
但是我想知道如何在我的装饰器类中使用它,因为我在我的LoggingFilter中使用了这个包装器。
有人能给点建议吗?

mzsu5hc0

mzsu5hc01#

你好,下面是我的LoggingFilter。我只记录请求对象,因为在我的项目中响应体可能有4~5 MB,我不想在日志中看到它。就你的情况而言,你应该用类似下面的方式释放DataBuffer:.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

  1. @Slf4j
  2. @Component
  3. public class LoggingFilter implements GlobalFilter, Ordered {
  4. @Autowired private Tracer tracer;
  5. private static final Set<String> LOGGABLE_CONTENT_TYPES =
  6. new HashSet<>(
  7. Arrays.asList(
  8. MediaType.APPLICATION_JSON_VALUE.toLowerCase(),
  9. MediaType.APPLICATION_JSON_UTF8_VALUE.toLowerCase(),
  10. MediaType.TEXT_PLAIN_VALUE,
  11. MediaType.TEXT_XML_VALUE));
  12. @Override
  13. public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  14. String traceId =
  15. tracer.currentSpan() != null
  16. ? tracer.currentSpan().context().traceIdString()
  17. : tracer.nextSpan().context().traceIdString();
  18. ServerHttpRequest mutatedServerHttpRequest =
  19. exchange.getRequest().mutate().header("x-b3-traceid", traceId).build();
  20. var requestMutated =
  21. new ServerHttpRequestDecorator(mutatedServerHttpRequest) {
  22. @Override
  23. public Flux<DataBuffer> getBody() {
  24. var requestLogger = new Logger(getDelegate());
  25. if (LOGGABLE_CONTENT_TYPES.contains(
  26. String.valueOf(getHeaders().getContentType()).toLowerCase())) {
  27. return super.getBody()
  28. .map(
  29. ds -> {
  30. requestLogger.appendBody(ds.asByteBuffer());
  31. return ds;
  32. })
  33. .doFinally((s) -> requestLogger.log())
  34. .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  35. } else {
  36. requestLogger.log();
  37. return super.getBody().doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  38. }
  39. }
  40. };
  41. var responseMutated =
  42. new ServerHttpResponseDecorator(exchange.getResponse()) {
  43. @Override
  44. public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  45. //var responseLogger = new Logger(getDelegate());
  46. if (LOGGABLE_CONTENT_TYPES.contains(
  47. String.valueOf(getHeaders().getContentType()).toLowerCase())) {
  48. return join(body)
  49. .flatMap(
  50. db -> {
  51. //responseLogger.appendBody(db.asByteBuffer());
  52. //responseLogger.log();
  53. return getDelegate().writeWith(Mono.just(db));
  54. }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  55. } else {
  56. //responseLogger.log();
  57. return getDelegate().writeWith(body).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  58. }
  59. }
  60. };
  61. return chain.filter(
  62. exchange.mutate().request(requestMutated).response(responseMutated).build());
  63. }
  64. private Mono<? extends DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
  65. Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
  66. return Flux.from(dataBuffers)
  67. .collectList()
  68. .filter((list) -> !list.isEmpty())
  69. .map((list) -> list.get(0).factory().join(list))
  70. .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  71. }
  72. @Override
  73. public int getOrder() {
  74. return Ordered.HIGHEST_PRECEDENCE;
  75. }
  76. @ToString
  77. private class Logger {
  78. private Map<String, String> headers;
  79. private HttpStatus status;
  80. private String path;
  81. private String body;
  82. Logger(ServerHttpResponse response) {
  83. headers = response.getHeaders().toSingleValueMap();
  84. status = HttpStatus.valueOf(response.getStatusCode().value());
  85. }
  86. Logger(ServerHttpRequest request) {
  87. if (tracer.currentSpan() == null || tracer.currentSpan().context() == null) {
  88. MDC.put("traceId", request.getHeaders().getFirst("x-b3-traceid"));
  89. } else {
  90. MDC.put("traceId", tracer.currentSpan().context().traceIdString());
  91. }
  92. headers = request.getHeaders().toSingleValueMap();
  93. path = request.getMethod() + " " + request.getPath();
  94. }
  95. void appendBody(ByteBuffer byteBuffer) {
  96. body = StandardCharsets.UTF_8.decode(byteBuffer).toString();
  97. }
  98. void log() {
  99. log.info(this.toString());
  100. }
  101. }
  102. }

字符串

展开查看全部

相关问题