使用mongosinkconnector删除文档

webghufk  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(279)

我可以在mongo中插入/更新文档,但我很难删除文档。
这就是数据是如何通过来自sql server源的debezium connect记录在kafka主题中的(最后一行是delete操作的样子):

{"user_code":1001}      {"user_code":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}
{"user_code":1002}      {"user_code":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}  
{"user_code":1003}      {"user_code":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}       
{"user_code":1003}      null

在本例中,即使在空值之后,用户代码为1003的文档仍在mongodb中。
下面是我如何配置我的mongosinkconnector(我已经尝试了这两种方法) mongodb.delete.on.null.values 以及 delete.on.null.values 但没有一个有效):

{
  "name": "inventory-connector-sink-2",
  "config": {
      "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max" : "1",
      "topics": "server1.dbo.customers",
      "connection.uri": "mongodb://root:root@mongo:27017/",
      "database": "testDB",
      "collection": "customers_2",
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": false,
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": false,
      "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
      "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
      "mongodb.delete.on.null.values": true
  }
}

我也试过使用partialValue策略,但运气不好。
ps:我正在与confluentinc/cp kafka connect docker映像合作,用于我的Flume连接器。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题