logsatsh和kafka

kq0g1dla  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(309)

我的服务器上运行单节点kafka。我使用以下命令创建主题“bin/kafka-topics.sh--create--zookeeperlocalhost:2181 --replication-factor 1--分区1--主题测试”。我有两个logstash示例正在运行。第一个从某个java应用程序日志文件中读取数据并将其注入kafka。它工作得很好,我可以在控制台上使用“bin/kafka-console-consumer.sh--zookeeper”查看kafka中的数据localhost:2181 --topic test--from start“命令。但是另一个从kafka(同一主题“test”)读取并注入elasicsearch的logstash示例失败了。第二个logstash示例无法读取数据。我将其配置文件更改为从kafka读取并在控制台上打印,然后它也不会输出任何内容。以下是失败日志存储的配置文件:

// config file
     input {
        kafka {
        zk_connect => "localhost:2181"
        topic_id => "test" 
        }
        }
        output {
        stdout{}
        }

logstash既不打印任何内容,也不抛出任何错误。我使用logstash2.4和kafka0.10。我用了Kafka快速入门指南(http://kafka.apache.org/documentation.html#quickstart)

rta7y2nd

rta7y2nd1#

请看一下Kafka输入配置选项https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
请找到Kafka数据的logstash配置并将其推送到elk堆栈。

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic_name"]
  }
}

output{
    elasticsearch{
        hosts => ["http://localhost:9200/"]
        index => "index_name"
    }
}

希望有帮助!

shstlldc

shstlldc2#

@wjp 
Hi wjp, I am running single node kafka cluster. There is no Schema Registry running. zookeeper is also running.   I used following command to create topic "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test".  I have two logstash instances running. First one reads data from some java application log file inject the same to the kafka. It works fine, I can see data in kafka on console using "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning" command. But the other logstash instance which reads from kafka(same topic "test") and injects into elasicsearch, is failing. This second instance of logstash fails to read data. I changed its configuration file to read from kafka and print on console, then also it does not output anything.Here is the config file for failing logstash:
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "test" 
}
}
output {
stdout{}
}
Logstash neither print anything nor it throws any error.
I am using Logstash 2.4 and kafka 0.10.
I used kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart)
rkue9o1l

rkue9o1l3#

如果您查看kafka输入插件配置,您可以看到一个重要的参数,它允许连接到kafka集群:zk\u connect。
根据文档,它默认设置为localhost:2181. 确保将其设置为kafka集群示例,或者理想情况下设置为多个示例,具体取决于您的设置。
例如,假设您使用json主题连接到一个三节点kafka集群。配置如下:

kafka {
 topic_id => "your_topic"
 zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

另外,为主题配置正确的编解码器也很重要。上面的示例将处理json事件。如果使用avro,则需要设置另一个参数codec。有关如何配置它的详细信息,请参见文档页面。它基本上需要指向avro模式文件,可以指定为avsc文件或模式注册表端点(在我看来,这是更好的解决方案)。
如果您的kafka环境中运行了schema registry,您可以将编解码器指向它的url。一个完整的例子是:

kafka {
 codec => avro_schema_registry { endpoint => "http://kc1.host:8081"}
 topic_id => "your_topic"
 zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

希望有用!

相关问题