有条件地从处理PostgreSQL数据库中debezium数据的Kafka连接中删除neo4j中的节点

y3bcpkx1  于 2022-10-21  发布在  PostgreSQL
关注(0)|答案(1)|浏览(226)

我正在尝试实现一个PostgreSQL->Debezium Kafka Connect源代码->Kafka->Neo4j Kafka Connect Sink->Neo4j。Debezium发送的数据包含包含字段op = "c/u/d"(创建/更新/删除)的事件。示例文档显示了使用FOREACH测试是否应该进行创建/更新以及该部分是否正常工作的模式。我无法解决的问题是,如果出现op = "d"事件,如何删除节点(我在主题中看到了这一点)。
我目前Kafka主题的密码行是这样的(格式化后,原来是一个很长的行):

FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END | 
  MERGE (p:DemoTable{id: event.after.id}) 
  SET p.message = event.after.message, p.last_changed = event.ts_ms
) 
WITH event
MATCH (p:DemoTable{id: event.after.id}) 
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END | 
  DELETE p
)

这不会显示任何错误,但也不会删除任何节点。
我尝试了删除部分的多个版本,直到我得到以下结果:

  • 不带WITH event(MATCH错误,FOREACH只支持WITH),
  • 在Foreach中运行匹配(不允许FOREACH中的MATCH),
  • FOREACH中使用DELETE (p:DemoTable{id: event.after.id})(代码末尾expected whitespace or a relationship pattern错误)

有条件地处理删除事件的正确模式是什么?

xzlaal3s

xzlaal3s1#

发现了问题:debezium中用于删除的事件具有event.after = null,因此在删除中,它需要MATCH (p:DemoTable{id: event.before.id})
最终代码:

FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END | 
  MERGE (p:DemoTable{id: event.after.id}) 
  SET p.message = event.after.message, p.last_changed = event.ts_ms
) 
WITH event
MATCH (p:DemoTable{id: event.before.id}) 
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END | 
  DELETE p
)

相关问题