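I have spent hours trying to optimize the latency of my Kafka broker, without success. The problem is this: after a short period of "silence" (5-10 seconds) or a cold start, the first message produced to a topic takes about 3-4 seconds to appear in the topic and become available to consumers. Once messages start appearing in the topic, the latency between messages is excellent (<1 ms), but the initial delay before the first message appears is very high. I'm developing an application with very tight latency requirements, so this is unacceptable for me. I'm hoping there is a broker or consumer setting I can change to get rid of this high initial latency.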
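I first noticed the problem on database inserts, because there are fields that track when a message was created and when it was inserted into the database. However, I don't think the database is the problem: in Confluent Control Center I see the same delay for messages in the topic, and only about 6-7 ms of the total is caused by inserting the data into the database.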
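To tell whether the delay happens inside Kafka or before the message ever reaches it, one option is to record an extra timestamp at the moment the record is handed to the producer and split the total into stages. A minimal std-only Rust sketch, assuming all timestamps are epoch milliseconds; the field names `created_ms`, `produced_ms`, and `inserted_ms` are hypothetical and not taken from the original schema, and the numbers in `main` are made up for illustration:

```rust
/// Epoch-millisecond timestamps recorded for one message.
/// Field names are hypothetical; adapt them to the actual schema.
#[derive(Debug, Clone, Copy)]
struct MessageTimestamps {
    created_ms: i64,  // when the application created the record (before any processing)
    produced_ms: i64, // when the record was handed to the Kafka producer
    inserted_ms: i64, // when the row was written to the database
}

/// Per-stage latency in milliseconds.
#[derive(Debug, PartialEq)]
struct Breakdown {
    before_produce_ms: i64, // application work done before the send
    kafka_and_db_ms: i64,   // producer -> broker -> JDBC sink -> database
    total_ms: i64,          // what a single "created vs inserted" comparison reports
}

/// Splits the end-to-end latency into the part before the producer and the part after it.
fn breakdown(t: MessageTimestamps) -> Breakdown {
    Breakdown {
        before_produce_ms: t.produced_ms - t.created_ms,
        kafka_and_db_ms: t.inserted_ms - t.produced_ms,
        total_ms: t.inserted_ms - t.created_ms,
    }
}

fn main() {
    // Illustrative values only, not measurements from this setup.
    let t = MessageTimestamps { created_ms: 1_000, produced_ms: 1_250, inserted_ms: 1_260 };
    println!("{:?}", breakdown(t));
}
```

If `before_produce_ms` dominates, the delay is in the application rather than in the broker or the connector.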
Here is my setup:
**Cluster:**
1 broker, 2 partitions per topic, 8-core 2.8 GHz CPU (development)
**Parameters I tried changing in the Kafka broker:**
- log.flush.interval.messages
- log.flush.interval.ms
- log.flush.scheduler.interval.ms
**Parameters I tried changing in the Kafka JDBC connector:**
- heartbeat.interval.ms
Also, I'm using Rust, but the configuration should be easy to read.
**Producer settings:**
```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

let producer: FutureProducer = ClientConfig::new()
    .set("bootstrap.servers", kafka_broker_address.clone())
    .set("batch.size", "1")
    .set("acks", "all")
    .set("linger.ms", "0")
    .set("compression.type", "lz4")
    .set("enable.idempotence", "true")
    .create()
    .expect("error");
```
**Kafka broker settings (in the Docker Compose file):**
```yaml
broker:
  image: confluentinc/cp-server:7.0.1
  hostname: broker
  container_name: broker
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_NUM_PARTITIONS: 2
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
```
**Kafka Connect JDBC sink connector:**
```shell
curl -i -X PUT http://$KAFKA_CONNECT_SERVER_ADDRESS:$KAFKA_CONNECT_SERVER_PORT/connectors/db1-sink-postgres-topic_a/config \
     -H "Content-Type: application/json" \
     -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "connection.url": "jdbc:postgresql://'$DB_SERVER_ADDRESS_FROM_INSIDE_DOCKER_CONTAINER':'$DB_SERVER_PORT'/'$DB_NAME'",
  "connection.user": "'$DB_USER'",
  "connection.password": "'$DB_USER_PWD'",
  "consumer.override.isolation.level": "read_committed",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id, year",
  "topics": "topic_a",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "transforms": "TimestampConverter",
  "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS z",
  "transforms.TimestampConverter.field": "program_datetime_eastern_when_retrieved",
  "transforms.TimestampConverter.target.type": "Timestamp"
}'
```
**Answer:**
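The problem had nothing to do with Kafka. I was comparing the wrong timestamp values: they also included the time spent processing each message before it was produced to the topic.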
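The fix comes down to where the starting timestamp is taken. A std-only Rust sketch of the difference, assuming the message goes through some processing step before it is produced; `measure`, `Timings`, and both closures are hypothetical placeholders, not part of rdkafka or the original code, and the sleeps only simulate work:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Durations observed for one message, measured from two different starting points.
struct Timings {
    from_creation: Duration,    // includes pre-send processing (the misleading number)
    from_before_send: Duration, // covers only the produce step
}

/// Runs `process` then `send`, reading the clock before processing and again
/// right before sending. Both closures stand in for the application's own work
/// and its producer call (hypothetical placeholders).
fn measure<P, S>(process: P, send: S) -> Timings
where
    P: FnOnce() -> String,
    S: FnOnce(&str),
{
    let created = Instant::now(); // stamp taken before any processing
    let payload = process();
    let before_send = Instant::now(); // stamp taken right before producing
    send(&payload);
    let done = Instant::now();
    Timings {
        from_creation: done - created,
        from_before_send: done - before_send,
    }
}

fn main() {
    let t = measure(
        || {
            // Simulated processing that happens before the message is produced.
            thread::sleep(Duration::from_millis(50));
            String::from("payload")
        },
        |_payload| thread::sleep(Duration::from_millis(1)), // simulated send
    );
    println!("from creation:    {:?}", t.from_creation);
    println!("from before send: {:?}", t.from_before_send);
}
```

Only the second number reflects the time spent in the producer path; the first one also contains whatever the application did before calling `send`.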