我试图在filebeat和logstash之间实现Kafka。
当把filebeat发送给kafka(它是文本格式的)和logstash时。为此,我假设,logstash无法处理输入。
当数据来自Kafka,直接从filebeat到logstash时,情况就不同了。
来自Kafka:
{
"message" => "nice",
"tags" => [
[0] "kafka-stream"
],
"@timestamp" => 2020-06-30T08:29:29.071Z,
"@version" => "1"
}
{
"message" => "{\"@timestamp\":\"2020-06-30T08:34:28.178Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.8.0\"},\"agent\":{\"hostname\":\"Smits-MacBook-Pro.local\",\"ephemeral_id\":\"b9779246-3cc9-408b-83ac-e69eeef3cd28\",\"id\":\"864be1a9-e233-4d41-8624-cf94e916a0b7\",\"name\":\"Smits-MacBook-Pro.local\",\"type\":\"filebeat\",\"version\":\"7.8.0\"},\"log\":{\"offset\":11341,\"file\":{\"path\":\"/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log\"}},\"message\":\"2020-06-30 16:34:20.328 INFO 63741 --- [http-nio-8080-exec-7] c.e.o.controller.HomeController : AUDIT_LOG >> customer id a8703\",\"tags\":[\"observability\",\"audit\"],\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"1.5.0\"},\"host\":{\"name\":\"Smits-MacBook-Pro.local\"}}",
"tags" => [
[0] "kafka-stream"
],
"@timestamp" => 2020-06-30T08:34:29.222Z,
"@version" => "1"
}
来自filebeat:
{
"type" => "log",
"@timestamp" => 2020-06-30T04:37:18.935Z,
"@version" => "1",
"log" => {
"file" => {
"path" => "/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log"
},
"offset" => 10846
},
"input" => {
"type" => "log"
},
"ecs" => {
"version" => "1.5.0"
},
"message" => "2020-06-30 12:37:16.900 INFO 63741 --- [http-nio-8080-exec-3] c.e.o.controller.HomeController : AUDIT_LOG >> customer id d6ebe",
"tags" => [
[0] "observability",
[1] "audit",
[2] "beats",
[3] "beats_input_codec_plain_applied"
],
"hostname" => {
"name" => "Smits-MacBook-Pro.local"
},
"agent" => {
"type" => "filebeat",
"version" => "7.8.0",
"name" => "Smits-MacBook-Pro.local",
"hostname" => "Smits-MacBook-Pro.local",
"ephemeral_id" => "1ca4e838-eeaa-4b87-b52a-89fa385865b8",
"id" => "864be1a9-e233-4d41-8624-cf94e916a0b7"
}
}
现在,当我在kibana中可视化数据时:
以下是日志从filebeat直接发送到logstash时的输出:
以下是日志从filebeat到kafka再到logstash时的输出:
如果你需要更多的信息,请告诉我。
elk中每个产品的配置如下:https://github.com/shah-smit/observability-spring-demo
1条答案
按热度按时间23c0lvtd1#
当您将日志直接发送到logstash时,您使用beats协议,该协议为您的事件添加了一些“额外”字段:
https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields-beat-common.html
我不确定kafka输出使用哪种协议,但可以肯定的是,这不会添加额外的字段。
因此,当您将日志发送到kafka,然后从logstash中读取它们时,字段就更少了。