我想使用这个扩展:[Quarkus Smallrye响应式消息传递Kafka]
但在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户那里收到的消息指定的。如何在没有注解的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向Kafka -> Produce发送消息)
@ApplicationScoped
public class PriceGenerator {
private Random random = new Random();
// Don't want to use this
// "generated-price" not known at build time
@Outgoing("generated-price")
public Multi<Integer> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onOverflow().drop()
.map(tick -> random.nextInt(100));
}
}
或者,应在运行时以编程方式设置这些参数
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
因为我不认识路,所以我使用了原生的Kafka驱动程序
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topicName.toString(), messageFactory.MessageToString(message)));
2条答案
按热度按时间piztneat1#
您可以在启动时或任何需要的时候动态覆盖topic的值,但这里有一段代码指示如何覆盖topic的预定义值:
qnakjoqk2#
你可以使用emitter,使用元数据,代码如下所示
这里是官方文档:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.9/kafka/kafka.html#_dynamic_topic_names