kafka连接mysql源代码

bf1o4zei  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(781)

在我开始之前,我想先说我对Kafka是完全陌生的,对linux也是相当陌生的,所以如果这最终是一个可笑的简单答案,请客气点!:)
我尝试做的高层次的想法是使用confluent的kafka connect从mysql数据库中读取传感器数据,该数据库在一分钟或亚分钟的基础上将传感器数据流到数据库中,然后使用kafka作为“etl管道”,将数据立即路由到数据仓库和/或mongodb,以便报告,甚至直接从我们的数据库连接到kafkaweb应用程序。
我使用robin moffatt的系列以及confluent的jdbc源代码连接器快速启动作为我的初始指南。至于它们的宿主位置,我使用一个amazonrdsmysql数据库和一个单独的aws ec2 t2.large示例与ubuntu16.04.2一起运行kafka connect。
使用robin的工作流,我已经创建了配置文件,但我没有使用他使用的json格式。我使用的是快速入门文章中的格式。

name=jdbc_source_mysql_4427_Data       
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=*****              
table.whitelist=4427_Data              
mode=timestamp                
timestamp.column.name=TmStamp               
validate.non.null=false                
topic.prefix=mysql-

保存在:

/etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties

然后我跑:

/usr/bin/confluent load jdbc_source_mysql_4427_Data  -d /etc/kafka-connect-jdbc/kafka-connect-jdbc-source.properties

并得到以下错误:

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***for configuration Couldn't open connection to jdbc:mysql://lndbtest.cdveaddpnevv.us-east-2.rds.amazonaws.com:3306/LNDBv1?user=adminRDS&password=***\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

这似乎是司机的问题。我现在的问题是,“我需要将mysql jdbc驱动程序下载到我的ec2示例吗,还是应该将其包含在confluent平台包中?”
另外,我的整体想法听起来是否适合Kafka?
正如我前面提到的,我对这些技术还不熟悉,但我发现学习东西的最好方法是直接投入并尝试解决问题。任何想法和建议都是非常受欢迎的。谢谢您!

ldfqzlk8

ldfqzlk81#

正如@dawsaw所说,您确实需要使mysql jdbc驱动程序对连接器可用。
我在这里的观察是,如果你能自由地使用你描述的所有应用程序和架构,那么最好是从传感器到kafka,然后从那里到mysql、mongo、webapp等等。
如果您有选择的话,将数据流传输到一个db中,然后再从db中流出来并不是一个完美的选择。

guicsvcw

guicsvcw2#

总体概念对我来说很有意义。您确实需要下载驱动程序并将其添加到worker类路径中。我想它不是出于许可的原因打包的。

svmlkihl

svmlkihl3#

这是因为在confluent的发行版中没有mysql驱动程序。我认为您可以通过下载一个mysql驱动jar文件来解决这个问题,然后将它放在confluent/share/java/kafka connect jdbc文件夹中并重新运行程序。

相关问题