我试图使用kafka connect只向db(consumer)提供一定数量的新行。为此,我将源配置文件配置为
source.properties的外观如下:
name=source-postgres
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
batch.max.rows = 10
connection.url=jdbc:postgresql://<URL>/postgres?user=postgres&password=post
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=postgres_
这是接收器属性文件的内容
name=dbx-sink
batch.size=5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=postgres_users
# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
connection.url=jdbc:postgresql://<URL>:35000/postgres?user=dba&password=nopasswd
auto.create=true
但这没有任何效果,只要有新行可用,它就会被插入到db(consumer)中。因此,我向接收器添加了另一个配置参数 batch.size=10
. 这也没有效果。
当我启动connect-standalone.sh脚本时,我可以在控制台上看到batch.max.rows=10。
我做错了什么,或者如何修复它?
1条答案
按热度按时间wbrvyc0a1#
batch.max.rows
每批发送10行;它不会限制总共发送的行数。