我正在使用Kafka流的功能方法,我想通过检查线程状态来获取运行状况

nr9pn0ug  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(124)

我的问题与下面的问题类似。我想检查通过函数方法编码的Kstream应用程序的运行状况。Spring Actuator + Kafka Streams - Add kafka stream status to health check endpoint
在上面的链接中,答案是根据自动装配Kafka Streams给出的。我无法自动接线,因为它给出以下错误。
. <package_name>metrics.KafkaStreamsHealthIndicator中的字段kafkaStreams需要类型为'org.apache.kafka.streams.KafkaStreams'的bean,但找不到。
我试着添加下面的类作为上面的链接解释,但它给出了Kafka流自动装配的错误

`import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.stereotype.Component;

//Note that class name prefix before `HealthIndicator` will be camel-cased
//and used as a health component name, `kafkaStreams` here
@Component
@EnableKafkaStreams
public class KafkaStreamsHealthIndicator implements HealthIndicator {
//    StreamsBuilder streamsBuilder = new StreamsBuilder();

    //if you have multiple instances, inject as Map<String, KafkaStreams>
    //Spring will map KafkaStreams instances by bean names present in context
    //so you can provide status details for each stream by name
    @Autowired
    private KafkaStreams kafkaStreams;

    @Override
    public Health health() {
        KafkaStreams.State kafkaStreamsState = kafkaStreams.state();

        // CREATED, RUNNING or REBALANCING
        if (kafkaStreamsState == KafkaStreams.State.CREATED || kafkaStreamsState.isRunningOrRebalancing()) {
            //set details if you need one
            return Health.up().build();
        }

        // ERROR, NOT_RUNNING, PENDING_SHUTDOWN,
        return Health.down().withDetail("state", kafkaStreamsState.name()).build();
    }
}`

字符串

axr492tv

axr492tv1#

KafkaStreams不是spring-kafka模块中定义的bean,相反,您应该使用StreamsBuilderFactoryBean,它在初始化KafkaStreams后公开它们,并提供其他spring和kafka相关的生命周期方法。
您还可以连接到Kafka streams状态侦听器,从那里更改健康状态。

相关问题