Reactive Kafka performance issue

368yc8dk  posted on 2021-06-08  in  Kafka

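I recently switched from the classic Kafka Java client to akka-reactive-kafka to improve my consumer's performance. I am using the latest version of akka-reactive-kafka (0.16). My flow is simple: read from Kafka, group into batches, process each batch, then commit. I ran a small benchmark but got very poor results, about 2,000 msg/sec. To generate the benchmark load, I ran kafka-producer-perf-test.sh.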

bin/kafka-producer-perf-test.sh --topic benchmark-producer --producer.config config/producer.properties --record-size 1322 --num-records 300000 --throughput 10000

28153 records sent, 5630,6 records/sec (7,10 MB/sec), 1187,7 ms avg latency, 2187,0 max latency.
29748 records sent, 5949,6 records/sec (7,50 MB/sec), 3279,2 ms avg latency, 4076,0 max latency.
30528 records sent, 6099,5 records/sec (7,69 MB/sec), 3946,9 ms avg latency, 4043,0 max latency.
28476 records sent, 5694,1 records/sec (7,18 MB/sec), 4278,0 ms avg latency, 4425,0 max latency.
29112 records sent, 5811,9 records/sec (7,33 MB/sec), 4168,8 ms avg latency, 4303,0 max latency.
28404 records sent, 5680,8 records/sec (7,16 MB/sec), 4486,7 ms avg latency, 4603,0 max latency.
29220 records sent, 5837,0 records/sec (7,36 MB/sec), 4081,9 ms avg latency, 4243,0 max latency.
28728 records sent, 5745,6 records/sec (7,24 MB/sec), 4381,9 ms avg latency, 4477,0 max latency.
29088 records sent, 5816,4 records/sec (7,33 MB/sec), 4089,1 ms avg latency, 4238,0 max latency.
28080 records sent, 5614,9 records/sec (7,08 MB/sec), 4472,6 ms avg latency, 4627,0 max latency.
300000 records sent, 5798,446016 records/sec (7,31 MB/sec), 3852,98 ms avg latency, 4627,00 ms max latency, 4103 ms 50th, 4585 ms 95th, 4615 ms 99th, 4623 ms 99.9th.

The Scala code and its logs are below:

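import java.sql.Timestamp

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val actorSystem = ActorSystem("benchmark-kafka")
implicit val actorMaterializer = ActorMaterializer()

val consumerSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("mybroker:9094")
    .withGroupId("kafka-producer-bench")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "2000000")

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("benchmark-producer"))
  // Emit a batch after 5000 records or 1 second, whichever comes first.
  .groupedWithin(5000, 1.seconds)
  .mapAsync(1) { group =>
    println(s"${new Timestamp(System.currentTimeMillis())} : Fetch ${group.size} records")
    Future.successful(group)
  }
  // Fold the offsets of the whole batch into a single committable batch.
  .map { group =>
    group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }
  }
  // Commit one batch at a time and log the offsets being committed.
  .mapAsync(1) { msg =>
    println(s"${new Timestamp(System.currentTimeMillis())} Will commit : ${msg.getOffsets()}")
    msg.commitScaladsl()
  }
  .runWith(Sink.ignore)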

[info] 2017-06-22 18:38:28.456 : Fetch 12 records
[info] 2017-06-22 18:38:28.46 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22970806}
[info] 2017-06-22 18:38:29.456 : Fetch 372 records
[info] 2017-06-22 18:38:29.459 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971178}
[info] 2017-06-22 18:38:30.456 : Fetch 773 records
[info] 2017-06-22 18:38:30.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22971951}
[info] 2017-06-22 18:38:31.456 : Fetch 773 records
[info] 2017-06-22 18:38:31.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22972724}
[info] 2017-06-22 18:38:32.456 : Fetch 773 records
[info] 2017-06-22 18:38:32.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22973497}
[info] 2017-06-22 18:38:33.456 : Fetch 1546 records
[info] 2017-06-22 18:38:33.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22975043}
[info] 2017-06-22 18:38:34.456 : Fetch 1546 records
[info] 2017-06-22 18:38:34.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22976589}
[info] 2017-06-22 18:38:35.456 : Fetch 1546 records
[info] 2017-06-22 18:38:35.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22978135}
[info] 2017-06-22 18:38:36.456 : Fetch 2319 records
[info] 2017-06-22 18:38:36.458 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22980454}
[info] 2017-06-22 18:38:37.456 : Fetch 1546 records
[info] 2017-06-22 18:38:37.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22982000}
[info] 2017-06-22 18:38:38.455 : Fetch 2383 records
[info] 2017-06-22 18:38:38.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22984383}
[info] 2017-06-22 18:38:39.456 : Fetch 1546 records
[info] 2017-06-22 18:38:39.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22985929}
[info] 2017-06-22 18:38:40.456 : Fetch 2319 records
[info] 2017-06-22 18:38:40.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22988248}
[info] 2017-06-22 18:38:41.456 : Fetch 1546 records
[info] 2017-06-22 18:38:41.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22989794}
[info] 2017-06-22 18:38:42.456 : Fetch 1546 records
[info] 2017-06-22 18:38:42.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22991340}
[info] 2017-06-22 18:38:43.455 : Fetch 2319 records
[info] 2017-06-22 18:38:43.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22993659}
[info] 2017-06-22 18:38:44.456 : Fetch 1546 records
[info] 2017-06-22 18:38:44.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22995205}
[info] 2017-06-22 18:38:45.456 : Fetch 2319 records
[info] 2017-06-22 18:38:45.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22997524}
[info] 2017-06-22 18:38:46.456 : Fetch 1546 records
[info] 2017-06-22 18:38:46.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=22999070}
[info] 2017-06-22 18:38:47.456 : Fetch 1546 records
[info] 2017-06-22 18:38:47.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23000616}
[info] 2017-06-22 18:38:48.456 : Fetch 2319 records
[info] 2017-06-22 18:38:48.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23002935}
[info] 2017-06-22 18:38:49.456 : Fetch 1546 records
[info] 2017-06-22 18:38:49.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23004481}
[info] 2017-06-22 18:38:50.456 : Fetch 2319 records
[info] 2017-06-22 18:38:50.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23006800}
[info] 2017-06-22 18:38:51.456 : Fetch 1546 records
[info] 2017-06-22 18:38:51.457 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23008346}
[info] 2017-06-22 18:38:52.455 : Fetch 1546 records
[info] 2017-06-22 18:38:52.456 Will commit : {GroupTopicPartition(kafka-bebch,benchmark-producer,0)=23009892}
[info] 2017-06-22 18:38:53.455 : Fetch 2046 records
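
The rate implied by the log can be checked from the committed offsets alone. The sketch below is plain Scala with no Kafka dependency; the object and value names are illustrative, and the figures are copied from the first and last "Will commit" lines above. It assumes the timestamp 18:38:28.46 means 28.460 seconds, since java.sql.Timestamp drops trailing zeros when printed.

```scala
// Illustrative helper, not part of the benchmark itself.
object ConsumerRate {
  // First and last committed offsets for partition 0, as printed in the log.
  val firstOffset = 22970806L
  val lastOffset  = 23009892L

  // Time between those two commits: 18:38:28.460 to 18:38:52.456 (assumed reading).
  val elapsedMillis = 23996L

  // Messages committed between the two log lines.
  val committed: Long = lastOffset - firstOffset

  // Average consumer throughput in messages per second.
  val msgPerSec: Double = committed * 1000.0 / elapsedMillis

  // Batch sizes that recur in the log, checked against the smallest one.
  val recurringBatches = Seq(773, 1546, 2319)
  val allMultiplesOf773: Boolean = recurringBatches.forall(_ % 773 == 0)

  // Payload bytes in one 773-record batch at the 1322-byte record size used by the producer.
  val payloadBytesPer773: Long = 773L * 1322L

  def main(args: Array[String]): Unit = {
    println(f"committed = $committed, rate = $msgPerSec%.0f msg/sec")
    println(s"recurring batches are multiples of 773: $allMultiplesOf773")
    println(s"payload bytes per 773 records: $payloadBytesPer773")
  }
}
```

This works out to roughly 1,630 msg/sec, in line with the "about 2,000 msg/sec" reported above. The recurring batch sizes are whole multiples of 773 records, and 773 records of 1322 bytes is about 1.02 MB of payload, which is close to Kafka's default max.partition.fetch.bytes of 1048576 bytes. Whether that limit is what shapes these batches is an assumption that the log does not confirm.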

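For reference, I ran this code on an EC2 t2.medium machine. With a Kafka client written in Python we get 5,000 msg/sec, so I think I am doing something wrong in the way I use the reactive API, but I cannot find what. Having read this blog post about Kafka benchmarking, I should be getting much better throughput!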
