我正在使用confluent platform v5.4、mongo db 3.6版和kafka mongo db连接器。我已经配置了kafka mongo db source connector,以便在mongo db集合中创建新记录时,将数据从mongo db推送到kafka主题。我已经将我的kafka代理配置为将数据日志保留1天,而之前的默认配置是7天。现在我的问题是,我在mongodb日志中反复出现这样的错误-
2020-09-08T14:58:34.500+0000 I COMMAND [conn66] command local.oplog.rs command: aggregate { aggregate: "topic_1", pipeline: [ { $changeStream: { fullDocument: "default", resumeAfter: { _data: BinData(0, 825F50DCCE00000008463C5F6964003C35363962633732642D386430352D343034622D616262362D64363632656136663464303000005A1004B31FA99AA5C141CB9EE9AA845877B80D04) } } } ], cursor: {}, $db: "ctc", $clusterTime: { clusterTime: Timestamp(1599577091, 6), signature: { hash: BinData(0, 6550821FD90928ABC3A2DFE066FC34E824A83762), keyId: 6847473605622628353 } }, lsid: { id: UUID("2a2e5878-4743-4e83-a4fd-eed68b5efe02") }, $readPreference: { mode: "primaryPreferred" } } planSummary: COLLSCAN exception: resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F579ACE00000008463C5F6964003C31313632333863352D316561612D343961652D613437632D30366263336131333436313900005A1004B31FA99AA5C141CB9EE9AA845877B80D04")} code:ChangeStreamFatalError numYields:8932 reslen:455 locks:{ Global: { acquireCount: { r: 17872 }, acquireWaitCount: { r: 97 }, timeAcquiringMicros: { r: 350686 } }, Database: { acquireCount: { r: 8936 } }, Collection: { acquireCount: { r: 1 } }, oplog: { acquireCount: { r: 8935 } } } protocol:op_msg 22198ms
我知道在kafka主题和mongodb的oplog之间,有一个resume令牌,它被共享以保持数据流的正常运行。但有时oplog会被清除(因为oplog大小只能是分配给mongo db的内存的某个百分比),或者kafka主题对应的数据被删除,导致流的中断。
我的问题是-怎样才能避免水流破裂?如何确保简历标记始终存在于oplog和kafka主题中?有没有什么方法可以让我从可用的地方手动获取恢复令牌,并在丢失的地方更新恢复令牌?
1条答案
按热度按时间k4aesqcs1#
在4.4兼容的驱动程序中进行了更改,以处理在很长时间内没有任何更改时从oplog末尾脱落的更改流恢复令牌。这里描述的是ruby驱动程序的用户端;根据需要适应java。关键是你需要使用
tryNext
并从更改流中读取恢复令牌,而不是从返回的更改文档中读取。如果您正在执行上述所有操作,您需要:
增加oplog窗口并保留更多的oplog,或者
更快地处理更改以保持在oplog窗口中。