我正在研究Kafka连接,将我们的一些数据库传输到一个数据湖。为了测试kafka connect,我在中的一个项目数据库中设置了一个数据库。到现在为止,一直都还不错。
下一步,我用以下属性配置了kafka connect模式:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated_at,created_at",
"incrementing.column.name": "id",
"dialect.name": "SqlServerDatabaseDialect",
"validate.non.null": "false",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "mssql-jdbc-",
"poll.interval.ms": "10000",
}
虽然这适用于我的大多数表,其中我得到了一个id和一个created\u at/updated\u at字段,但它不适用于我的表,其中我用一个中间的表和一个复合键解决了我的多对多关系。请注意,我使用的是通用jdbc配置和来自microsoft的jdbc驱动程序。
有没有办法为这些特殊情况配置kafka connect?
1条答案
按热度按时间yiytaume1#
可能需要创建多个连接器,而不是一个连接器来拉动所有表。如果您想使用不同的方法来获取数据,或者使用不同的id/时间戳列,就会出现这种情况。正如@cricket\u007所说,你可以使用
query
选项来收回查询的结果,该查询可能是SELECT
表示多表联接。即使从单个表对象中提取数据,jdbc连接器本身也只是发出一个SELECT *
从给定的表中,使用WHERE
predicate 来限制基于递增id/时间戳选择的行。另一种方法是使用基于日志的变更数据捕获(cdc),并将所有变更直接从数据库流式传输到kafka中。
无论您使用jdbc还是基于日志的cdc,都可以使用流处理来解析kafka本身中的连接。Kafka流或ksql就是一个例子。关于后者我在这里写了很多。
您可能还会发现本文非常有用,它详细描述了将数据库与kafka集成的选项。
免责声明:我为confluent工作,这是一家支持开源ksql项目的公司。