我有一个15GB大小的大文本文件,其中的数据被认为是一个字符串,有大约2000万条数据记录,每条记录的长度为5000,每条记录有450+列
现在我想把文本文件中的每条记录都拆分成新的一行,然后按照模式用一些分隔符将每条记录拆分成 Dataframe 。
这是样本方法-样本数据:
HiIamRowData1HiIamRowData2HiIamRowData3HiIamRowData4HiIamRowData5HiIamRowData6HiIamRowData7HiIamRowData8
预期产出:
Hi#I#am#Row#Data#1#
Hi#I#am#Row#Data#2#
Hi#I#am#Row#Data#3#
Hi#I#am#Row#Data#4#
Hi#I#am#Row#Data#5#
Hi#I#am#Row#Data#6#
Hi#I#am#Row#Data#7#
Hi#I#am#Row#Data#8#
代码:
### Schema
schemaData = [['col1',0,2],['col2',2,1],['col3',3,2],['col4',5,3],['col5',8,4],['col6',12,1]]
df = pd.DataFrame(data= schemaData, columns=['FeildName','offset','size'])
print(df.head(5))
file = 'sampleText.txt'
inputFile = open(file, 'r').read()
recordLen = 13
totFileLen = len(inputFile)
finalStr = ''
### First for loop to split the each record based on record length
for i in range(0,totFileLen,recordLen):
record = inputFile[i:i+recordLen]
recStr = ''
### Second For loop to apply the Schema on top of each record.
for index, row in df.iterrows():
#print(record[row['offset']:row['offset'] + row['size']])
recStr = recStr + record[row['offset']:row['offset'] + row['size']] + '#'
recStr = recStr + '\n'
finalStr += recStr
print(finalStr)
text_file = open("Output.txt", "w")
text_file.write(finalStr)
对于上面的8行样本数据,它需要56次(8行+48行乘以列)总迭代。在实际数据集中,我有2500万行和500列。它需要25mil +25mil X 500col迭代
制约因素:
1.文本文件中的整个数据都是序列数据,所有记录都放在一起,整个数据都在一个字符串中。我想读取文本文件,并将最终数据写入新的文本文件。
1.我不想分割文件成较小的大小块,而处理。像50 MB的数据文件,通过这样做,如果最后一个记录得到了一半的第一个50MB和第二个50MB的块之间的分裂,那么从第二个50MB的块向前的数据将是错误的切片。
1.如果我可以根据文本文件内的文件长度分割每个块,这将是可能的方法。
我已经尝试了下面的python方法。对于较小的文件,它工作得很好。但是对于大于500MB的文件,它需要几个小时才能将每个记录模式明智地拆分。
我也尝试过多线程和多处理方法,但没有看到太大的改进。
- 问题**:在Python或PySpark中,有没有更好的方法来解决这个问题?
2条答案
按热度按时间bjg7j2ky1#
您可以通过以下方式有效地迭代处理大文件:
pandas.read_fwf
我得到的
output.txt
内容:pod7payv2#
是的,我们可以使用PySpark UDF和Spark函数来达到同样的效果。让我来告诉你如何通过5个步骤:
需要导入
1.使用Spark读取方法读取文本文件
1.将您的自定义函数转换为PySpark UDF(用户定义函数)以便在Spark中使用
1.注册用户定义函数
1.对要修改的列使用自定义函数
1.使用PySpark split()函数基于分隔符"\n"拆分记录
1.使用explode()函数将数组或Map的列转换为多行
1.让我们打印输出
输出: