我正在尝试将javaDataframe转换为pysparkDataframe。为此,我在java进程中创建一个dataframe(或行的数据集),并在java端启动一个py4j.gatewayserver服务器进程。然后在python端,我创建一个py4j.java_gateway.javagateway()客户机对象,并将其传递给pyspark的sparkcontext构造函数,将其链接到已经启动的jvm进程。但我得到了一个错误:-
File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py", line 120, in __init__
self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable
有人能帮忙吗?下面是我的密码using:-
javacode:-
import py4j.GatewayServer
public class TestJavaToPythonTransfer{
Dataset<Row> df1;
public TestJavaToPythonTransfer(){
SparkSession spark =
SparkSession.builder().appName("test1").config("spark.master","local").getOrCreate();
df1 = spark.read().json("path/to/local/json_file");
}
public Dataset<Row> getDf(){
return df1;
}
public static void main(String args[]){
GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
gatewayServer.start();
System.out.println("Gateway server started");
}
}
pythoncode:-
from pyspark.sql import SQLContext, DataFrame
from pyspark import SparkContext, SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,SQLContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())
运行python的命令code:-
python path_to_python_file.py
我也试过了this:-
$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py
但在这里,虽然代码没有抛出任何错误,但它没有打印任何东西到终端。我需要为此设置一些spark conf吗?
p、 如果代码或错误中有错别字,请提前道歉,因为我无法直接从公司的ide复制代码和错误堆栈。
1条答案
按热度按时间mm9b1k5b1#
在调用getdf()之前,缺少对入口点的调用
所以,试试这个:
此外,我在下面使用python和scala创建了工作副本(希望您不介意),它显示了在scala端py4j网关是如何使用spark会话和示例dataframe启动的,在python端,我访问了该dataframe并在转换回python端spark会话的dataframe之前将其转换为python列表[tuple]:
Python:
斯卡拉: