Debezium source task cannot reconnect to the PostgreSQL database after the database container is recreated

Asked by pu82cl6c on 2021-06-08, tagged Kafka

We have a Kubernetes cluster in which Debezium runs as a source task, reading from PostgreSQL and writing to Kafka. Debezium, Postgres, and Kafka each run in their own pod. When the Postgres pod is deleted and Kubernetes recreates it, the Debezium pod fails to reconnect. Logs from the Debezium pod:

    2018-07-17 08:31:38,311 ERROR || WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
    2018-07-17 08:31:38,311 INFO || [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]

Debezium then keeps trying to flush the outstanding messages at regular intervals, hitting the following exception each time:

    2018-07-17 08:32:38,167 ERROR || WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit() [org.apache.kafka.connect.runtime.WorkerSourceTask]
    org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
        at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:151)
        at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:138)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:437)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:378)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
        at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:942)
        at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:176)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:99)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:246)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:239)
        at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:146)
        ... 13 more
    Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at org.postgresql.core.PGStream.flush(PGStream.java:553)
        at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:939)
        ... 19 more

Is there a way to get Debezium to re-establish its connection to Postgres once the database comes back up? Or am I missing some configuration?
Debezium version: 0.8
Kubernetes version: 1.10.3
Postgres version: 9.6

Answer 1, by gev0vcfq:

It looks like this is a known problem, with open feature requests against both Debezium and Kafka:
https://issues.jboss.org/browse/dbz-248
https://issues.apache.org/jira/browse/kafka-5352
While those remain open, this appears to be the expected behavior: the failed source task stays down until it is restarted manually.
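For reference, that manual restart goes through the Kafka Connect REST API. A rough sketch, reusing the connector name and port 8083 from the question (the hostname is a placeholder):

    # Restart the whole connector, or just its failed task, via the Connect REST API
    # ("connect-host" is a placeholder for wherever Kafka Connect is reachable)
    curl -X POST http://connect-host:8083/connectors/inventory-connector/restart
    curl -X POST http://connect-host:8083/connectors/inventory-connector/tasks/0/restart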
As a workaround, I added this liveness probe to the deployment:

    livenessProbe:
      exec:
        command:
        - sh
        - -ec
        - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
      initialDelaySeconds: 30
      periodSeconds: 5

The first clause gets the container's IP address:

    ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');

The second clause makes the request and counts the occurrences of "RUNNING" in the status JSON:

    reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);

The third clause exits with code 1 if "RUNNING" appears fewer than two times:

    if [ $reply -lt 2 ]; then exit 1; fi

It is passing initial testing, i.e. restarting the Postgres DB triggers a restart of the Debezium container. I imagine a script along these lines (though no doubt more "robust") could be baked into the image to make the probe simpler.
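A hypothetical standalone version of that check, as a script shipped in the image (the connector name, port, and the threshold of two RUNNING states are simply carried over from the probe above):

    #!/bin/sh
    # health-check.sh -- exit non-zero unless the connector status reports enough RUNNING states.
    # Hypothetical helper; defaults mirror the liveness probe above.
    CONNECTOR="${1:-inventory-connector}"
    PORT="${2:-8083}"
    # Resolve the container's own IP address, as the probe does
    ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')
    # Count how many RUNNING states appear in the connector status JSON
    running=$(curl -s "$ipaddress:$PORT/connectors/$CONNECTOR/status" | grep -o RUNNING | wc -l)
    if [ "$running" -lt 2 ]; then
        echo "connector $CONNECTOR is not fully RUNNING" >&2
        exit 1
    fi

With something like that in place, the probe's command could shrink to roughly `sh -ec '/health-check.sh inventory-connector 8083'`.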

