如何使用smallrye-kafka定义分区类型?

bqucvtff  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(110)

我有一个简单的quarkus项目,创建主题“数据库”,并发送数据库列表给它。
如何控制分区?
我希望每个DatabaseID都有不同的分区,这样具有相同ID的所有消息都会转到同一个分区。
这里是我的application.properties:

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.databases.connector=smallrye-kafka
mp.messaging.outgoing.databases.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.databases.value.serializer=com.secupi.dbmanager.model.DatabaseSerializer
quarkus.class-loading.removed-resources."org.slf4j\:slf4j-api"=org.slf4j.Logger.class

我的代码

@Outgoing("databases")
public <T> Multi<Record<String, Database>> generate() {
                Collection<Database> dbs = databaseDao.getAll(new FilterParams());
            return Multi.createFrom().items(dbs.stream().map(db -> Record.of(db.getName(), db)));                   
}

默认情况下,我看到的主题已经创建了100个分区。
谢谢

xdnvmnnf

xdnvmnnf1#

主题中分区的数量取决于您如何创建它。如果它是自动创建的,则默认分区数在Kafka broker config中配置。如果您在使用Quarkus devservices的开发模式下,您可以将其配置为为您创建主题分区:https://quarkus.io/guides/kafka#configuring-kafka-topics
如果主题已经创建了100个分区,那么您需要做的就是生成具有一致键的Kafka记录。在您发送的代码片段中,如果您使用数据库ID作为键创建Record.of,则默认分区程序每次都会将其分配给唯一的分区。

相关问题