spark:如何在向kafka写入数据时使用自定义partitionner

ljsrvy3e  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(313)

向kafka写入数据时,可以使用名为key的列来选择分区:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()

我需要手动决定分区,独立于键。可以手动指定分区吗?或者提供一个定制的partitionner,以便我控制选择分区的逻辑是什么?

slhcrj9b

slhcrj9b1#

你只需要添加选项 kafka.partitioner.class 与您的自定义分区与适当的逻辑。

val dataStreamWriter: DataStreamWriter[Row] = ???
dataStreamWriter.option("kafka.partitioner.class", "com.example.CustomKafkaPartitioner")

相关问题