我正在尝试实现一个PostgreSQL->Debezium Kafka Connect源代码->Kafka->Neo4j Kafka Connect Sink->Neo4j。Debezium发送的数据包含包含字段op = "c/u/d"
(创建/更新/删除)的事件。示例文档显示了使用FOREACH
测试是否应该进行创建/更新以及该部分是否正常工作的模式。我无法解决的问题是,如果出现op = "d"
事件,如何删除节点(我在主题中看到了这一点)。
我目前Kafka主题的密码行是这样的(格式化后,原来是一个很长的行):
FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END |
MERGE (p:DemoTable{id: event.after.id})
SET p.message = event.after.message, p.last_changed = event.ts_ms
)
WITH event
MATCH (p:DemoTable{id: event.after.id})
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END |
DELETE p
)
这不会显示任何错误,但也不会删除任何节点。
我尝试了删除部分的多个版本,直到我得到以下结果:
- 不带
WITH event
(MATCH
错误,FOREACH
只支持WITH
), - 在Foreach中运行匹配(不允许
FOREACH
中的MATCH
), - 在
FOREACH
中使用DELETE (p:DemoTable{id: event.after.id})
(代码末尾expected whitespace or a relationship pattern
错误)
有条件地处理删除事件的正确模式是什么?
1条答案
按热度按时间xzlaal3s1#
发现了问题:debezium中用于删除的事件具有
event.after = null
,因此在删除中,它需要MATCH (p:DemoTable{id: event.before.id})
。最终代码: