Logstash: Elasticsearch output and unstructured data

wnrlj8wa, posted 2021-06-15 in ElasticSearch

filebeat.yml file:

    filebeat.inputs:
    - type: log
      paths:
        - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
      exclude_lines: ['^Infobase.+']

    output.logstash:
      hosts: ["localhost:5044"]
      worker: 1

Filebeat collects logs from a folder structure like this:

    C:\Program Files\Filebeat\test_logs\*\*\*\*.txt

There are many folders here, and each of them contains at least a few logs at the end of the path.
Example of a log file (the same timestamp can appear in several log files, because the logs come from different users):

    "03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
    "03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
    "03.08.2020 10:56:40","Event LClick","t=1981","beg"
    "03.08.2020 10:56:40","Event LClick","t=2090","end"
    "03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
    "03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
    "03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
    "03.08.2020 10:56:51","Event LClick","t=12543","beg"
    "03.08.2020 10:56:51","Event LClick","t=12605","end"
    "03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
    "03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
    "03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
    "03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
    "03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
    "03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
    "03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
    "03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"

logstash.conf file:

    input {
      beats {
        port => '5044'
      }
    }
    filter {
      grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
        add_tag => [ '%{Status}' ]
      }
      dissect {
        mapping => {
          '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
        }
      }
      date {
        match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
      }
      elapsed {
        unique_id_field => 'Event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => false
      }
      if 'elapsed' in [tags] {
        aggregate {
          task_id => '%{Event}'
          code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
          map_action => 'create'
        }
      }
      mutate {
        remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
        rename => {'elapsed_time' => 'Event_duration'}
      }
    }
    output {
      elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
      }
    }

In my logstash.conf I use the aggregate filter, so I set the number of pipeline workers to 1 (-w 1) for it to work correctly.
While I was testing and tuning the configuration against a single log file with -w 1, everything worked fine. But as soon as I started collecting all the logs from every directory, the problems began: the data is not put into Elasticsearch correctly (which is obvious from the strange numbers produced by the aggregation).
I also tried setting it in filebeat.yml under the Logstash output (worker: 1), but that still did not help.
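To be explicit about what I mean by -w 1 (this is just the standard pipeline-workers setting, shown here only for context):

    # on the command line
    bin/logstash -f logstash.conf -w 1

    # or, equivalently, in logstash.yml
    pipeline.workers: 1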
Question:
Maybe you know how to solve this? It is strange that everything works fine with a single log file, or with several log files at the end of one directory, and then suddenly breaks when more directories are added.
If I understand the theory correctly, Elasticsearch has indices and types. Every log line has a time and belongs to a user, so perhaps I should put the data into indices by log time and into types by user name, so that logs from different users with the same timestamp do not overlap. How should I implement this? I tried to look for information and only found material about document types, which are deprecated.
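What I have in mind is something like building the index name from those fields, e.g. the rough sketch below (using the User_Name field produced by my dissect filter; untested, and I am not sure this is the right approach, since index names also have to be lowercase):

    output {
      elasticsearch {
        hosts => ['localhost:9200']
        # hypothetical: one index per user per day, built from the User_Name
        # field (from the dissect filter) and the event date
        index => 'test-%{User_Name}-%{+YYYY.MM.dd}'
      }
    }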


f2uvfpb9 (answer 1):

You are using elapsed and aggregate on a field that is not unique: you have multiple occurrences of the same value in the Event field, which allows the elapsed filter to pair a start event from one file with an end event from another file.
This happens because the Filebeat harvesters read the files in parallel and ship them to Logstash in bulk. The worker option in the configuration is of no use in your case; it controls the number of workers that ship the data, not how it is harvested.
You can try the option harvester_limit: 1 to limit the number of parallel harvesters, but that will slow down your data processing, and it still does not guarantee that the filters will not get confused. Also, Filebeat does not guarantee the order of events, only at-least-once delivery.
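If you do want to try it, harvester_limit belongs on the log input in filebeat.yml, roughly like this (a sketch based on your config):

    filebeat.inputs:
    - type: log
      paths:
        - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
      exclude_lines: ['^Infobase.+']
      # limit this input to one active harvester at a time (0 = unlimited, the default)
      harvester_limit: 1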
The best solution is to create a new field that concatenates the Event and filename fields, so that events from different files can never be confused.
You can do this by adding a mutate filter before your elapsed filter.

    mutate {
      add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
    }

This creates a field named uniqueEvent with values like LClick_filename, which you then use in your elapsed and aggregate filters instead of Event.
If the same file names can occur in different folders, you will need to include another field from the path so that uniqueEvent ends up with a truly unique value.
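As a rough, untested sketch of how the filter section from your question would change (the mutate has to run before elapsed, and the filename field must not be removed until after these filters):

    mutate {
      add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
    }
    elapsed {
      # pair beg/end events per file instead of globally
      unique_id_field => 'uniqueEvent'
      start_tag => 'beg'
      end_tag => 'end'
      new_event_on_match => false
    }
    if 'elapsed' in [tags] {
      aggregate {
        # use the same per-file id as the aggregate task id
        task_id => '%{uniqueEvent}'
        code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
        map_action => 'create'
      }
    }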
