使用GCP PubSub源在Flink Job上获取http2异常

qhhrdooz  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(185)

我有一个使用GCP PubSub作为源的flink作业。尽管我能够处理我收到的关于pubsub主题的消息,但我发现它有几个问题:
1.先前处理的消息再次显示(表示未进行确认)
1.我在flink作业中看到grpc http2异常,沿着出现检查点日志
异常堆栈跟踪:

io.grpc.StatusRuntimeException: INTERNAL: http2 exception
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99)
    at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:330)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1057)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1080)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil.headerListSizeExceeded(Http2CodecUtil.java:245)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.headerSizeExceeded(DefaultHttp2FrameReader.java:694)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.addFragment(DefaultHttp2FrameReader.java:710)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:481)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:491)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1526)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1275)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

字符串
以下似乎是根本原因

Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)


下面是我如何配置我的PubSubSource对象:

PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .withPubSubSubscriberFactory(100, Duration.ofSeconds(90), 3)
            .build();


检查点设置间隔为60秒(小于确认截止时间)
造成上述两个问题的原因是什么?

cbeh67ev

cbeh67ev1#

我将maxInboundMetadataSize进一步增加到1MB,然后它开始工作。

public class CustomPubSubSubscriberFactory implements PubSubSubscriberFactory {
  private final int retries;
  private final Duration timeout;
  private final int maxMessagesPerPull;
  private final String projectSubscriptionName;

  public CustomPubSubSubscriberFactory(
      String projectSubscriptionName, int retries, Duration pullTimeout, int maxMessagesPerPull) {
    this.retries = retries;
    this.timeout = pullTimeout;
    this.maxMessagesPerPull = maxMessagesPerPull;
    this.projectSubscriptionName = projectSubscriptionName;
  }

  @Override
  public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    ManagedChannel channel =
        NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
            .negotiationType(NegotiationType.TLS)
            .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
            .maxInboundMetadataSize(1048576)
            .build();

    PullRequest pullRequest =
        PullRequest.newBuilder()
            .setMaxMessages(maxMessagesPerPull)
            .setSubscription(projectSubscriptionName)
            .build();
    SubscriberGrpc.SubscriberBlockingStub stub =
        SubscriberGrpc.newBlockingStub(channel)
            .withCallCredentials(MoreCallCredentials.from(credentials));
    return new BlockingGrpcPubSubSubscriber(
        projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
  }
}

字符串
援引为

PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .withPubSubSubscriberFactory(
                new CustomPubSubSubscriberFactory(
                    ProjectSubscriptionName.format(
                        "some-project", "some-subscription"),
                    3,
                    Duration.ofSeconds(90),
                    100))            
            .build();

相关问题