Kafka连接rds到红移未启动

2nbm6dog  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(204)

我能够在一个小得多的表上实现kafka connect,但我正在尝试在一个更大的数据库上实现它。我的源和接收器配置如下
资料来源:

name=rds-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.whitelist=users,places,sales
tasks.max=1
connection.url=jdbc:postgresql://my-rds-source-url/db?user=<USERNAME>&password=<PASSWORD>
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=rds_

Flume:

name=redshift-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds_test
connection.url=jdbc:redshift://my-redshift-url:5439/db?user=<USERNAME>&password=<PASSWORD>
auto.create=true

然后我从这里下载了最新的红移驱动程序,并把它放在里面 /usr/local/confluent/share/java/kafka-connect-jdbc/ 然后初始化confluent平台,创建主题,并使用以下命令运行使用者:

confluent start
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic rds_test
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --topic rds_test --from-beginning

我得到的不是一个错误,而是一个没有反馈的空白屏幕。我的假设是,Kafka将不得不扫描这些表,我在我的电脑中看到这个奇怪的查询
pg_stat_activity SELECT * FROM "users" WHERE "updated_at" < $1 AND (("updated_at" = $2 AND "id" > $3) OR "updated_at" > $4) ORDER BY "updated_at","id" ASC 所以我假设kafka查询新条目时有问题,或者这个查询返回了导致序列化错误的内容。我不确定我还缺什么。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题