我的应用程序中的kafka debezium postgres连接器引发以下错误:
org.apache.kafka.connect.errors.ConnectException: Unable to obtain valid replication slot. Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector
at io.debezium.connector.postgresql.connection.PostgresConnection.readReplicationSlotInfo(PostgresConnection.java:226)
at io.debezium.connector.postgresql.connection.PostgresConnection.getReplicationSlotState(PostgresConnection.java:150)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:98)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
应用程序使用postgresql 9.6.11版本,值为 max_replication_slots
是10。我可以看到数据库中的活动逻辑复制\u插槽,确认\u flush \u lsn=null,restart \u lsn=3/93043310,catalog \u xmin=202656,active=t,datoid=16407,slot \u type=logical,active \u pid=32183,plugin=wal2json,slot \u name=slot1,database=db1(我已经用伪值替换了slot name和db name)
根据我的理解,因为这里的逻辑复制插槽的confirmed\u flush\u lsn=null导致了这个错误,因为它阻止连接器找到这个插槽。
如何修复此问题?为什么确认的\u flush \u lsn值将为空?
1条答案
按热度按时间3j86kqsm1#
我通过重新启动连接器引用的aws中的rds db示例修复了这个问题,之后confirmed_flush_lsn的值被重置为一个非空值,有点类似于(restart_lsn=3/93043310)。Kafka连接能够按预期找到复制槽“slot1”。连接器也被拔掉了。这暂时解决了我的问题,但我仍然想了解什么设置确认\u刷新\u lsn=null的逻辑复制\u插槽放在第一位。