我创建了一个https://github.com/mongodb/mongo-kafka
但是如何运行它来连接我正在运行的kafka示例。
甚至这个问题听起来有多蠢。但似乎没有可用的文档使其能够在本地运行 replicaset
的 mongodb
.
所有的博客都指向使用MongoAtlas。
如果你有一个很好的资源,请引导我走向它。
更新1--
使用maven插件-https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
把它放进Kafka插件,重启Kafka。
更新2——如何启用mongodb作为kafka的源代码?
https://github.com/mongodb/mongo-kafka/blob/master/config/mongosourceconnector.properties
要用作Kafka配置的文件
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
更新3-上面的方法没有工作回到博客没有提到端口8083是什么。
安装汇合和汇合枢纽,仍然不确定的蒙戈连接器与Kafka工作。
更新4-
zookeeper、kafka服务器、kafka连接正在运行
mongo kafka库文件kafka connect avro连接器库文件
使用下面的命令,我的源代码开始工作-
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
使用下面的logstash配置,我可以将数据推入elasticsearch-
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["users","organisations","skills"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
因此,现在一个mongosourceconnector.properties保留了一个从中读取的集合名称,我需要为每个集合运行带有不同属性文件的kafka connect。
我的日志库将新数据推入elasticsearch,而不是更新旧数据。另外,它不会根据集合的名称创建索引。想法是这应该能够与我的mongodb数据库完美同步。
最后更新-一切都进展顺利,
为kafka connect创建了多个属性文件
最新的logstash实际上根据主题名创建索引,并相应地更新索引
input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["users","organisations","skills"]
}
}
filter {
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}
mutate {
add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
rename => { "[payload][fullDocument]" => "document"}
remove_field => ["message","json_payload","payload"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
action => "update"
doc_as_upsert => true
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}
2条答案
按热度按时间6fe3ivhb1#
端口8083是kafka connect,从
connect-*.sh
脚本。它是独立于代理的,并且不从代理设置属性
kafka-server-start
zlhcx6iw2#
成功使mongodb与elasticsearch同步的步骤-
首先部署mongodb副本-
为kafka创建配置属性
确保您的kafka插件可以使用下面url中的jar文件-
maven中央存储库搜索
Kafka连接avro转换器
部署Kafka
配置日志存储-
启动elasticsearch,kibana和logstash
测试
打开蒙哥罗盘
创建一个集合,在logstash主题中提到这些集合,并为kafka创建属性文件
向其中添加数据
更新数据
elasticsearch中的审阅索引