I recently switched from the classic Kafka client to reactive-kafka (akka-stream-kafka for the JVM) to improve my consumer performance. I'm using the latest version of reactive-kafka (0.16). My flow is simple: read from Kafka, group into batches, process each batch, then commit. I put together a small benchmark, but I'm getting very poor results, around 2,000 msg/sec. As a baseline, I ran kafka-producer-perf-test.sh:
bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000
28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.
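In other words, the producer side of this setup sustains roughly 5,800 records/sec (7.3 MB/sec), well above what my consumer achieves below.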
Below are the Scala code and the logs:
import java.sql.Timestamp

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val actorSystem = ActorSystem("benchmark-kafka")
implicit val actorMaterializer = ActorMaterializer()

val consumerSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("mybroker:9094")
    .withGroupId("kafka-producer-bench")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
  .groupedWithin(5000, 1.second)      // batch up to 5000 records or 1 second
  .mapAsync(1) { group =>             // stand-in for the real per-batch processing
    println(new Timestamp(System.currentTimeMillis()) + " : Fetch " + group.size + " records")
    Future.successful(group)
  }
  .map { group =>                     // fold the group into a single offset batch
    group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }
  }
  .mapAsync(1) { batch =>             // commit each offset batch back to Kafka
    println(new Timestamp(System.currentTimeMillis()) + " Will commit : " + batch.getOffsets())
    batch.commitScaladsl()
  }
  .runWith(Sink.ignore)
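For completeness, there is also a variant using akka-streams' batch stage instead of groupedWithin (a minimal sketch based on the batched-commit pattern in the reactive-kafka docs, reusing the consumerSettings above; the max and parallelism values here are illustrative, not something I have tuned or benchmarked):

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
  .map(_.committableOffset)            // keep only the offsets to commit
  .batch(max = 5000, CommittableOffsetBatch.empty.updated(_)) { (batch, offset) =>
    batch.updated(offset)              // keep aggregating while the committer is busy
  }
  .mapAsync(3)(_.commitScaladsl())     // up to 3 commits in flight
  .runWith(Sink.ignore)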
[info] 2017-06-22 18:38:28.456 : Fetch 12 records
[info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
[info] 2017-06-22 18:38:29.456 : Fetch 372 records
[info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
[info] 2017-06-22 18:38:30.456 : Fetch 773 records
[info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
[info] 2017-06-22 18:38:31.456 : Fetch 773 records
[info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
[info] 2017-06-22 18:38:32.456 : Fetch 773 records
[info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
[info] 2017-06-22 18:38:33.456 : Fetch 1546 records
[info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
[info] 2017-06-22 18:38:34.456 : Fetch 1546 records
[info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
[info] 2017-06-22 18:38:35.456 : Fetch 1546 records
[info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
[info] 2017-06-22 18:38:36.456 : Fetch 2319 records
[info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
[info] 2017-06-22 18:38:37.456 : Fetch 1546 records
[info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
[info] 2017-06-22 18:38:38.455 : Fetch 2383 records
[info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
[info] 2017-06-22 18:38:39.456 : Fetch 1546 records
[info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
[info] 2017-06-22 18:38:40.456 : Fetch 2319 records
[info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
[info] 2017-06-22 18:38:41.456 : Fetch 1546 records
[info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
[info] 2017-06-22 18:38:42.456 : Fetch 1546 records
[info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
[info] 2017-06-22 18:38:43.455 : Fetch 2319 records
[info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
[info] 2017-06-22 18:38:44.456 : Fetch 1546 records
[info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
[info] 2017-06-22 18:38:45.456 : Fetch 2319 records
[info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
[info] 2017-06-22 18:38:46.456 : Fetch 1546 records
[info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
[info] 2017-06-22 18:38:47.456 : Fetch 1546 records
[info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
[info] 2017-06-22 18:38:48.456 : Fetch 2319 records
[info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
[info] 2017-06-22 18:38:49.456 : Fetch 1546 records
[info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
[info] 2017-06-22 18:38:50.456 : Fetch 2319 records
[info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
[info] 2017-06-22 18:38:51.456 : Fetch 1546 records
[info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
[info] 2017-06-22 18:38:52.455 : Fetch 1546 records
[info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
[info] 2017-06-22 18:38:53.455 : Fetch 2046 records
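(From the committed offsets above: the stream advances from offset 22970806 at 18:38:28 to 23009892 at 18:38:52, i.e. about 39,086 records in ~24 s ≈ 1,630 msg/sec, consistent with the ~2,000 msg/sec figure.)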
For reference, I ran this code on an EC2 t2.medium machine; with a Kafka client written in Python we got a rate of 5,000 msg/sec, so I think I'm doing something wrong in the way I use the reactive API, but I can't find what. Having read this blog post about Kafka benchmarking, I should be getting much better throughput!
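For comparison, a classic JVM poll loop equivalent to that Python client would look roughly like this (a minimal sketch assuming the same broker and topic as above; the group id is hypothetical and nothing here is tuned):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker:9094")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-classic-bench") // hypothetical group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[String, Array[Byte]](props)
consumer.subscribe(Collections.singletonList("benchmark-producer"))

// Drain a fixed number of records and report the effective consumption rate.
var count = 0L
val start = System.currentTimeMillis()
while (count < 300000) {
  val records = consumer.poll(1000) // pre-2.0 poll(long) signature
  count += records.count()
  consumer.commitSync()
}
val elapsed = (System.currentTimeMillis() - start) / 1000.0
println(s"$count records in $elapsed s = ${count / elapsed} msg/sec")
consumer.close()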