Why is my Kafka broker's latency so high after a short period of "silence" in message production?

Posted by 5us2dqdw on 2023-01-04 in Apache

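I have spent several hours trying to optimize the latency of my Kafka broker, without success. The problem is that the first messages produced to a topic after a short period of "silence" (5-10 seconds, or a cold start) take about 3-4 seconds to appear in the topic and become available to consumers. Once messages start appearing in the topic, the latency between messages is very good (<1 ms), but the initial latency before they start appearing is very high. I am developing an application with very low latency requirements, so this is unacceptable for me, and I am hoping there is a setting in the broker or the consumer that I can change to get rid of this high initial latency.
I first noticed the problem during database inserts, because there are fields that track when a message was created and when it was inserted into the database. However, I don't think the database is the cause, because in Confluent Control Center I see the same delay for the messages in the topic; only about 6-7 ms is attributable to inserting the data into the database.
Here is my setup: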

Cluster:

1 broker, 2 partitions per topic, 8-core 2.8 GHz CPU (development)

Parameters I tried changing in the Kafka broker:
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.scheduler.interval.ms

Parameters I tried changing in the Kafka JDBC connector:
  • heartbeat.interval.ms

Also, I am using Rust, but the configuration should be easy to read.

Producer settings:
let producer: FutureProducer = ClientConfig::new()
    .set("bootstrap.servers", kafka_broker_address.clone())
    .set("batch.size", "1")
    .set("acks","all")
    .set("linger.ms","0")
    .set("compression.type", "lz4")
    .set("enable.idempotence", "true")
    .create()
    .expect("error");

Kafka broker settings (in the Docker file):
broker:
    image: confluentinc/cp-server:7.0.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

Kafka Connect JDBC sink connector:
curl -i -X PUT http://$KAFKA_CONNECT_SERVER_ADDRESS:$KAFKA_CONNECT_SERVER_PORT/connectors/db1-sink-postgres-topic_a/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "connection.url": "jdbc:postgresql://'$DB_SERVER_ADDRESS_FROM_INSIDE_DOCKER_CONTAINER':'$DB_SERVER_PORT'/'$DB_NAME'",
            "connection.user": "'$DB_USER'",
            "connection.password": "'$DB_USER_PWD'",
            "consumer.override.isolation.level": "read_committed",
            "insert.mode": "upsert",
            "pk.mode": "record_value",
            "pk.fields": "id, year",
            "topics": "topic_a",
            "errors.log.enable":true,
            "errors.log.include.messages":true,
            "transforms": "TimestampConverter",
            "transforms.TimestampConverter.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.TimestampConverter.format":"yyyy-MM-dd HH:mm:ss.SSS z",
            "transforms.TimestampConverter.field":"program_datetime_eastern_when_retrieved",
            "transforms.TimestampConverter.target.type":"Timestamp"
         }'
vktxenjb1#

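The problem was not related to Kafka. I was comparing the wrong timestamp values: they also included the time spent processing the messages before they were produced to the topic.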
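
To make the mix-up concrete, here is a minimal Rust sketch of splitting an end-to-end measurement into the part spent before the producer call and the part spent in Kafka, the connector, and the database. The struct, its field names, and the sample values are hypothetical and only illustrate the arithmetic; they are not taken from the original application.

```rust
/// Hypothetical timestamps (milliseconds since some common epoch)
/// recorded for a single message. All names here are illustrative.
#[derive(Debug, Clone, Copy)]
struct MessageTimestamps {
    /// When the application created the record.
    created_ms: u64,
    /// Taken immediately before the record is handed to the producer.
    produced_ms: u64,
    /// When the row was written to the database.
    inserted_ms: u64,
}

impl MessageTimestamps {
    /// Application-side processing time before the message is produced.
    fn pre_produce_ms(&self) -> u64 {
        self.produced_ms.saturating_sub(self.created_ms)
    }

    /// Time spent in Kafka, the sink connector, and the database insert.
    fn pipeline_ms(&self) -> u64 {
        self.inserted_ms.saturating_sub(self.produced_ms)
    }

    /// The misleading figure: creation time to database insert.
    fn end_to_end_ms(&self) -> u64 {
        self.inserted_ms.saturating_sub(self.created_ms)
    }
}

fn main() {
    // Made-up sample values, chosen only to show how the numbers split.
    let ts = MessageTimestamps {
        created_ms: 1_000,
        produced_ms: 4_500,
        inserted_ms: 4_507,
    };

    println!("end to end:  {} ms", ts.end_to_end_ms());
    println!("pre-produce: {} ms", ts.pre_produce_ms());
    println!("pipeline:    {} ms", ts.pipeline_ms());
}
```

If only `created_ms` and `inserted_ms` are stored, the whole difference looks like broker latency. Recording a third timestamp right before the send call shows how much of it is spent before the message ever reaches Kafka.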
