把Kafka的主题交给mongodb

k4aesqcs  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(331)

我有一个项目,需要使用java从json文件中获取数据,并将其放入kafka主题,然后将该主题中的数据放入mongodb。我已经找到了kafka mongodb连接器,但是文档只能用于使用合流平台进行连接。我试过:
从maven下载mongo-kafka-connect-1.2.0.jar。
将文件放入/kafka/plugins
在connect-standalone.properties中添加了此行“plugin.path=c:\kafka\plugins”。
已创建mongosinkconnector.properties。

name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration

connection.uri=mongodb://localhost:27017
database=student_kafka
collection=students
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

然后我执行了命令 .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\MongoSinkConnector.properties 我犯了这个错误

[2020-08-09 20:18:30,329] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        at com.mongodb.kafka.connect.sink.MongoSinkConfig.createConfigDef(MongoSinkConfig.java:248)
        at com.mongodb.kafka.connect.sink.MongoSinkConfig.<clinit>(MongoSinkConfig.java:139)
        at com.mongodb.kafka.connect.MongoSinkConnector.config(MongoSinkConnector.java:72)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
        at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        ... 10 more
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 10 more

编辑:多亏博格丹苏卡丘的帮助,我找到了解决这个问题的方法。
您需要将以下jar添加到kafka/lib文件夹中。
mongodb-driver-3.12.7.jar和mongodb-driver-core-3.12.7.jar以及mongo-java-driver-3.12.6.jar和mongo-kafka-connect-1.0.1.jar。
ps:我在使用最新的mongo kafka connect时遇到了一些问题。所以我不得不用这个版本。

c0vxltue

c0vxltue1#

您缺少mongodb驱动程序。mongodb连接器jar只包含与kafka connect相关的类,但它仍然需要一个驱动程序才能连接到mongodb示例。您需要下载该驱动程序并将jar文件复制到发布连接器的同一路径( C:\kafka\plugins ).
为了保持干净,您还应该在插件目录中创建另一个文件夹(例如: C:\kafka\plugins\mongodb )把所有和这个连接器有关的东西移到那里。
以后编辑:
我检查了kafka connect和mongodb接收器连接器的旧(er)设置,发现了以下jar:

这让我相信 kafka-connect-mongdb jar和瓶子 mongodb-driver 还不够。不过你可以试试。

相关问题