如何通过neo4j流向kafka发送节点集合

zpgglvta  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(634)

我们希望使用neo4j来构建(产品)类别的层次结构(树结构)。我们的数据从Kafka(接收器连接器)输入。我们计划使用neo4j-4.0.3,使用neo4j-streams-source将我们的分类树上的更新实时流回到kafka上。
通常,-streams source-方法是指定一个模式并将其链接到Kafka主题,如下所述:https://neo4j.com/docs/labs/neo4j-streams/current/#neo4j_streams_source
为了利用neo4j的强大功能,我们希望每次都发送一个节点集合。此集合由返回-path的-query上的所有节点组成: nodes(path) . 更具体地说,这样的节点集合表示从叶到根的路径。
我们可以想到两个不起作用的替代方案:
使用图案定义。据我所知,模式似乎仅限于匹配单个节点(可能是具有多个标签的节点)——https://neo4j.com/docs/labs/neo4j-streams/current/#source-模式-因此我们一次只能流式输出一个节点。
将节点集合(在路径上)流回到具有不同标签的节点中,并使用该输出节点标签对{*}进行模式匹配。neo4j是一个面向属性的图形数据库,它不允许我将“节点集合”写入一个节点。
总而言之,我们想要的是将节点的集合流到每个Kafka记录中。对我们如何做到这一点有什么建议吗?

iugsix8n

iugsix8n1#

streams.publish过程会将任何可以用cypher格式化的任意数据发送到您选择的主题。你可以随心所欲地格式化数据。
我在想象这样的事情:

MATCH (a:MyLabel { id: 'startingPoint' })
WITH a
MATCH p=shortestPath((a)-[:REL*]->(b:MyLabel { id: 'EndingPoint' }))
UNWIND nodes(p) as node
WITH collect({ 
    my: 'custom-object',
    prop: node.prop
}) as recordsToSendToKafka
CALL streams.publish('my-topic', recordsToSendToKafka)

这将从原始匹配路径发送一个按您选择的格式格式化的json记录数组。
请注意,使用apoc触发器,您可以响应neo4j中的其他事务来执行这类操作,因此这不必是一个一劳永逸的手动执行查询模式。
热释光;如果您可以匹配数据库中的任何内容,那么可以使用cypher将其重新格式化为json对象,并将任何数据发送到kafka上的任何主题。

相关问题