kafka连接mssql中的多对多表

7gyucuyw  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我正在研究Kafka连接,将我们的一些数据库传输到一个数据湖。为了测试kafka connect,我在中的一个项目数据库中设置了一个数据库。到现在为止,一直都还不错。
下一步,我用以下属性配置了kafka connect模式:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "updated_at,created_at",
  "incrementing.column.name": "id",
  "dialect.name": "SqlServerDatabaseDialect",
  "validate.non.null": "false",
  "tasks.max": "1",
  "mode": "timestamp+incrementing",
  "topic.prefix": "mssql-jdbc-",
  "poll.interval.ms": "10000",
}

虽然这适用于我的大多数表,其中我得到了一个id和一个created\u at/updated\u at字段,但它不适用于我的表,其中我用一个中间的表和一个复合键解决了我的多对多关系。请注意,我使用的是通用jdbc配置和来自microsoft的jdbc驱动程序。
有没有办法为这些特殊情况配置kafka connect?

yiytaume

yiytaume1#

可能需要创建多个连接器,而不是一个连接器来拉动所有表。如果您想使用不同的方法来获取数据,或者使用不同的id/时间戳列,就会出现这种情况。正如@cricket\u007所说,你可以使用 query 选项来收回查询的结果,该查询可能是 SELECT 表示多表联接。即使从单个表对象中提取数据,jdbc连接器本身也只是发出一个 SELECT * 从给定的表中,使用 WHERE predicate 来限制基于递增id/时间戳选择的行。
另一种方法是使用基于日志的变更数据捕获(cdc),并将所有变更直接从数据库流式传输到kafka中。
无论您使用jdbc还是基于日志的cdc,都可以使用流处理来解析kafka本身中的连接。Kafka流或ksql就是一个例子。关于后者我在这里写了很多。
您可能还会发现本文非常有用,它详细描述了将数据库与kafka集成的选项。
免责声明:我为confluent工作,这是一家支持开源ksql项目的公司。

相关问题