我使用Kafka连接hdfs接收器连接器写hdfs从Kafka,它是正常工作。我的信息如下:
key: my-key
value: {
"name": "helen"
}
我的用例是需要将消息的键附加到我发送给hdfs的事件中。
问题是键没有出现在值有效负载中,因此我无法使用:
"partitioner.class":
"io.confluent.connect.hdfs.partitioner.FieldPartitioner",
"partition.field.name": "key",
我的问题是如何将密钥添加到发送给hdfs的消息中,或者如何基于密钥进行分区?
1条答案
按热度按时间xvw2m8pv1#
开箱即用,你不能(对于s3connect也是如此),仅仅基于代码的编写方式,而不是connect框架的限制
至少,您需要构建这个smt并将其添加到connect worker中,这将在写入存储之前将键、主题和分区全部“移动”到connect记录的“值”中
https://github.com/jcustenborder/kafka-connect-transform-archive