我正试图从neo4j流向kafka主题发送消息。
这是我的neo4j的docker compose配置
version: '3'
services:
neo4j:
image: neo4j:3.5.8-enterprise
hostname: klinks
container_name: klinks
volumes:
- ./plugins:/plugins
ports:
- "7474:7474"
- "7687:7687"
environment:
NEO4J_dbms_logs_query_threshold: 0s
NEO4J_dbms_logs_query_enabled: "true"
NEO4J_cypher_lenient__create__relationship: "true"
NEO4J_apoc_trigger_enabled: "true"
NEO4J_dbms_security_procedures_unrestricted: "apoc.*,algo.*"
NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
NEO4J_dbms_logs_debug_level: DEBUG
NEO4J_dbms_memory_heap_max__size: 2G
NEO4J_dbms_memory_heap_initial__size: 1G
NEO4J_dbms_memory_pagecache_size: 1G
NEO4J_AUTH: neo4j/admin #la password è admin
NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
NEO4J_kafka_group_id: p2
NEO4J_streams_sink_topic_cypher_friends: "
MERGE (p1:Person { name: event.initiated })
MERGE (p2:Person { name: event.accepted })
CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
"
NEO4J_streams_sink_enabled: "true"
NEO4J_streams_procedures_enabled: "true"
NEO4J_streams_source_enabled: "true"
NEO4J_kafka_zookeeper_connect: docker.for.win.localhost:2181
NEO4J_kafka_bootstrap_servers: docker.for.win.localhost:9092
NEO4J_streams_source_topic_nodes_recommendations: Person{*}
NEO4J_streams_source_schema_polling_interval: 10000
NEO4J_kafka_acks: 1
NEO4J_kafka_num_partitions: 1
NEO4J_kafka_retries: 2
NEO4J_kafka_batch_size: 16384
NEO4J_kafka_buffer_memory: 33554432
NEO4J_kafka_session_timeout_ms: 15000
NEO4J_kafka_reindex_batch_size: 1000
NEO4J_kafka_connection_timeout_ms: 10000
NEO4J_kafka_replication: 1
NEO4J_kafka_linger_ms: 1
我的最终流程应该是
1[确定]。向发送消息 friends
主题 kafka
和一个java制作人
2[确定]。发布此 friends
流到 neo4j
使用neo4j接收器连接器应用此规则
MERGE (p1:Person { name: event.initiated })
MERGE (p2:Person { name: event.accepted })
CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
3[不起作用]。流回到 kafka
一些数据来自 neo4j
查询或侦听配置中的某类节点(仅用于测试目的)我侦听类型为的节点 Person{*}
```
...
NEO4J_streams_source_topic_nodes_recommendations: Person{*}
...
第1步和第2步按预期工作,第3步失败,我看到这个日志来自 `neo4j` ```
klinks | 2020-10-02 11:01:17.402+0000 DEBUG Trying to send a transaction event with txId 8 and txEventId 8 to kafka -> recommendations
klinks | 2020-10-02 11:02:17.404+0000 DEBUG Sent record in partition null offset null data null key size null
我想我在neo4j流的配置中遗漏了一些东西。有人能帮我理解吗。
暂无答案!
目前还没有任何答案,快来回答吧!