如何检查kafka服务器是否正在运行?

khbbv19g  于 2021-06-07  发布在  Kafka
关注(0)|答案(9)|浏览(935)

在开始生产和消费作业之前,我要确保kafka服务器是否正在运行。它在windows环境下,下面是我的kafka服务器在eclipse中的代码。。。

Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");    
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);        
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();

在这种情况下 if (server != null) 这是不够的,因为它总是正确的。所以有没有办法知道,我的Kafka服务器正在运行,并准备生产。我有必要检查这个,因为它会导致一些起始数据包丢失。

fcipmucu

fcipmucu1#

好的选择是在开始生成或使用消息之前使用adminclient,如下所示

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;           
 try (AdminClient client = AdminClient.create(properties)) {
            client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
        } catch (ExecutionException ex) {
            LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
            return;
        }
8yparm6h

8yparm6h2#

如果服务器正在运行,您可以使用下面的代码检查可用的代理。

import org.I0Itec.zkclient.ZkClient;
     public static boolean isBrokerRunning(){
        boolean flag = false;
        ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 10000);//, kafka.utils.ZKStringSerializer$.MODULE$);
        if(zkClient!=null){
            int brokersCount = zkClient.countChildren(ZkUtils.BrokerIdsPath());
            if(brokersCount > 0){
                logger.info("Following Broker(s) {} is/are available on Zookeeper.",zkClient.getChildren(ZkUtils.BrokerIdsPath()));
                flag = true;    
            }
            else{
                logger.error("ERROR:No Broker is available on Zookeeper.");
            }
            zkClient.close();

        }
        return flag;
    }
cygmwpex

cygmwpex3#

保罗的回答很好,从经纪人的Angular 来看,Kafka和zk实际上是如何合作的。
我想说,检查kafka服务器是否正在运行的另一个简单选项是创建一个指向cluste的简单kafkaconsumer并尝试一些操作,例如listtopics()。如果kafka服务器没有运行,您将得到timeoutexception,然后可以使用 try-catch 判决。

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }
oxcyiej7

oxcyiej74#

首先,您需要创建adminclient bean:

@Bean
 public AdminClient adminClient(){
   Map<String, Object> configs = new HashMap<>();
   configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
   StringUtils.arrayToCommaDelimitedString(new Object[]{"your bootstrap server address}));
   return AdminClient.create(configs);
 }

然后,您可以使用以下脚本:

while (true) {
   Map<String, ConsumerGroupDescription> groupDescriptionMap =
         adminClient.describeConsumerGroups(Collections.singletonList(groupId))
         .all()
         .get(10, TimeUnit.SECONDS);

   ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);

   log.debug("Kafka consumer group ({}) state: {}",
                groupId,
                consumerGroupDescription.state());

   if (consumerGroupDescription.state().equals(ConsumerGroupState.STABLE)) {
        boolean isReady = true;
        for (MemberDescription member : consumerGroupDescription.members()) {
            if (member.assignment() == null || member.assignment().topicPartitions().isEmpty()) {
            isReady = false;
            }
        }

        if (isReady) {
            break;
           }
        }

        log.debug("Kafka consumer group ({}) is not ready. Waiting...", groupId);
        TimeUnit.SECONDS.sleep(1);
}

此脚本将每秒检查使用者组的状态,直到状态稳定为止。因为所有使用者都分配到主题分区,所以您可以断定服务器正在运行并准备就绪。

ej83mcc0

ej83mcc05#

我使用了adminclient api。

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
    ListTopicsResult topics = client.listTopics();
    Set<String> names = topics.names().get();
    if (names.isEmpty())
    {
        // case: if no topic found.
    }
    return true;
}
catch (InterruptedException | ExecutionException e)
{
    // Kafka is not available
}
jq6vz3qz

jq6vz3qz6#

我发现了一个事件 OnError 在合流Kafka中:

consumer.OnError += Consumer_OnError;

 private void Consumer_OnError(object sender, Error e)
    {
        Debug.Log("connection error: "+ e.Reason);
        ConsumerConnectionError(e);
    }

及其代码文档:

//
    // Summary:
    //     Raised on critical errors, e.g. connection failures or all brokers down. Note
    //     that the client will try to automatically recover from errors - these errors
    //     should be seen as informational rather than catastrophic
    //
    // Remarks:
    //     Executes on the same thread as every other Consumer event handler (except OnLog
    //     which may be called from an arbitrary thread).
    public event EventHandler<Error> OnError;
yr9zkbsy

yr9zkbsy7#

所有Kafka经纪人都必须被指派 broker.id . 启动时,代理将在zookeeper中创建一个临时节点,路径为 /broker/ids/$id . 由于节点是短暂的,一旦代理断开连接(例如关闭),它就会被删除。
您可以查看临时代理节点的列表,如下所示: echo dump | nc localhost 2181 | grep brokers zookeeper客户端界面公开了许多命令; dump 列出集群的所有会话和临时节点。
注意,以上假设:
您正在默认端口上运行zookeeper( 2181 )在 localhost ,然后呢 localhost 是集群的领导者
你的 zookeeper.connect kafka config没有为kafka集群指定chroot env,即它只是 host:port 而不是 host:port/path

bmp9r5qi

bmp9r5qi8#

您可以在机器上安装kafkacat工具
例如,在ubuntu上,您可以使用

apt-get install kafkacat

一旦安装了kafkacat,您就可以使用下面的命令来连接它

kafkacat -b <your-ip-address>:<kafka-port> -t test-topic

用机器ip替换
可以替换为kafka正在运行的端口。通常是9092
运行上述命令后,如果kafkacat能够建立连接,则表示kafka已启动并正在运行

ars1skjm

ars1skjm9#

对于linux,“ps aux | grep kafka”查看结果中是否显示kafka属性。e、 g./path/to/kafka/server.properties

相关问题