我的spark-streaming版本是2.0,kafka版本是0.10.0.1,spark-streaming-kafka-0-10_.11。我使用直接的方法来获取Kafka记录,现在我想限制一批中获得的最大消息数。所以我设置了max.poll.records值,但它不起作用。spark中的consumer数是kafka中的分区数?因此spark流媒体中的最大记录数是max.poll.records*consumers?
我的spark-streaming版本是2.0,kafka版本是0.10.0.1,spark-streaming-kafka-0-10_.11。我使用直接的方法来获取Kafka记录,现在我想限制一批中获得的最大消息数。所以我设置了max.poll.records值,但它不起作用。spark中的consumer数是kafka中的分区数?因此spark流媒体中的最大记录数是max.poll.records*consumers?
1条答案
按热度按时间ztmd8pv51#
max.poll.records
控制轮询返回的记录数的上限。在spark streaming中,一个批次中可能会发生多个轮询。在那种情况下
max.poll.records
不会很有用。你应该使用spark.streaming.kafka.maxRatePerPartition
,根据文件一个重要的是spark.streaming.kafka.maxrateperpartition,它是此直接api读取每个kafka分区的最大速率(以每秒消息数为单位)
因此,每批的最大记录数为
(spark.streaming.kafka.maxrateperpartition)(批处理持续时间(秒)(kafka分区数)
e、 g如果主题中有2个分区,则批处理持续时间为30秒,并且
spark.streaming.kafka.maxRatePerPartition
如果是1000,则每批可以看到6000(2301000)条记录。还可以启用
spark.streaming.backpressure.enabled
根据处理批处理所用的时间,具有更自适应的速率。更多关于Kafka直接流引擎盖下工作的信息