在streamlit应用程序中使用pyspark创建Dataframe导致pickling错误

qcbq4gxm  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(366)

我正在streamlight应用程序中使用pyspark,并且我可以成功地创建 SparkSession 在应用程序中(作为spark客户机)与集群通信。我可以使用sparksql从我的spark集群中检索数据。例如,显示Hive表,

df1 = spark.sql("show databases")

我想从我的应用程序中创建一个简单的Dataframe,如下所示

import streamlit as st
import pandas as pd
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"])
st.dataframe(df.toPandas())

当我第一次运行streamlit应用程序时,我就可以得到预期的输出。

+----+----+
|Col1|Col2|
+----+----+
|   a|   1|
|   b|   2|
|   c|   3|
+----+----+

但是当我稍微修改代码时,例如 ("c", 3)("c", 4) ,保存并重新运行应用程序。触发以下异常。

PicklingError: Could not pickle object as excessively deep recursion required.
Traceback:
File "/opt/streamlit/envs/streamlit/lib/python3.7/site-packages/streamlit/script_runner.py", line 324, in _run_script
    exec(code, module.__dict__)
File "/opt/streamlit/app/8550.py", line 133, in <module>
    schemaPeople = spark.createDataFrame(people)
File "/usr/python/pyspark/sql/session.py", line 687, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/usr/python/pyspark/sql/session.py", line 384, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "/usr/python/pyspark/sql/session.py", line 355, in _inferSchema
    first = rdd.first()
File "/usr/python/pyspark/rdd.py", line 1376, in first
    rs = self.take(1)
File "/usr/python/pyspark/rdd.py", line 1358, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/python/pyspark/context.py", line 1001, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/python/pyspark/rdd.py", line 2470, in _jrdd
    self._jrdd_deserializer, profiler)
File "/usr/python/pyspark/rdd.py", line 2403, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/python/pyspark/rdd.py", line 2389, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
File "/usr/python/pyspark/serializers.py", line 568, in dumps
    return cloudpickle.dumps(obj, 2)
File "/usr/python/pyspark/cloudpickle.py", line 919, in dumps
    cp.dump(obj)
File "/usr/python/pyspark/cloudpickle.py", line 239, in dump
    raise pickle.PicklingError(msg)

追溯到异常的根本原因,它位于 ClouldPickler 无法序列化某些对象(可能是spark rdd)。它是由

Pickler.dump(self, obj)

当以上代码在没有streamlight背景的pyspark shell中运行时,没有发生错误。因此,我怀疑原因与streamlit缓存机制有关。有没有解决这个问题的办法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题