如何在Python或PySpark中根据行大小将大型文本文件拆分为记录

wnrlj8wa  于 2023-02-14  发布在  Python
关注(0)|答案(2)|浏览(209)

我有一个15GB大小的大文本文件,其中的数据被认为是一个字符串,有大约2000万条数据记录,每条记录的长度为5000,每条记录有450+列
现在我想把文本文件中的每条记录都拆分成新的一行,然后按照模式用一些分隔符将每条记录拆分成 Dataframe 。
这是样本方法-样本数据:

HiIamRowData1HiIamRowData2HiIamRowData3HiIamRowData4HiIamRowData5HiIamRowData6HiIamRowData7HiIamRowData8

预期产出:

Hi#I#am#Row#Data#1#
Hi#I#am#Row#Data#2#
Hi#I#am#Row#Data#3#
Hi#I#am#Row#Data#4#
Hi#I#am#Row#Data#5#
Hi#I#am#Row#Data#6#
Hi#I#am#Row#Data#7#
Hi#I#am#Row#Data#8#

代码:

### Schema 
schemaData = [['col1',0,2],['col2',2,1],['col3',3,2],['col4',5,3],['col5',8,4],['col6',12,1]]
df = pd.DataFrame(data= schemaData, columns=['FeildName','offset','size'])
print(df.head(5))

file = 'sampleText.txt'
inputFile = open(file, 'r').read()

recordLen = 13
totFileLen = len(inputFile)
finalStr = ''

### First for loop to split the each record based on record length
for i in range(0,totFileLen,recordLen):
    record = inputFile[i:i+recordLen]
    recStr = ''

    ### Second For loop to apply the Schema on top of each record.
    for index, row in df.iterrows():
        #print(record[row['offset']:row['offset'] + row['size']])
        recStr = recStr + record[row['offset']:row['offset'] + row['size']] + '#'  
    recStr = recStr + '\n'
    finalStr += recStr
print(finalStr)

text_file = open("Output.txt", "w")
text_file.write(finalStr)

对于上面的8行样本数据,它需要56次(8行+48行乘以列)总迭代。在实际数据集中,我有2500万行和500列。它需要25mil +25mil X 500col迭代
制约因素:
1.文本文件中的整个数据都是序列数据,所有记录都放在一起,整个数据都在一个字符串中。我想读取文本文件,并将最终数据写入新的文本文件。
1.我不想分割文件成较小的大小块,而处理。像50 MB的数据文件,通过这样做,如果最后一个记录得到了一半的第一个50MB和第二个50MB的块之间的分裂,那么从第二个50MB的块向前的数据将是错误的切片。
1.如果我可以根据文本文件内的文件长度分割每个块,这将是可能的方法。
我已经尝试了下面的python方法。对于较小的文件,它工作得很好。但是对于大于500MB的文件,它需要几个小时才能将每个记录模式明智地拆分。
我也尝试过多线程和多处理方法,但没有看到太大的改进。

    • 问题**:在Python或PySpark中,有没有更好的方法来解决这个问题?
bjg7j2ky

bjg7j2ky1#

您可以通过以下方式有效地迭代处理大文件:

  • 一次捕获所需大小的连续块
  • 将其传递给具有预定义列宽的pandas.read_fwf
  • 并立即将构建的 Dataframe 导出到输出csv文件(如果不存在,则创建),并附加具有指定分隔符的行
from io import StringIO

rec_len = 13
widths = [2, 1, 2, 3, 4, 1]

with open('sampleText.txt') as inp, open('output.txt', 'w+') as out:
    while (line := inp.read(rec_len).strip()):
        pd.read_fwf(StringIO(line), widths=widths, header=None) \
            .to_csv(out, sep='#', header=False, index=False, mode='a')

我得到的output.txt内容:

Hi#I#am#Row#Data#1
Hi#I#am#Row#Data#2
Hi#I#am#Row#Data#3
Hi#I#am#Row#Data#4
Hi#I#am#Row#Data#5
Hi#I#am#Row#Data#6
Hi#I#am#Row#Data#7
Hi#I#am#Row#Data#8
pod7payv

pod7payv2#

是的,我们可以使用PySpark UDF和Spark函数来达到同样的效果。让我来告诉你如何通过5个步骤:
需要导入

import pandas as pd
from pyspark.sql.functions import udf, split, explode

1.使用Spark读取方法读取文本文件

sample_df = spark.read.text("path/to/file.txt")

1.将您的自定义函数转换为PySpark UDF(用户定义函数)以便在Spark中使用

def delimit_records(value):
    recordLen = 13
    totFileLen = len(value)
    finalStr = ''

    for i in range(0,totFileLen,recordLen):
        record = value[i:i+recordLen]

        schemaData = [['col1',0,2],['col2',2,1],['col3',3,2],['col4',5,3],['col5',8,4],['col6',12,1]]
        pdf = pd.DataFrame(data= schemaData, columns=['FeildName','offset','size'])
        
        recStr = ''
        for index, row in pdf.iterrows():
            recStr = recStr + record[row['offset']:row['offset'] + row['size']] + '#'  
        recStr = recStr + '\n'
        finalStr += recStr
        
    return finalStr.rstrip()

1.注册用户定义函数

delimit_records = udf(delimit_records)

1.对要修改的列使用自定义函数

df1 = sample_df.withColumn("value", delimit_records("value"))

1.使用PySpark split()函数基于分隔符"\n"拆分记录

df2 = df1.withColumn("value", split("value", "\n"))

1.使用explode()函数将数组或Map的列转换为多行

df3 = df2.withColumn("value", explode("value"))

1.让我们打印输出

df3.show()

输出:

+-------------------+
|              value|
+-------------------+
|Hi#I#am#Row#Data#1#|
|Hi#I#am#Row#Data#2#|
|Hi#I#am#Row#Data#3#|
|Hi#I#am#Row#Data#4#|
|Hi#I#am#Row#Data#5#|
|Hi#I#am#Row#Data#6#|
|Hi#I#am#Row#Data#7#|
|Hi#I#am#Row#Data#8#|
+-------------------+

相关问题