我是新来Kafka的,我真的不明白Kafka配置的含义,谁能给我解释得更明白些!
这是我的密码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "master:9092,slave1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "GROUP_2017",
"auto.offset.reset" -> "latest", //earliest or latest
"enable.auto.commit" -> (true: java.lang.Boolean)
)
在我的代码里是什么意思?
3条答案
按热度按时间2w3rbyxf1#
增加了更多关于标题中提到的配置的细节:“不清楚
auto.offset.reset
以及enable.auto.commit
“在Kafka”自动偏移复位
与
auto.offset.reset
配置您可以在您的使用者组从未从特定主题消费和提交或从该使用者组最后提交的偏移量被删除(例如,通过清除策略)的情况下引导使用者(作为使用者组的一部分)的行为。Kafka主题分区中的每条消息都有一个唯一标识符,即
offset
. 每个Kafka分区的偏移量是唯一的。使用者通常会提交其所使用主题的每个分区上的偏移量。这样,消费者就可以避免重复读数。假设您有一个消费者第一次阅读某个主题(或者如果您更改了消费者组名称)。因此,消费者群体从未承诺任何补偿。根据配置文档,您可以使用配置在以下行为之间进行选择
auto.offset.reset
:最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常。
默认设置为
latest
.启用.auto.commit
如上所述,在使用来自kafka的消息时,考虑偏移量及其提交是非常重要的。设置配置时
enable.auto.commit
至true
消费补偿将在后台自动提交。在kafkaconsumer的javadocs中,您将找到一个很好的示例,说明如何使用
为了再次强调客户机中偏移管理的重要性,有必要阅读完整的java文档描述或关于偏移管理的合流kafka文档。
jc3wubiy2#
我会向你解释的意思,但我强烈建议阅读Kafka网站配置
基本上是kafka集群配置:ip和端口。
这个答案解释了目的是什么。
使用者进程将属于groupid。一个groupid可以有多个使用者,kafka只将一个使用者进程分配给一个分区(用于数据消费)。如果使用者的数量大于可用分区的数量,则某些进程将处于空闲状态。
不管这个标志是真的,Kafka都可以使用zookeeper提交您从Kafka带来的消息,以保存它读取的最后一个“偏移量”。当您希望为生产系统提供更健壮的解决方案时,这种方法不是最好的,因为它不能确保您带来的记录得到正确处理(使用您在代码中编写的逻辑)。如果此标志为false,kafka将不知道上次读取的偏移量是哪一个,因此当您重新启动进程时,它将根据下一个标志的值(auto.offset.reset)开始读取“最早”或“最新”偏移量。最后,这篇cloudera文章详细解释了如何以适当的方式管理补偿。
这个标志告诉kafka在哪里开始读取偏移量,以防您还没有任何“提交”。换句话说,如果您尚未在zookeeper中保留任何偏移量(手动或使用enable.auto.commit标志),则它将从“最早”或“最晚”开始。
ldfqzlk83#
中的全套使用者配置参数记录在apache kafka网站上https://kafka.apache.org/documentation.html#newconsumerconfigs