我使用kafka-connect-jdbc-4.0.0.jar和postgresql-9.4-1206-jdbc41.jar
kafka connect连接器配置
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "updated_at",
"topic.prefix": "streaming.data.v2",
"connection.password": "password",
"connection.user": "user",
"schema.pattern": "test",
"query": "select * from view_source",
"connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
我使用jdbc驱动程序配置了两个连接器,一个是源,另一个是接收器,针对postgresql数据库(“postgresql9.6.9”),一切正常
我怀疑连接器是如何收集源数据的,查看日志我发现在执行查询之间有21秒的时间差
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:19[2019-01-11 08:20:19,070] DEBUG Resetting querier TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,499] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
第一个查询收集08:17:07.000和08:20:18.985之间的数据,但第二个查询收集08:20:39.000和08:20:49.500之间的数据。。两者之间有21秒的差异,其中可能有记录。。。
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500
我假设其中一个数据是获得的最后一条记录,另一个值是该时刻的时间戳
我找不到一个关于这是正常工作的连接器的解释?你是否应该假设你并不总是收集所有的信息?
1条答案
按热度按时间rur96b6h1#
jdbc连接器不能保证检索每一条消息。为此,需要基于日志的更改数据捕获。对于德贝齐姆和Kafka提供的postgres。你可以在这里了解更多。
免责声明:我为confluent工作,并撰写了上述博客
编辑:这是一个录音,上面的博客也可以从ApacheCon2020:?https://rmoff.dev/no-more-silos