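I have a problem with Debezium that I have been stuck on for more than two weeks.
I use Debezium to pull data from a remote PostgreSQL database every minute, and I always see the following error in the output: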
"option "include-unchanged-toast" = "0" is unknown".
The environment is as follows:
Debezium 0.9.5.Final
PostgreSQL 10.8
Kafka 2.1.1
The connector configuration is as follows:
name=diansheng340831
slot.name=diansheng340831
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=*.*.*
database.port=5432
database.user=*
database.password=*
database.dbname=*
database.history.kafka.bootstrap.servers=localhost:9092
database.server.name=diansheng340831
plugin.name=wal2json
table.whitelist=*
transforms=dropFieldByValue
transforms.dropFieldByValue.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropFieldByValue.whitelist=after
max.batch.size=5120
snapshot.fetch.size=5120
database.tcpKeepAlive=true
schema.refresh.mode=columns_diff_exclude_unchanged_toast
max.queue.size=202400
snapshot.mode=never
The full error is as follows:
ERROR unexpected exception while streaming logical changes (io.debezium.connector.postgresql.RecordsStreamProducer:147)
org.postgresql.util.PSQLException: ERROR: option "include-unchanged-toast" = "0" is unknown
Where: slot "guangdian10831", output plugin "wal2json", in the startup callback
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1116)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:264)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:140)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:123)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-08-22 11:16:23,425] INFO WorkerSourceTask{id=guangdian10831-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2019-08-22 11:16:23,426] INFO WorkerSourceTask{id=guangdian10831-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2019-08-22 11:16:23,426] ERROR WorkerSourceTask{id=guangdian10831-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Can anyone help me?
1 Answer
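I have solved this problem. I commented out the following line in the Debezium source code and rebuilt it:
@Override
public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) {
    // return builder.withSlotOption("include-unchanged-toast", 0); // original
    return builder; // modified
}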
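For readers who want to see the effect of that change in isolation, here is a minimal sketch. It does not use the real Debezium or pgjdbc classes: `SlotOptionsSketch` and `PatchedDecoderSketch` are hypothetical stand-ins written only for this illustration, and the stand-in builder just records which slot options would be sent to the output plugin. Judging by the error text, the wal2json build on the server does not recognize `include-unchanged-toast`, so the patched method simply stops sending it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the replication stream builder; it only records slot options. */
final class SlotOptionsSketch {
    private final Map<String, String> options = new LinkedHashMap<>();

    SlotOptionsSketch withSlotOption(String name, int value) {
        options.put(name, String.valueOf(value));
        return this;
    }

    Map<String, String> options() {
        return options;
    }
}

/** Hypothetical stand-in for the decoder method that the answer patches. */
final class PatchedDecoderSketch {
    // Behaviour before the patch: the option is always added to the slot options.
    static SlotOptionsSketch originalTryOnceOptions(SlotOptionsSketch builder) {
        return builder.withSlotOption("include-unchanged-toast", 0);
    }

    // Behaviour after the patch: the builder is returned unchanged,
    // so the plugin never receives the option it rejects.
    static SlotOptionsSketch patchedTryOnceOptions(SlotOptionsSketch builder) {
        return builder;
    }
}

public class TryOnceOptionsDemo {
    public static void main(String[] args) {
        // Prints the options each variant would send to the output plugin.
        System.out.println(PatchedDecoderSketch.originalTryOnceOptions(new SlotOptionsSketch()).options());
        System.out.println(PatchedDecoderSketch.patchedTryOnceOptions(new SlotOptionsSketch()).options());
    }
}
```

Note that this workaround means maintaining a patched build of the connector. If the server-side wal2json plugin can be upgraded to a build that accepts the option, that may avoid patching the source, but that is an assumption I have not verified against this setup.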