Apache Spark: encode text as cp500

Posted by siotufzp on 2023-08-06 in Apache

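I have a plain-text file of about 500 million rows (~27 GB) stored on AWS S3. I have been running the code below, and it has now been running for the past 3 hours. I looked for an encoding method in PySpark but could not find any such function. Here is the code.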

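import hashlib

import pandas as pd

chunksize = 10000000

def convert_email(email):
    # Encode as EBCDIC code page 500, then hash the resulting bytes
    cp500_email = email.encode('cp500')
    sha1 = hashlib.sha1(cp500_email).hexdigest()
    return email, cp500_email, sha1

# Reading s3:// paths with pandas requires the s3fs package
reader = pd.read_csv("s3://bucket/cp500_input.csv", chunksize=chunksize, sep='|')

for i, chunk in enumerate(reader):
    chunk[['EMAIL', 'CP500_EMAIL', 'SHA']] = chunk['EMAIL'].apply(convert_email).apply(pd.Series)
    # Writing every chunk to the same path would overwrite the previous chunk,
    # so each chunk gets its own object (the _part{i} suffix is illustrative)
    chunk[['EMAIL', 'CP500_EMAIL', 'SHA']].to_csv(f"s3://bucket/cp500_output_part{i}.csv", index=False)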

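A likely contributor to the slowness is `.apply(convert_email).apply(pd.Series)`, which builds a new Series object for every one of the 500 million rows. As a single-machine comparison point, a plain streaming pass with the standard `csv` module avoids that per-row object overhead (it still runs on one core). This is a sketch, assuming the input has a header row with an `EMAIL` column and `|` as the separator, and choosing to write the cp500 bytes as hex because CSV holds text; `convert_stream` is a hypothetical helper name:

```python
import csv
import hashlib
from typing import TextIO


def convert_stream(src: TextIO, dst: TextIO, sep: str = "|") -> int:
    """Stream rows from src to dst, adding CP500_EMAIL (hex) and SHA columns.

    Only one row is held in memory at a time. Returns the number of rows written.
    """
    reader = csv.DictReader(src, delimiter=sep)
    writer = csv.writer(dst)
    writer.writerow(["EMAIL", "CP500_EMAIL", "SHA"])
    count = 0
    for row in reader:
        email = row["EMAIL"]
        if email is None:  # short/malformed row: DictReader fills missing fields with None
            continue
        cp500_bytes = email.encode("cp500")  # EBCDIC code page 500
        writer.writerow([email, cp500_bytes.hex(), hashlib.sha1(cp500_bytes).hexdigest()])
        count += 1
    return count
```

On a local copy of the file this would be called as `with open("cp500_input.csv", newline="") as src, open("cp500_output.csv", "w", newline="") as dst: convert_stream(src, dst)`.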

jgwigjjp (answer #1)

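Yes, pandas is probably not the most efficient tool for a file this size: it runs in a single Python process, so even though `chunksize` keeps memory bounded, all 500 million rows are processed one after another. Instead, you can use PySpark to process the large file in a distributed way, which should improve throughput considerably. Something like this: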

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType
import hashlib

spark = SparkSession.builder.appName("Email Conversion").getOrCreate()

# The UDF returns two values, so its return type must be a struct, not StringType
result_type = StructType([
    StructField("CP500_EMAIL", StringType()),
    StructField("SHA", StringType()),
])

def convert_email(email):
    if email is None:  # null cells reach the UDF as None
        return None
    cp500_email = email.encode('cp500')
    sha1 = hashlib.sha1(cp500_email).hexdigest()
    # CSV cannot hold raw bytes, so the cp500 bytes are returned as hex text
    return cp500_email.hex(), sha1

convert_email_udf = udf(convert_email, result_type)
df = spark.read.csv("s3://bucket/cp500_input.csv", header=True, sep='|')
df = df.withColumn("RESULT", convert_email_udf(col("EMAIL")))
df = df.select(
    "EMAIL",
    col("RESULT.CP500_EMAIL").alias("CP500_EMAIL"),
    col("RESULT.SHA").alias("SHA"),
)
df.write.csv("s3://bucket/cp500_output.csv", header=True)

Good luck!
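PySpark helps here because each row is converted independently of every other row, so the file can be split into partitions that are processed in parallel and then concatenated. A plain-Python sketch of that property: `convert_partition` (a hypothetical name) has the iterator-in, iterator-out shape that `RDD.mapPartitions` expects, the partitioning is simulated with list slices, and the sample addresses are made up:

```python
import hashlib
from typing import Iterable, Iterator, List, Tuple


def convert_partition(emails: Iterable[str]) -> Iterator[Tuple[str, str, str]]:
    """Convert one partition of addresses; no state is shared between rows."""
    for email in emails:
        cp500_bytes = email.encode("cp500")
        yield email, cp500_bytes.hex(), hashlib.sha1(cp500_bytes).hexdigest()


def split(items: List[str], n_parts: int) -> List[List[str]]:
    """Simulate partitioning: cut items into at most n_parts contiguous slices."""
    size = max(1, -(-len(items) // n_parts))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]


emails = ["a@b.c", "x.y@example.com", "user1@test.org"]  # made-up sample data
partitioned = [row for part in split(emails, 2) for row in convert_partition(part)]
serial = list(convert_partition(emails))
print(partitioned == serial)  # True
```

Because the partitioned result equals the serial one, the work can be spread across executors without changing the output. Wiring such a function into `mapPartitions` on a real cluster is not shown or tested here.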

dxxyhpgq (answer #2)

Thanks @Freeman.
The code you provided raised an error because of the struct type in the UDF. I made some changes; below is the working code.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import hashlib

spark = SparkSession.builder.appName("Email Conversion").getOrCreate()


def convert_sha1(email):
    # SHA-1 of the cp500 (EBCDIC) bytes, as a hex string
    if email is None:
        return None
    cp500_email = email.encode('cp500')
    return hashlib.sha1(cp500_email).hexdigest()


def convert_email(email):
    # The column is declared StringType, so return text (the cp500 bytes as hex)
    # rather than a raw bytes object
    if email is None:
        return None
    return email.encode('cp500').hex()


convert_email_udf = udf(convert_email, StringType())
convert_sha_udf = udf(convert_sha1, StringType())
df = spark.read.csv("s3://bucket/Email_file.csv", header=True, sep='|')
df = df.withColumn("CP500_EMAIL", convert_email_udf(df["EMAIL"]))
df = df.withColumn("SHA1", convert_sha_udf(df["EMAIL"]))
df = df.select("EMAIL", "CP500_EMAIL", "SHA1")
df.write.csv("s3://bucket/cp500_output1.csv", header=True)

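Whichever version is used, a quick way to spot-check the output is to recompute both derived columns for a sample of rows. A minimal standard-library sketch, assuming the CP500_EMAIL column holds the cp500 bytes written as hex text (how raw bytes end up in a CSV string column depends on the code that wrote them); `verify_row` is a hypothetical helper:

```python
import hashlib


def verify_row(email: str, cp500_hex: str, sha1_hex: str) -> bool:
    """Check one output row: the hex must decode (as cp500) back to EMAIL,
    and the SHA-1 column must be the hash of those same bytes."""
    try:
        cp500_bytes = bytes.fromhex(cp500_hex)
        decoded = cp500_bytes.decode("cp500")
    except ValueError:  # bad hex; UnicodeDecodeError is also a ValueError subclass
        return False
    return decoded == email and hashlib.sha1(cp500_bytes).hexdigest() == sha1_hex


good_hex = "a@b.c".encode("cp500").hex()  # made-up sample address
print(verify_row("a@b.c", good_hex, hashlib.sha1(bytes.fromhex(good_hex)).hexdigest()))  # True
# A hash of the UTF-8 bytes instead of the cp500 bytes is reported as a mismatch
print(verify_row("a@b.c", good_hex, hashlib.sha1(b"a@b.c").hexdigest()))  # False
```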
