根据Kafka文献:生产者负责选择要分配给主题中的哪个分区的消息。如何使用将消息发送到选定的分区 kafka-console-producer.sh ?我想在发送消息时指定某种类型的“分区id”。
kafka-console-producer.sh
zhte4eai1#
这是你的出发点: partitioner.class 设置在您的 Properties 示例。在Kafka中,默认实现是 kafka.producer.DefaultPartitioner .该设置的目标是:用于在子主题之间划分消息的partitioner类。默认分区器基于密钥的哈希。这意味着,如果要更改默认分区器的行为,则需要创建自己的实现 kafka.producer.Partitioner 接口。我建议在创建自己的策略时要非常小心,并且要经常测试并监视主题及其分区。
partitioner.class
Properties
kafka.producer.DefaultPartitioner
kafka.producer.Partitioner
rdlzhqv92#
C:\arunsingh\demo\kafka_2.13-2.4.0\bin\windows>kafka-console-producer.bat --broker-list 127.0.0.1:9094 --topic arun_topic --property parse.key=true --property key.separator=, --producer-property acks=all >myKey1, Message with key >myKey2, Message with key 2 >
xriantvc3#
更新:这个答案在2014年是正确的,但是更多最新版本的kafka可以通过console producer生成键/值对。看看下面的答案kafka-console-producer.sh不支持在开箱即用的情况下为特定分区生成消息。不过,更新脚本以传递分区id的额外参数,然后在自定义分区器中处理它应该非常简单,正如@chiron在kafka.tools.consoleproducer类的修改版本中所述。请查看以下源代码:https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.shhttps用法:/apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/consoleproducer.scala
hwamh0ep4#
到现在为止 ConsoleProducer 似乎支持为主题编写键控消息。kafka将使用密钥的散列将消息分发到分区中,至少使用默认行为。当前,默认分隔符为 \t ,所以进入 key[\t]message 将在分区之间分发:
ConsoleProducer
\t
key[\t]message
key1 a-message
可通过提供 key.separator 配置,例如:
key.separator
kafka-console-producer --broker-list localhost:9092,localhost:9093 \ --topic mytopic --property key.separator=,
发送如下消息:
key2,another-message
我已经用默认选项卡和自定义分隔符成功地测试了这一点。消息被分发到两个独立的分区。
pb3skfrl5#
根据当前状态(kafka>=0.10.0.1),kafka-console-producer.sh脚本和底层的consoleproducer java类支持使用密钥发送数据,但默认情况下禁用了这种支持,必须从cli启用。也就是说,您需要设置属性 parse.key . 另外,如果要使用不同于制表符的字符,请使用 key.separator 如塞德里克的回答所述。最后,命令行将是:
parse.key
kafka-console.producer.sh --broker-list kafka:9092,kafka2:9092 \ --topic $TOPIC --property parse.key=true --property key.separator=|
5条答案
按热度按时间zhte4eai1#
这是你的出发点:
partitioner.class
设置在您的Properties
示例。在Kafka中,默认实现是kafka.producer.DefaultPartitioner
.该设置的目标是:
用于在子主题之间划分消息的partitioner类。默认分区器基于密钥的哈希。
这意味着,如果要更改默认分区器的行为,则需要创建自己的实现
kafka.producer.Partitioner
接口。我建议在创建自己的策略时要非常小心,并且要经常测试并监视主题及其分区。
rdlzhqv92#
xriantvc3#
更新:这个答案在2014年是正确的,但是更多最新版本的kafka可以通过console producer生成键/值对。看看下面的答案
kafka-console-producer.sh不支持在开箱即用的情况下为特定分区生成消息。
不过,更新脚本以传递分区id的额外参数,然后在自定义分区器中处理它应该非常简单,正如@chiron在kafka.tools.consoleproducer类的修改版本中所述。
请查看以下源代码:
https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.shhttps用法:/apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/consoleproducer.scala
hwamh0ep4#
到现在为止
ConsoleProducer
似乎支持为主题编写键控消息。kafka将使用密钥的散列将消息分发到分区中,至少使用默认行为。当前,默认分隔符为
\t
,所以进入key[\t]message
将在分区之间分发:可通过提供
key.separator
配置,例如:发送如下消息:
我已经用默认选项卡和自定义分隔符成功地测试了这一点。消息被分发到两个独立的分区。
pb3skfrl5#
根据当前状态(kafka>=0.10.0.1),kafka-console-producer.sh脚本和底层的consoleproducer java类支持使用密钥发送数据,但默认情况下禁用了这种支持,必须从cli启用。
也就是说,您需要设置属性
parse.key
. 另外,如果要使用不同于制表符的字符,请使用key.separator
如塞德里克的回答所述。最后,命令行将是: