kafka connect jdbc不从源获取连续的时间戳

imzjd6km  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(326)

我使用kafka-connect-jdbc-4.0.0.jar和postgresql-9.4-1206-jdbc41.jar
kafka connect连接器配置

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}

我使用jdbc驱动程序配置了两个连接器,一个是源,另一个是接收器,针对postgresql数据库(“postgresql9.6.9”),一切正常
我怀疑连接器是如何收集源数据的,查看日志我发现在执行查询之间有21秒的时间差

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:19[2019-01-11 08:20:19,070] DEBUG Resetting querier TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)

11/1/2019 9:20:49[2019-01-11 08:20:49,499] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)

第一个查询收集08:17:07.000和08:20:18.985之间的数据,但第二个查询收集08:20:39.000和08:20:49.500之间的数据。。两者之间有21秒的差异,其中可能有记录。。。

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500

我假设其中一个数据是获得的最后一条记录,另一个值是该时刻的时间戳
我找不到一个关于这是正常工作的连接器的解释?你是否应该假设你并不总是收集所有的信息?

rur96b6h

rur96b6h1#

jdbc连接器不能保证检索每一条消息。为此,需要基于日志的更改数据捕获。对于德贝齐姆和Kafka提供的postgres。你可以在这里了解更多。
免责声明:我为confluent工作,并撰写了上述博客
编辑:这是一个录音,上面的博客也可以从ApacheCon2020:?https://rmoff.dev/no-more-silos

相关问题