如何使用kafka console producer向所选分区生成消息?

yfwxisqw  于 2021-06-09  发布在  Kafka
关注(0)|答案(5)|浏览(1291)

根据Kafka文献:
生产者负责选择要分配给主题中的哪个分区的消息。
如何使用将消息发送到选定的分区 kafka-console-producer.sh ?
我想在发送消息时指定某种类型的“分区id”。

zhte4eai

zhte4eai1#

这是你的出发点: partitioner.class 设置在您的 Properties 示例。在Kafka中,默认实现是 kafka.producer.DefaultPartitioner .
该设置的目标是:
用于在子主题之间划分消息的partitioner类。默认分区器基于密钥的哈希。
这意味着,如果要更改默认分区器的行为,则需要创建自己的实现 kafka.producer.Partitioner 接口。
我建议在创建自己的策略时要非常小心,并且要经常测试并监视主题及其分区。

rdlzhqv9

rdlzhqv92#

C:\arunsingh\demo\kafka_2.13-2.4.0\bin\windows>kafka-console-producer.bat --broker-list 127.0.0.1:9094 --topic arun_topic --property parse.key=true --property key.separator=, --producer-property acks=all
>myKey1, Message with key
>myKey2, Message with key 2
>
xriantvc

xriantvc3#

更新:这个答案在2014年是正确的,但是更多最新版本的kafka可以通过console producer生成键/值对。看看下面的答案
kafka-console-producer.sh不支持在开箱即用的情况下为特定分区生成消息。
不过,更新脚本以传递分区id的额外参数,然后在自定义分区器中处理它应该非常简单,正如@chiron在kafka.tools.consoleproducer类的修改版本中所述。
请查看以下源代码:
https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.shhttps用法:/apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/consoleproducer.scala

hwamh0ep

hwamh0ep4#

到现在为止 ConsoleProducer 似乎支持为主题编写键控消息。kafka将使用密钥的散列将消息分发到分区中,至少使用默认行为。
当前,默认分隔符为 \t ,所以进入 key[\t]message 将在分区之间分发:

key1    a-message

可通过提供 key.separator 配置,例如:

kafka-console-producer --broker-list localhost:9092,localhost:9093 \
  --topic mytopic --property key.separator=,

发送如下消息:

key2,another-message

我已经用默认选项卡和自定义分隔符成功地测试了这一点。消息被分发到两个独立的分区。

pb3skfrl

pb3skfrl5#

根据当前状态(kafka>=0.10.0.1),kafka-console-producer.sh脚本和底层的consoleproducer java类支持使用密钥发送数据,但默认情况下禁用了这种支持,必须从cli启用。
也就是说,您需要设置属性 parse.key . 另外,如果要使用不同于制表符的字符,请使用 key.separator 如塞德里克的回答所述。
最后,命令行将是:

kafka-console.producer.sh --broker-list kafka:9092,kafka2:9092 \
    --topic $TOPIC --property parse.key=true --property key.separator=|

相关问题