将sparkDataframe推送到janusgraph,以便spark在emr中运行

eqoofvh9  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(331)

我在ec2示例上运行了一个janusgraph,对于它的后端存储,我在ec2上也有cassandra集群。我想将聚合和过滤数据从运行在amazonemr上的python代码apachespark(pyspark)推送到janusgraph。
我已经搜索了:
我读过使用gremlin在这里插入的内容,但是它定义了对单个顶点和边的插入。我想它被插入一次,可能就像推所有的Dataframe到janusgraph。
有没有什么有效的方法可以直接从spark批量插入,而不必将其转换为csv或从bash执行命令的任何中间步骤

1l5u6lss

1l5u6lss1#

我花了大约两周的时间来寻找答案,把它贴出来,这样它就能帮助别人。
要编写在远程计算机上运行的Dataframe,可以使用 gremlin ,但是为了有效地阅读(如果您想添加边),您可能需要 SparkGraphComputer . 因为我的用例主要是插入。我现在要集中精力
如果您想从头开始进行图形遍历和配置,请遵循长答案/安装

简短回答(从spark插入顶点/边并从远程查询janus)

你需要安装gremlin( sudo pip install gremlinpython )在远程,你可以像这样插入边
1) 基本gremline导入与远程图形对象的生成

from gremlin_python.structure.graph import Graph
    from gremlin_python.process.graph_traversal import __
    from gremlin_python.process.strategies import *
    from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
    graph = Graph()
    myGraphTraversal = graph.traversal().withRemote(DriverRemoteConnection('ws://<Your IP of JANUS>:8182/gremlin','myGraphTraversal'))

2) 对于顶点

for row in df.rdd.collect():
         myGraphTraversal.addV('Country').property('name',row["name"]).next()

3) 对于边

for row in df.rdd.collect():

        node_from = myGraphTraversal.V().has('country',"name",row["from_country_name"]).next
        wallet_to = myGraphTraversal.V().has('country',"name",row["to_country_name"]).next()
       myGraphTraversal.V(wallet_to).as_('t').V(wallet_from).addE("sends").to("t").property('value',row["value"]).toList()

从远程测试顶点计数(导入和图形对象与以前类似)

print(myGraphTraversal.V().count().next())

=> 11800

长答案/配置:

在这里,我假设您的数据存储和janus在不同的示例中,但是我已经给出了本地的todo提示,如果它们不是
在janus服务器节点上,为python gremlin和tinkerpop安装jars

cd janus*    
./bin/gremlin-server.sh -i org.apache.tinkerpop gremlin-python 3.4.0(or 3.2.9)

首先编辑/创建连接到gremlin的配置文件(janusgraph.properties)。

sudo vim janusgraph.properties

编写这些配置(注意gremlin.graph和graph.graphname)

storage.backend = cql (whatever you bakend is)
storage.hostname = 192.xx.xx.XXX (DataStore/CASSANDRA NODE/NODE2 IP)
gremlin.graph=org.janusgraph.core.ConfiguredGraphFactory
graph.graphname=ConfigurationManagementGraph
index.search.backend=elasticsearch
index.search.hostname=127.0.0.1

备份默认的gremlin服务器配置

cp conf/gremlin-server/gremlin-server.yaml conf/gremlin-server/gremlin-server.yaml.orig

将configuredgraphfactory配置设为默认值

cp conf/gremlin-server/gremlin-server-configuration.yaml conf/gremlin-server/gremlin-server.yaml

现在编辑conf/gremlin-server/gremlin-server.yaml

sudo vim conf/gremlin-server/gremlin-server.yaml

进行此更改(设置主机,从文件数组[]中删除任何内容)

host: 0.0.0.0 
port: 8182 (8182 is default, and you should have this but for me I have 6182)
org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: []}}}}

运行gremlin服务器

bin/gremlin-server.sh # it should say that it is up on node 8182

编辑conf/remote.yaml并定义您的janusgraph ip(如果您从同一台机器加入gremlin控制台,也可以使用127.0.0.1)

sudo vim conf/remote.yaml

将主机端口更改为

hosts: [192.xx.xx.xx] # your Janus node IP
port: 8182

打开控制台

bin/gremlin.sh

连接到远程(本例中为本地)8182端口

:remote connect tinkerpop.server conf/remote.yaml session

下一步,把所有的命令转移到你的janusgraph上运行:8154

:remote console

创建图形,

gremlin> map = new HashMap();
gremlin> map.put("storage.backend", "cql");
gremlin> map.put("storage.hostname", "192.xx.xx.xx(IP of storage backend)");
gremlin> map.put("graph.graphname", "graph1");
gremlin> ConfiguredGraphFactory.createConfiguration(new MapConfiguration(map));
==>null

不要关闭gremlin,也不要关闭服务器(如您所愿或如下所示)

ps -ef | grep gremlin-python
sudo kill -9 <gremlin process id 1> <gremlin process id 2> <gremlin process id n>

编辑script/empty-sample.groovy使graph1成为遍历源

def globals = [:]
myGraph = ConfiguredGraphFactory.open("graph1")
globals = [myGraphTraversal : myGraph.traversal()]

再次编辑conf/gremlin-server/gremlin-server.yaml

sudo vim conf/gremlin-server/gremlin-server.yaml

进行这些更改,并将groovy添加到脚本到文件中,以便启用从远程进行遍历的访问

org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/empty-sample.groovy]}}

重新运行gremlin服务器

bin/gremlin-server.sh

现在,从远程pc连接到janus。
安装gremlin python

sudo yum -y install python-pip
pip install gremlinpython

step 1 of short answer (gremlin导入和图形对象)
远程测试顶点

print(myGraphTraversal.V().count().next())

=> 11800
电子病历
把它放在引导程序中,这样就可以在执行spark脚本步骤之前安装gremlin了

sudo pip-3.6 install gremlinpython #pip install gremlinpython for python2

相关问题