我有以下控制器:
private final Flux<ReceiverRecord<String, String>> fluxReceiver;
@GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getEvents() {
return fluxReceiver.map(x -> x.value());
}
我宣布了豆子通量>,就像这样:
@Bean
public Flux<ReceiverRecord<String, String>> fluxReceiver() {
String bootstrapServers = "localhost:9092";
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);
ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(TOPIC));
return KafkaReceiver.create(options).receive();
}
因此,我可以将它注入控制器,并在有人调用rest端点/事件时返回它。
我用curl调用那个端点。问题是,当我发送一个事件时,如果我调试它,我可以看到.map方法被触发,我返回事件的值,在这里是一个字符串。但是,在curl控制台中,我看不到事件。
但是,如果我不使用kafka,我创建了一个topicprocessor,那么我可以在curl中看到事件。我做错什么了吗?
1条答案
按热度按时间t3psigkw1#
免责声明:我对Kafka的了解有限。
Kafka的主题是连续的,所以
Flux
主题的表示不发出onComplete
. 因此,spring框架将扭转这种局面Flux
使用分块编码的流式响应。。。cUrl
期望从服务器返回默认的有限响应。它执行一点缓冲,总体上不一定在控制台上写入http块。但是在这种流式响应的情况下,您需要它做的是在数据可用时立即输出数据。这可以通过
--no-buffer
旗cUrl
.