ksql | Consumer lag | Confluent Cloud

oipij1gg  posted on 2021-06-04  in Kafka

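I use Kafka on Confluent Cloud as the message queue in my ecosystem. There are two topics, a and b.
Messages in b arrive slightly later than the corresponding messages published to a (a delay of 30 seconds).
I use ksql to join these two topics. The ksql server is deployed on-premises and connects to Confluent Cloud. In ksql, I join the two topics as streams on a common identifier, say requestid, and create a new stream c. c is the joined stream.
Sometimes stream c shows lag: it stops processing the messages from a and b. The lag is visible in the Confluent Cloud UI. When I log in to the ksql server I can see the error below, and after restarting the ksql server everything works again. This happens intermittently, every 2-3 days.
This is my configuration for the ksql server deployed on-premises.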


# A comma separated list of the Confluent Cloud broker endpoints

bootstrap.servers=${bootstrap_servers}
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
ksql.logging.processing.topic.replication.factor=3
listeners=http://0.0.0.0:8088
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${bootstrap_auth_key}" password="${bootstrap_secret_key}";

# Schema Registry specific settings

ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=${schema_registry_auth_key}:${schema_registry_secret_key}
ksql.schema.registry.url=${schema_registry_url}

# Additional settings

ksql.streams.producer.delivery.timeout.ms=2147483647
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.query.pull.enable.standby.reads=false

# ksql.streams.num.standby.replicas=3 // TODO if we need HA 1+1

# num.standby.replicas=3

# Automatically create the processing log topic if it does not already exist:

ksql.logging.processing.topic.auto.create=true

# Automatically create a stream within KSQL for the processing log:

ksql.logging.processing.stream.auto.create=true
compression.type=snappy
ksql.streams.state.dir=${base_storage_directory}/kafka-streams

Error messages in the ksql server log:

[2020-11-25 14:08:49,785] INFO stream-thread [_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-2] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:220)
[2020-11-25 14:08:49,790] ERROR [Consumer clientId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3-consumer, groupId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0] Offset commit failed on partition yes01-0 at offset 32606388: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1185)
[2020-11-25 14:08:49,790] ERROR [Consumer clientId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3-consumer, groupId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0] Offset commit failed on partition yes01-0 at offset 32606388: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1185)
[2020-11-25 14:08:49,790] WARN stream-thread [_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group. (org.apache.kafka.streams.processor.internals.StreamThread:572)
org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1009)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:962)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:851)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:714)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1251)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1158)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
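
The CommitFailedException above names two consumer settings, max.poll.interval.ms and max.poll.records. Below is a minimal sketch of how they might be added to this server configuration. It assumes the ksql.streams.consumer. prefix is forwarded to the query's consumers in the same way the ksql.streams.producer. prefix is used in the configuration shown earlier. The values are illustrative placeholders, not tested recommendations.

```properties
# Assumption: illustrative values only; tune them against the observed processing time.
# Allow more time between poll() calls before the consumer is removed from the group.
ksql.streams.consumer.max.poll.interval.ms=600000
# Return fewer records per poll() so that each batch is processed sooner.
ksql.streams.consumer.max.poll.records=200
```

Whether either setting removes the lag depends on why the poll loop stalls, which the log alone does not show.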

Edit:
I have verified that the ksql server has sufficient RAM and CPU while this exception occurs.
