I have a df with about 6 million rows and one column, which I converted to a pyspark.pandas df. Whenever I use apply on each row, it only executes the function on the first 1000 rows, and I can't figure out how to stop that. Can anyone help? I would use a normal Spark df, but my columns have numbers as column names and Spark won't accept that.
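(Side note on the numeric column names: casting them to strings before handing the pandas df to Spark is usually enough for a plain Spark df to accept it. A minimal sketch, not from the original post; the parquet file name is taken from the code below and the integer column names are an assumption:)

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pdf = pd.read_parquet("training_data_fv_unlabeled_spark_libsvm.parquet")
pdf.columns = [str(c) for c in pdf.columns]  # e.g. 0 -> "0", so Spark accepts the names
sdf = spark.createDataFrame(pdf)             # plain Spark DataFrame, no pandas-on-Spark needed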
Here is the code:
import ast
import os
import findspark
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession

df = pd.read_parquet(
    "training_data_fv_unlabeled_spark_libsvm.parquet", engine="fastparquet"
)
os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
memory = "100g"
pyspark_submit_args = " --driver-memory " + memory + " pyspark-shell"
findspark.find()
findspark.init()
spark = SparkSession.builder.getOrCreate()
# Get the default configurations
spark.sparkContext._conf.getAll()
# Update the default configurations
conf = spark.sparkContext._conf.setAll(
    [
        ("spark.driver.maxResultSize", "500G"),
        ("spark.executor.memory", "200G"),
        ("spark.app.name", "Spark Updated Conf"),
        ("spark.executor.cores", "5"),
        ("spark.executor.instances", "5"),
        ("spark.cores.max", "8"),
        ("spark.driver.memory", "500G"),
    ]
)
# Stop the current Spark Session
spark.sparkContext.stop()
# Create a Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "500")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.default.parallelism", "500")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
def fv_to_dict(row):
    fv_str = row
    fv_str = "{" + fv_str.replace(" ", ", ") + "}"
    dict_str = ast.literal_eval(fv_str)
    out = _parse_dict_ket_to_str(dict_str)  # helper not shown here
    # dict_list[index] = dict_str
    dict_list.append(out)
dict_list = []
psdf = ps.DataFrame(df)
psdf.apply(lambda row: fv_to_dict(row), axis=1)
This code finishes in about 4 seconds and the length of dict_list is always 1001. I don't know what to do.
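(For context, hedged as my reading of pandas-on-Spark rather than a confirmed answer: when apply is called without a return type hint, pandas-on-Spark first runs the function on the first compute.shortcut_limit + 1 rows (1000 + 1 by default) just to infer the result schema, which matches the 1001 entries above; the full pass then runs lazily on the executors, where appends to a driver-side list are never visible. A minimal sketch of collecting results by returning them instead of appending; the column name "0" and the behaviour of _parse_dict_ket_to_str are assumptions:)

import ast
import pyspark.pandas as ps

def fv_to_dict(fv_str):
    # Same parsing logic as above, but return the dict instead of appending to a list.
    fv_str = "{" + fv_str.replace(" ", ", ") + "}"
    return _parse_dict_ket_to_str(ast.literal_eval(fv_str))  # helper from the original code

parsed = psdf["0"].apply(fv_to_dict)     # elementwise apply on the single column ("0" is an assumption)
dict_list = parsed.to_pandas().tolist()  # forces full evaluation and collects every row to the driver

(Collecting 6 million dicts to the driver is heavy, so keeping the result as a pandas-on-Spark Series for further processing may be preferable.)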
1 Answer
Try this: