Kafka生产者0.9性能问题与小消息

dgiusagp  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(433)

我们发现JavaKafkaProducer0.9客户端在发送小消息时性能非常差。消息不会被累积到一个更大的请求批中,因此每个小记录都是单独发送的。
我们的客户端配置有什么问题?还是其他问题?
使用kafka客户端0.9.0.0。我们在kafka unreleased 9.0.1或9.1 fixed或unresolved列表中没有看到任何相关的帖子,因此我们主要关注客户机配置和服务器示例。
我们理解linger.ms应该导致客户机将记录累积到一个批中。
我们将linger.ms设置为10(也尝试了100和1000),但这并没有导致批量累积记录。如果记录大小约为100字节,请求缓冲区大小为16k,那么我们预计在单个请求中会发送大约160条消息。
尽管已经分配了一个新的bluemix消息传递中心(kafka服务器0.9)服务示例,但客户端的跟踪似乎表明分区可能已满。测试客户机在一个循环中发送多条消息,没有其他i/o。
日志显示了一个重复序列,其中有一行可疑的内容:“唤醒发件人,因为主题mytopic分区0已满或正在获取新批”。
因此,在我们的测试用例中,新分配的分区基本上应该是空的,那么为什么生产者客户机将获得一个新的批呢?

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating a new 16384 byte message buffer for topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Received produce response from node 0 with correlation id 11  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch  - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Send returned metadata: Topic='mytopic', Partition=0, Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

Log entries repeat like the above for each record sent

我们提供了以下属性文件:

2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient  - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     acks=-1
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     client.id=ExploreProducer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     security.protocol=SASL_SSL

Plus we added linger.ms=10 in code.

kafka客户端显示展开/合并的配置列表(并显示linger.ms设置):

2015-12-10 15:14:37,970 312  [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    acks = -1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10

Kafka在发送100条记录后的衡量标准:

Duration for 100 sends 8787 ms. Sent 7687 bytes.  
    batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]  
    batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]  
    buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]  
    buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]  
    buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]  
    bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]  
    byte-rate = 291.8348916277093 []  
    compression-rate = 0.0 []  
    compression-rate-avg = 0.0 [The average compression rate of record batches.]  
    connection-close-rate = 0.0 [Connections closed per second in the window.]  
    connection-count = 2.0 [The current number of active connections.]  
    connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]  
    incoming-byte-rate = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]  
    io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]  
    io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
    metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]  
    network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]  
    outgoing-byte-rate = 451.2298783403283 []  
    produce-throttle-time-avg = 0.0 [The average throttle time in ms]  
    produce-throttle-time-max = 0.0 [The maximum throttle time in ms]  
    record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]  
    record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]  
    record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]  
    record-retry-rate = 0.0 []  
    record-send-rate = 2.65611304417116 [The average number of records sent per second.]  
    record-size-avg = 97.87 [The average record size]  
    record-size-max = 98.0 [The maximum record size]  
    records-per-request-avg = 1.0 [The average number of records per request.]  
    request-latency-avg = 0.0 [The average request latency in ms]  
    request-latency-max = 74.0 []  
    request-rate = 2.6468892499606897 [The average number of requests sent per second.]  
    request-size-avg = 42.0 [The average size of all requests in the window..]  
    request-size-max = 170.0 [The maximum size of any request sent in the window.]  
    requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]  
    response-rate = 2.651196976060479 [The average number of responses received per second.]  
    select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]  
    waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]

谢谢

30byixjq

30byixjq1#

kafka用户邮件列表中的guozhang wang通过查看我们的应用程序代码能够识别问题:
郭璋,
是的-你发现了问题!
我们已经插入了用于调试的.get(),但是没有想到(巨大的!)副作用。
使用异步回调可以很好地工作。
我们现在可以在14秒内将100000条记录从笔记本电脑发送到bluemix云,速度快了约1000倍,
非常感谢你!
加里
2015年12月13日下午2:48, Realm 璋写道:
加里,
您正在调用“kafkaproducer.send(record).get();”对于每一条消息,get()调用都会阻塞直到future被初始化,这可以有效地同步发送的所有消息,方法是在发送下一条消息之前请求每条消息的ack,因此不会进行批处理。
您可以尝试使用“send(record,callback)”进行异步发送,并让回调处理来自返回元数据的错误。
郭璋

相关问题