kafka连接器

mrfwxfqh  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(570)

我正在尝试将kafka jdbc连接器(源和接收器)与非常旧的数据库(cloudscape)一起使用。我有这个数据库的jdbc驱动程序。我将驱动程序放在confluent(版本5)的“/share/java/kafka/connect/jdbc”文件夹中,并创建了属性文件。

name=test-source-cloud-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
mode=incrementing
incrementing.column.name=id
topic.prefix=test-cloud-jdbc-

启动连接器时,日志为:

[2019-01-31 11:23:36,582] DEBUG Finding best dialect for JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:119)
[2019-01-31 11:23:36,582] DEBUG Dialect Db2DatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect DerbyDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect GenericDatabaseDialect scored 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect MySqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect OracleDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect PostgreSqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SapHanaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqlServerDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqliteDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SybaseDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect VerticaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Using dialect GenericDatabaseDialect with score 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:132)
[2019-01-31 11:23:36,587] ERROR Failed to create job for ./etc/kafka-connect-jdbc/CloudscapeProperties.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2019-01-31 11:23:36,588] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)

我想jdbc驱动程序非常旧(它使用java1.3)这一事实有一个问题。驱动程序使用rmi协议进行通信。如果我使用rmijdbc.jar和cloudscape.jar(驱动程序)运行一个非常简单的JavaRMI客户机来查询db,那么我就可以查询db并得到结果。
你认为这是一个与java版本相关的问题吗?而且,有没有可能实现一个自定义的kafka驱动程序来从这个db读取数据?关于这个问题,或者如何为旧数据库实现一个定制的kafka驱动程序,有什么建议吗?

4dbbbstv

4dbbbstv1#

您可以更改meta-inf/services/java.sql.driver,更改它,并由驱动程序类覆盖它。它可以为我工作!

lyfkaqu1

lyfkaqu12#

我不会立即得出结论,驱动程序/数据库是旧的。。。
你不能指指点点 plugin.path 直接连接到jdbc驱动程序。
更新 plugin.path 成为通往顶级的必经之路 /path/to/confluent-x.y.z/usr/share/java ,并将驱动程序复制到 kafka-connect-jdbc 文件夹。
重新启动connect,然后应该找到驱动程序。
您可以通过查找 kafka/connect-log4j.properties 并设置

log4j.rootLogger=DEBUG, stdout

然后您将看到这样的日志(例如mysql)

[2019-01-15 09:45:26,949] DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@37303f12 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:277)

相关问题