我在用 Kibana 7.5.2
以及 Elastic search 7
.
我的第一种方法:我分别使用fluentd和metricbeat将日志数据和系统数据发送到我的kibana服务器。一切正常。我可以在探索页面上看到数据。但我有大量的数据。所以我在服务器之间加了Kafka。
我的第二种方法:现在我将日志数据和系统数据发送给kafka。然后从Kafka,我将它发送到kibana服务器。对于通过metricbeat的系统数据,我在kibana中得到@timestamp字段,对于通过fluent的日志数据,我没有得到@timestamp字段。
如果我一个接一个地运行kafka服务器,一切都正常。但如果我把他们两个一起跑。“发现”选项卡中未显示系统数据。
系统数据的kibana索引: metricbeat-*
日志数据的kibana索引: prelive-data*
fluentd形态
<match laravel.**>
@type kafka2
# list of seed brokers
brokers <my-Broker-ip>:9092
<buffer topic>
@type file
path /var/log/td-agent/buffer/td
flush_interval 3s
</buffer>
# topic settings
# data type settings
<format>
@type json
</format>
# topic settings
topic_key prelive-data-log
default_topic prelive-data-log
# producer settings
compression_codec gzip
# max_send_retries 1
required_acks -1
</match>
格律节拍
# ----------------------------- KAFKA --------------------------------
output.kafka:
# # initial brokers for reading cluster metadata
hosts: ["<my-broker-ip>:9092"]
topic: 'metricbeat-7.6.0-2020.03.25-000001'
系统数据kafka服务器的worker.properties(metricbeat)
offset.storage.file.filename=/tmp/connectm.offsets
bootstrap.servers=:9092
offset.flush.interval.ms=10000
rest.port=10084
rest.host.name=localhost
rest.advertised.port=10085
rest.advertised.host.name=localhost
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/share/java
系统数据kafka服务器的filesource.properties(metricbeat)
name=config-name
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=metricbeat-7.6.0-2020.03.25-000001
key.ignore=true
schema.ignore=true
connection.url=http://<ES-IP>:9200
connection.username=username
connection.password=password
type.name=kafka-connect
~
系统数据的kafka服务器的worker.properties(fluentd)
offset.storage.file.filename=/tmp/connectf.offsets
bootstrap.servers=:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=localhost
rest.advertised.port=10083
rest.advertised.host.name=localhost
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/share/java
系统数据kafka服务器(fluentd)的filesource.properties
name=<config-name>
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=2
topics=prelive-data-log
key.ignore=true
schema.ignore=true
connection.url=http://<my-ES-ip>:9200
connection.username=username
connection.password=password
type.name=kafka-connect
我在运行我的Kafka服务器 /usr/bin/connect-standalone worker.properties filesource.properties
我被困在这里一个星期了。任何帮助都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!