Unable to resume Kafka MongoDB source connector

ltqd579y · posted 2021-06-04 in Kafka

I am using the MongoDB Kafka source connector [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] with Confluent Platform v5.4.1 and MongoDB v3.6. The source connector was deleted, and now, when it is recreated about a month later, I get the following error:

com.mongodb.MongoQueryException: Query failed with error code 280 and error message 'resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F06E90400000004463C5F6964003C38316266623663632D326638612D343530662D396534652D31393936336362376130386500005A1004A486EE3E58984454ADD5BF58F364361E04")}' on server 40.118.122.226:27017
        at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:29)
        at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:267)
        at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
        at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
        at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:86)
        at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:83)
        at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:166)
        at com.mongodb.operation.ChangeStreamBatchCursor.tryNext(ChangeStreamBatchCursor.java:83)
        at com.mongodb.client.internal.MongoChangeStreamCursorImpl.tryNext(MongoChangeStreamCursorImpl.java:78)
        at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:338)
        at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-07-09 09:53:09,353] INFO Watching for collection changes on '<myDBName.myCollectionName>' (com.mongodb.kafka.connect.source.MongoSourceTask:374)

After searching for the cause of this error, I understand that the resume token could not be found in the oplog: the oplog is a size-capped collection, so it purges old entries, and the stored token had aged out. I also understand that to make this less likely I should increase the oplog size, etc. But I would like to know whether the problem can be resolved from the Kafka/Confluent Platform side instead. For example, could I delete the Kafka topic "myDBName.myCollectionName" (which I use to create a KSQL stream) together with its associated data, or do something in Kafka Connect, so that the MongoDB source connector discards the stale resume token and starts capturing changes in the collection again from the present?
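One approach worth trying from the Connect side, assuming a recent enough connector version, is to point the connector at a fresh offset partition so the stale resume token stored in the Connect offsets topic is never read. The `offset.partition.name` setting was added in version 1.5 of the MongoDB connector; the names, URI, database, and collection below are placeholders, not values from the original setup:

```properties
# Hypothetical connector config sketch – connection.uri, database,
# and collection are placeholders for your own values.
name=mongo-source-restarted
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=myDBName
collection=myCollectionName

# Use a brand-new offset partition name so the old (invalid) resume
# token stored under the previous partition is ignored and the change
# stream starts from "now" (requires connector >= 1.5).
offset.partition.name=mongo-source-restarted-v2

# Optionally snapshot the existing documents before streaming new
# changes, so the topic is not missing the current collection state.
copy.existing=true
```

If upgrading the connector is not an option, re-registering it under a different connector name should have a similar effect, since Kafka Connect keys stored offsets by connector name; deleting the downstream topic alone does not help, because the resume token lives in the Connect offsets topic, not in the data topic.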

No answers yet.