How do I run the mongo-kafka connector as a source for Kafka and integrate it with a Logstash input, using Elasticsearch as the sink?

yqlxgs2m · posted 2021-06-04 in Kafka

I have built https://github.com/mongodb/mongo-kafka,
but how do I run it against the Kafka instance I already have running?
However silly the question may sound, there seems to be no documentation for making it work with a locally running replica-set MongoDB;
all the blog posts point to MongoDB Atlas instead.
If you know of a good resource, please point me to it.
UPDATE 1 --
Used the jar from https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect,
put it into the Kafka plugins directory and restarted Kafka.
UPDATE 2 -- How do I enable MongoDB as a source for Kafka?
https://github.com/mongodb/mongo-kafka/blob/master/config/mongosourceconnector.properties
is the file meant to be used as the Kafka configuration.

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

UPDATE 3 -- The approach above did not work, so back to the blogs, none of which mention what port 8083 is.
Installed Confluent and Confluent Hub, but still not sure how the Mongo connector works with Kafka.
UPDATE 4 --
ZooKeeper, the Kafka server and Kafka Connect are running,
and the mongo-kafka jar and the Kafka Connect Avro converter jar are in place.
With the commands below, my source started working:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
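
To confirm the connector is actually publishing, listing the topics on the broker is a quick sanity check (a sketch; the exact topic names depend on the database, the collection and topic.prefix):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list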

With the Logstash configuration below, I am able to push data into Elasticsearch:

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

So right now one MongoSourceConnector.properties holds the name of a single collection to read from, and I need to run Kafka Connect with a different properties file for each collection.
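
Note that connect-standalone.sh does accept more than one connector properties file, so a single standalone worker can serve all the collections. A sketch, using hypothetical per-collection file names:

bin/connect-standalone.sh config/connect-standalone.properties \
    config/users.properties config/organisations.properties config/skills.properties
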
My Logstash pushes new documents into Elasticsearch instead of updating the old ones, and it does not create one index per collection. The idea is that all of this should stay perfectly in sync with my MongoDB database.
FINAL UPDATE -- Everything is working smoothly now:
multiple properties files were created for Kafka Connect, and
the latest Logstash pipeline actually creates one index per topic name and updates the documents in it accordingly:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug { metadata => true }
    }
}
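
For context on the double json filter above: with the default JSON converter (schemas enabled) the record value wraps the change-stream document, which is itself a JSON string, inside a schema/payload envelope. A simplified, illustrative record might look like this (field values are made up):

{
  "schema": { "type": "string", "optional": false },
  "payload": "{\"operationType\": \"insert\", \"fullDocument\": {\"_id\": {\"$oid\": \"60b8d2...\"}, \"name\": \"Jane\"}}"
}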

6fe3ivhb1#

Port 8083 is Kafka Connect, started from the connect-*.sh scripts.
It is separate from the broker, and its properties are not set by kafka-server-start.
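
Once the Connect worker is up it exposes a REST API on that port, so you can query it directly (assuming the default rest.port of 8083):

curl http://localhost:8083/connectors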


zlhcx6iw2#

Steps to successfully get MongoDB syncing with Elasticsearch:
First, deploy a MongoDB replica set:

# Make sure no mongod daemon instance is running
# To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN

# Kill the process id of the mongo instance
sudo kill 775

# Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
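
On a fresh deployment the replica set still has to be initiated once before change streams (and therefore the connector) will work; a minimal sketch using the mongo shell:

# Initiate the single-node replica set (first run only)
mongo --eval "rs.initiate()"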

Create the connector configuration properties for Kafka:

# dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration

connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options

publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
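
With topic.prefix left empty as above, the connector writes to a topic named after the database and collection, which is the topic the Logstash input below subscribes to:

dummydatabase.dummycollection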

Make sure the jar files from the URLs below are available to your Kafka plugins:
Maven Central Repository Search
Kafka Connect Avro Converter
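
The Connect worker picks those jars up through its plugin path, declared in the worker properties; a minimal excerpt, assuming the jars were copied to /usr/local/share/kafka/plugins:

# config/connect-standalone.properties (excerpt)
bootstrap.servers=localhost:9092
plugin.path=/usr/local/share/kafka/plugins
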
Deploy Kafka:

# Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Kafka Server
bin/kafka-server-start.sh config/server.properties

# Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
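
Before adding Logstash, it is worth checking that change events actually land on the topic; a quick sanity check with the console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic dummydatabase.dummycollection --from-beginning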

Configure Logstash:

# /etc/logstash/conf.d/apache.conf  <- File
input {
  kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["dummydatabase.dummycollection"]
  }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug { metadata => true }
    }
}

Start Elasticsearch, Kibana and Logstash:

sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash

Test
Open MongoDB Compass
Create a collection, add it to the Logstash topics and create a Kafka Connect properties file for it
Add some data to it
Update the data
Review the indices in Elasticsearch, e.g. with the requests sketched below
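
A couple of requests against Elasticsearch are enough for that last check (the index name is assumed to match the Kafka topic):

curl 'http://localhost:9200/_cat/indices?v'
curl 'http://localhost:9200/dummydatabase.dummycollection/_search?pretty'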
