apply only runs on the first 1000 rows of a pyspark.pandas DataFrame

xhv8bpkk · posted 2023-10-15 in Spark

I have a DataFrame with about 6 million data points (rows) and a single column, which I convert to a pyspark.pandas DataFrame. Whenever I use apply on each row, the function only runs on the first 1000 rows, and I can't figure out how to stop that. Can anyone help? I would use a regular Spark DataFrame, but my column names are numbers and Spark won't accept that.
Here is the code:

import ast
import os

import findspark
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession

df = pd.read_parquet(
    "training_data_fv_unlabeled_spark_libsvm.parquet", engine="fastparquet"
)

# Build the submit args before putting them in the environment, and set the
# environment variables before the JVM is started.
memory = "100g"
pyspark_submit_args = " --driver-memory " + memory + " pyspark-shell"

os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

findspark.find()
findspark.init()
spark = SparkSession.builder.getOrCreate()

# Get the default configurations
spark.sparkContext._conf.getAll()

# Update the default configurations
conf = spark.sparkContext._conf.setAll(
    [
        ("spark.driver.maxResultSize", "500G"),
        ("spark.executor.memory", "200G"),
        ("spark.app.name", "Spark Updated Conf"),
        ("spark.executor.cores", "5"),
        ("spark.executor.instances", "5"),
        ("spark.cores.max", "8"),
        ("spark.driver.memory", "500G"),
    ]
)

# Stop the current Spark Session
spark.sparkContext.stop()

# Create a Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "500")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.default.parallelism", "500")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

def fv_to_dict(row):
    fv_str = row
    fv_str = "{" + fv_str.replace(" ", ", ") + "}"
    dict_str = ast.literal_eval(fv_str)
    out = _parse_dict_ket_to_str(dict_str)  # user-defined helper, not shown here
    # dict_list[index] = dict_str
    dict_list.append(out)

dict_list = []

psdf = ps.DataFrame(df)

psdf.apply(lambda row: fv_to_dict(row), axis=1)

The code finishes in about 4 seconds and len(dict_list) is always 1001. I don't know what else to try.
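
A likely cause, based on how pandas-on-Spark generally behaves: evaluation is lazy, and when apply is called without a return type hint it computes only the first compute.shortcut_limit rows (1000 by default, sampled as limit + 1, which would match the observed 1001) to infer the result schema. Appending to a driver-side Python list from inside apply is a side effect that never forces the full job to run. Below is a minimal, side-effect-free sketch, assuming the single column holds the feature-vector strings and _parse_dict_ket_to_str is the project's own helper:

import ast

import pyspark.pandas as ps

# The sample size used for schema inference; 1000 by default.
print(ps.get_option("compute.shortcut_limit"))

def fv_to_dict(fv_str) -> str:
    # Work on the raw string of one cell and *return* the result instead of
    # appending to a global list, so every row gets materialized.
    fv_str = "{" + fv_str.replace(" ", ", ") + "}"
    return str(_parse_dict_ket_to_str(ast.literal_eval(fv_str)))  # user-defined helper

col = psdf[psdf.columns[0]]         # the single feature-vector column
parsed = col.apply(fv_to_dict)      # the return type hint avoids the sampling pass
dict_list = parsed.to_pandas().tolist()  # collect to the driver only if it fits in memory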


91zkwejq 1#

Try this:

import ast
import os

import findspark
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession

df = pd.read_parquet(
    "training_data_fv_unlabeled_spark_libsvm.parquet", engine="fastparquet"
)

os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

memory = "100g"
pyspark_submit_args = " --driver-memory " + memory + " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

findspark.find()
findspark.init()
spark = SparkSession.builder.getOrCreate()

# Get the default configurations
spark.sparkContext._conf.getAll()

# Update the default configurations
conf = spark.sparkContext._conf.setAll(
    [
        ("spark.driver.maxResultSize", "500G"),
        ("spark.executor.memory", "200G"),
        ("spark.app.name", "Spark Updated Conf"),
        ("spark.executor.cores", "5"),
        ("spark.executor.instances", "5"),
        ("spark.cores.max", "8"),
        ("spark.driver.memory", "500G"),
    ]
)

# Stop the current Spark Session
spark.sparkContext.stop()

# Create a Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "500")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.default.parallelism", "6000")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

def fv_to_dict(row):
    fv_str = row
    fv_str = "{" + fv_str.replace(" ", ", ") + "}"
    dict_str = ast.literal_eval(fv_str)
    out = _parse_dict_ket_to_str(dict_str)  # user-defined helper, not shown here
    # dict_list[index] = dict_str
    dict_list.append(out)

dict_list = []

psdf = ps.DataFrame(df)

psdf.apply(lambda row: fv_to_dict(row), axis=1)
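
As for the remark in the question that a plain Spark DataFrame refuses numeric column names: a possible workaround, sketched here under the assumption that df is the pandas frame loaded above, is to cast the labels to strings before handing the frame to Spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark field names must be strings, so rename the numeric labels first.
df.columns = [str(c) for c in df.columns]
sdf = spark.createDataFrame(df)  # a regular Spark DataFrame, no pandas-on-Spark needed
sdf.printSchema()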
