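I set up the Kafka Neo4j sink connector as follows:
1. Create /tmp/plugins, download neo4j-kafka-connect-neo4j-5.0.3-kc-oss.zip (the build for Apache Kafka) and unzip it into the plugins folder.
2. Edit connect-standalone.properties and add plugin.path=/tmp/plugins
3. Create connect-neo4j-sink.properties in the config directory with the following content: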
```
topics=my-events
connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
errors.retry.timeout=-1
errors.retry.delay.max.ms=1000
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
neo4j.server.uri=bolt://neo4j:7687
neo4j.authentication.basic.username=neo4j
neo4j.authentication.basic.password=neo4j
neo4j.encryption.enabled=false
neo4j.topic.cypher.my-topic=MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)
```
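Separately from the startup error, a sink config like this is worth a quick consistency check: with the Cypher strategy, each subscribed topic is expected to have a matching neo4j.topic.cypher.<topic> key. The sketch below is a minimal check under stated assumptions: parse_properties and topics_without_cypher are hypothetical helper names, the parser only handles simple key=value lines (no line continuations or ":" separators), and a topic with no Cypher key is not necessarily wrong if another sink strategy is used for it.

```python
CYPHER_PREFIX = "neo4j.topic.cypher."


def parse_properties(text):
    """Parse simple key=value lines; a simplified reader, not the full format."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def topics_without_cypher(props):
    """Return subscribed topics that have no neo4j.topic.cypher.<topic> key."""
    subscribed = [t.strip() for t in props.get("topics", "").split(",") if t.strip()]
    mapped = {k[len(CYPHER_PREFIX):] for k in props if k.startswith(CYPHER_PREFIX)}
    return [t for t in subscribed if t not in mapped]
```

Reading config/connect-neo4j-sink.properties into a string and passing it through parse_properties and then topics_without_cypher returns ['my-events'] for the file shown above, because the Cypher statement is keyed under my-topic while topics is set to my-events.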
When I start it with: /bin/connect-standalone.sh config/connect-standalone.properties config/connect-neo4j-sink.properties
I get this error: Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches streams.kafka.connect.sink.Neo4jSinkConnector
Environment
- neo4j:5-community
- Apache Kafka 2.13-3.6.0
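
When Connect reports that it cannot find a connector class, one quick thing to rule out is that the class is simply not present in any jar under plugin.path. The following is a minimal sketch of such a check, not part of Kafka or the Neo4j connector: find_jars_with_class is a hypothetical helper name, and it assumes the plugin was unzipped so that its jars sit somewhere below the plugin directory.

```python
import zipfile
from pathlib import Path
from typing import List


def find_jars_with_class(plugin_dir: str, class_name: str) -> List[str]:
    """Return the jars under plugin_dir that contain the given class."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(str(jar))
        except zipfile.BadZipFile:
            # Ignore truncated or corrupt downloads.
            continue
    return hits
```

Calling find_jars_with_class("/tmp/plugins", "streams.kafka.connect.sink.Neo4jSinkConnector") lists the jars that ship the class. An empty list points to a path or unzip problem; a non-empty list means the class is there and something else is stopping Connect from loading it.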
1 Answer
I tried rebuilding the artifact and found that my Java version was outdated. Upgrading to Java 11 solved the problem.
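
One plausible reading of this fix (an assumption; the answer does not spell it out) is that the connector classes were compiled for a newer Java release than the one running Connect, so the plugin scan could not load them. A rough way to compare the two is sketched below. The function names are hypothetical; the only facts relied on are that `java -version` prints its banner to stderr and that a class file starts with the magic number 0xCAFEBABE followed by the minor and major version.

```python
import re
import struct
import subprocess


def parse_java_major(version_output: str) -> int:
    """Extract the major Java version from `java -version` output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no version string found")
    first, second = match.group(1), match.group(2)
    # Java 8 and earlier report themselves as 1.x (e.g. "1.8.0_292").
    if first == "1" and second is not None:
        return int(second)
    return int(first)


def required_java_for_class(class_bytes: bytes) -> int:
    """Return the minimum Java release needed to load a compiled class."""
    magic, _minor, major = struct.unpack(">IHH", class_bytes[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a Java class file")
    # Class-file major version 52 is Java 8, 55 is Java 11, and so on.
    return major - 44


def installed_java_major() -> int:
    """Ask the `java` binary on PATH for its version."""
    # `java -version` writes its banner to stderr, not stdout.
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    return parse_java_major(result.stderr)
```

If required_java_for_class, applied to the bytes of the connector's .class entry read from its jar, returns a larger number than installed_java_major(), upgrading the JVM that runs Connect is the likely fix.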