我有一个简单的quarkus项目,创建主题“数据库”,并发送数据库列表给它。
如何控制分区?
我希望每个DatabaseID都有不同的分区,这样具有相同ID的所有消息都会转到同一个分区。
这里是我的application.properties:
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.databases.connector=smallrye-kafka
mp.messaging.outgoing.databases.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.databases.value.serializer=com.secupi.dbmanager.model.DatabaseSerializer
quarkus.class-loading.removed-resources."org.slf4j\:slf4j-api"=org.slf4j.Logger.class
我的代码
@Outgoing("databases")
public <T> Multi<Record<String, Database>> generate() {
Collection<Database> dbs = databaseDao.getAll(new FilterParams());
return Multi.createFrom().items(dbs.stream().map(db -> Record.of(db.getName(), db)));
}
默认情况下,我看到的主题已经创建了100个分区。
谢谢
1条答案
按热度按时间xdnvmnnf1#
主题中分区的数量取决于您如何创建它。如果它是自动创建的,则默认分区数在Kafka broker config中配置。如果您在使用Quarkus devservices的开发模式下,您可以将其配置为为您创建主题分区:https://quarkus.io/guides/kafka#configuring-kafka-topics
如果主题已经创建了100个分区,那么您需要做的就是生成具有一致键的Kafka记录。在您发送的代码片段中,如果您使用数据库ID作为键创建
Record.of
,则默认分区程序每次都会将其分配给唯一的分区。