embeddedkafka即使设置了配置值也会引发recordtoolargeexception

xxls0lw8  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(486)

我正在尝试将kafka的默认消息大小从1mb增加到10mb。我正在用embeddedkafka和scalatest测试我的新配置,但它不起作用。
使用这个答案,我相应地增加了配置值:
经纪人:
message.max.bytes replica.fetch.max.bytes 消费者: max.partition.fetch.bytes 制作人: max.request.size 我的代码:

  1. val broker = s"localhost:${kafkaConfig.kafkaPort}"
  2. val maxSize: String = (ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES * 10).toString // 10MiB
  3. val embeddedBrokerConfig = Map(
  4. "message.max.bytes" -> maxSize,
  5. "replica.fetch.max.bytes" -> maxSize
  6. )
  7. val embeddedConsumerConfig = Map[String, String](
  8. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
  9. ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
  10. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
  11. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
  12. ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> maxSize
  13. )
  14. val embeddedProducerConfig = Map[String, String](
  15. ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
  16. ProducerConfig.MAX_REQUEST_SIZE_CONFIG -> maxSize
  17. )
  18. val bigKafkaConfig =
  19. EmbeddedKafkaConfig(
  20. kafkaConfig.kafkaPort,
  21. kafkaConfig.zooKeeperPort,
  22. customBrokerProperties = embeddedBrokerConfig,
  23. customConsumerProperties = embeddedConsumerConfig,
  24. customProducerProperties = embeddedProducerConfig
  25. )
  26. val bigMessage = ("H" * 999999).getBytes()
  27. EmbeddedKafka.publishToKafka(inTopic, bigMessage)(bigKafkaConfig, valueSerializer)

当我用一个只有999999字节(低于1mb)的消息运行此代码时,我得到以下错误:

  1. Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
  2. at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
  3. at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
  4. at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
  5. at net.manub.embeddedkafka.EmbeddedKafkaSupport.$anonfun$publishToKafka$7(EmbeddedKafka.scala:276)
  6. at scala.util.Try$.apply(Try.scala:209)
  7. at net.manub.embeddedkafka.EmbeddedKafkaSupport.publishToKafka(EmbeddedKafka.scala:276)

这是嵌入Kafka中的一个bug吗?还是我的应用程序配置错误?

wn9m85ua

wn9m85ua1#

我发现了问题所在。这是由于 EmbeddedKafka .
有一个 beforeAll 在使用默认配置进行测试之前启动embeddedkafka的语句。 EmbeddedKafka.start 需要更新的代理设置,并在 EmbeddedKafka 已经开始什么都不做了。

相关问题