How do I read parquet from a remote HDFS into my local PySpark?

w8f9ii69  posted on 2022-12-09 in HDFS

I am trying to load data from a remote HDFS filesystem into a local PySpark session on my Mac:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

path = "/xx/yy/order_info_20220413/partn_date=20220511/part-00085-dd.gz.parquet"
host = "host"
port = 1234
orders = spark.read.parquet(
    f"hdfs://{host}:{port}{path}"
)

The error is as follows:

Py4JJavaError: An error occurred while calling o55.parquet.
: org.apache.hadoop.ipc.RpcException: RPC response exceeds maximum data length
    at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1936)
    at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)

I am trying to understand what RPC response exceeds maximum data length means. I could not find anything in my setup resembling the core-site.xml block shown in https://stackoverflow.com/a/60701948/6693221:

<property>
    <name>fs.default.name</name>
    <value>hdfs://host:port</value>
</property>

However, when I run telnet host port in the macOS terminal, the connection succeeds. What is the solution?


zdwk9cvp1#

You should configure your filesystem before creating the Spark session. You can do this either in the core-site.xml file or directly in the session configuration. Then, to read the parquet, you only need to provide the path, since the session is already configured to use the remote HDFS cluster as its filesystem:

from pyspark.sql import SparkSession

path = "/xx/yy/order_info_20220413/partn_date=20220511/part-00085-dd.gz.parquet"
host = "host"
port = 1234

spark = (
    SparkSession.builder
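    # fs.defaultFS is the current key; fs.default.name is its deprecated alias, set here as well for older Hadoop configs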
    .config("spark.hadoop.fs.default.name", f"hdfs://{host}:{port}")
    .config("spark.hadoop.fs.defaultFS", f"hdfs://{host}:{port}")
    .getOrCreate()
) 

orders = spark.read.parquet(path)
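Alternatively, as mentioned above, the same setting can live in a local core-site.xml instead of the builder config. A minimal sketch, assuming a configuration directory of your choosing (/path/to/conf below is a placeholder) that you point HADOOP_CONF_DIR at before starting PySpark:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://host:1234</value>
    </property>
</configuration>

With export HADOOP_CONF_DIR=/path/to/conf set in the shell that launches PySpark, the session picks up the remote cluster as its default filesystem and spark.read.parquet(path) works without the extra .config() calls.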
