kafka flux在静止控制器中不工作

s3fp2yjn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(352)

我有以下控制器:

private final Flux<ReceiverRecord<String, String>> fluxReceiver;

@GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getEvents() {
    return fluxReceiver.map(x -> x.value());
}

我宣布了豆子通量>,就像这样:

@Bean
public Flux<ReceiverRecord<String, String>> fluxReceiver() {
    String bootstrapServers = "localhost:9092";
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);

    ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(TOPIC));

    return KafkaReceiver.create(options).receive();

}

因此,我可以将它注入控制器,并在有人调用rest端点/事件时返回它。
我用curl调用那个端点。问题是,当我发送一个事件时,如果我调试它,我可以看到.map方法被触发,我返回事件的值,在这里是一个字符串。但是,在curl控制台中,我看不到事件。
但是,如果我不使用kafka,我创建了一个topicprocessor,那么我可以在curl中看到事件。我做错什么了吗?

t3psigkw

t3psigkw1#

免责声明:我对Kafka的了解有限。
Kafka的主题是连续的,所以 Flux 主题的表示不发出 onComplete . 因此,spring框架将扭转这种局面 Flux 使用分块编码的流式响应。。。 cUrl 期望从服务器返回默认的有限响应。它执行一点缓冲,总体上不一定在控制台上写入http块。但是在这种流式响应的情况下,您需要它做的是在数据可用时立即输出数据。
这可以通过 --no-buffercUrl .

相关问题