当kafka重新启动时,logstash将丢失与kafka的输入连接

slmsl1lt  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(945)

我已经建立了一个elak(elasticsearch+logstash+kibana)堆栈,数据来自kafka安装。写给Kafka的主题被logstash使用,logstash最终将它们存储到elasticsearch。每个不同的产品都在docker swarm集群中的一个单独的容器中运行,只要Kafka没有失败,一切都正常。如果我重新启动kafka容器,logstash将永远失去与kafka的连接。在这种情况下,恢复logstash和kafka之间连接的唯一方法是重新启动logstash。
以下是我在Kafka重启时从logstash得到的日志:

[2019-11-25T11:42:12,420][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-5, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,420][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-8, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,425][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,427][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-12, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-9, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,429][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-1, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,431][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,431][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,432][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,432][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-13, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-7, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-2, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,438][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-3, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,438][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-6, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,481][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-12, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,481][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-1, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-7, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-2, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-13, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,489][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-6, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,526][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-5, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,526][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-8, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,529][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-9, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,540][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-3, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,577][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,578][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.

以及Kafka关于Kafka重启后日志存储的日志:

[2019-11-25 11:43:04,639] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-10-2e6bedb9-e420-4fb9-b00c-e450f2c9bce2 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,641] INFO [GroupCoordinator 1001]: Preparing to rebalance group logstash in state PreparingRebalance with old generation 17 (__consumer_offsets-49) (reason: removing member logstash-docker-desktop-10-2e6bedb9-e420-4fb9-b00c-e450f2c9bce2 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-0-3bb02e48-b4f4-424e-9233-c56e6af44d3b in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-7-fd07dca9-2dd1-4bbc-b2e0-f89973a48dba in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-6-ca63542a-8658-4438-93a2-a9414fe2a641 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-5-028c55cc-a5a4-4339-add0-57a059191675 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-1-7e18cf0b-ecfa-4bc4-a517-d1a89168c092 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-14-c799054a-bdd0-4903-8926-e6044271a553 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-13-9e802762-fae6-4983-a031-0d1bc867fcb0 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-3-4bee5bf1-12a7-42ee-b785-93dfec6726dd in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-11-65fa14aa-1ec8-49d4-a840-e6b796352660 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-4-de8eaf3d-baf4-425f-a1a9-e4378b759fe4 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-12-f8b42828-fb86-4000-92f9-18c0075ac2f7 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-2-4389c233-c115-419d-bd8c-4161eeebc1c6 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-8-dc5c88bc-dde4-4628-955e-ab87b2a70376 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-9-4dcbfb2a-af32-45f0-bd02-dff26f9655dc in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,647] INFO [GroupCoordinator 1001]: Group logstash with generation 18 is now empty (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)

logstash.conf中logstash的配置如下所示:

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    group_id => "logstash"
    topics_pattern => ".*"
    consumer_threads => 15
    metadata_max_age_ms => "30000"
    decorate_events => true
    codec => json
  }
}

Kafka的docker服务有这样的定义:

kafka:
    image: wurstmeister/kafka:2.12-2.3.0
    depends_on:
      - zookeeper
    networks:
      - internal
    ports:
      - 9094:9094
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
      KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: 'true'
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094,PLAINTEXT://:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:8088,PLAINTEXT://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/kafka-logs
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 2
      JMX_PORT: 9997

有人能给我一个提示,如果可以配置logstash或Kafka在不同的方式,使logstash恢复连接Kafka自动不重新启动logstash?
提前谢谢!

6qqygrtg

6qqygrtg1#

我看你没有具体说明 reconnect_backoff_ms https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-Kafka-重新连接\u退避\u ms我想这就是你所缺少的。

相关问题