如何为批处理配置kafka集群

qq24tv8q  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(282)

我试图使用kafka connect只向db(consumer)提供一定数量的新行。为此,我将源配置文件配置为
source.properties的外观如下:

name=source-postgres
 connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
 tasks.max=1
 batch.max.rows = 10
 connection.url=jdbc:postgresql://<URL>/postgres?user=postgres&password=post
 mode=timestamp+incrementing
 timestamp.column.name=updated_at
 incrementing.column.name=id
 topic.prefix=postgres_

这是接收器属性文件的内容

name=dbx-sink
batch.size=5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one

topics=postgres_users

# Configuration specific to the JDBC sink connector.

# We want to connect to a SQLite database stored in the file test.db and auto-create tables.

connection.url=jdbc:postgresql://<URL>:35000/postgres?user=dba&password=nopasswd
auto.create=true

但这没有任何效果,只要有新行可用,它就会被插入到db(consumer)中。因此,我向接收器添加了另一个配置参数 batch.size=10 . 这也没有效果。
当我启动connect-standalone.sh脚本时,我可以在控制台上看到batch.max.rows=10。
我做错了什么,或者如何修复它?

wbrvyc0a

wbrvyc0a1#

batch.max.rows 每批发送10行;它不会限制总共发送的行数。

相关问题