我在hdfs上有一个巨大的文件,它是我数据库的摘录。例如。:
1||||||1||||||||||||||0002||01||1999-06-01 16:18:38||||2999-12-31 00:00:00||||||||||||||||||||||||||||||||||||||||||||||||||||||||2||||0||W.ISHIHARA||||1999-06-01 16:18:38||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||19155||||||||||||||1||1||NBV||||||||||||||U||||||||N||||||||||||||||||||||
1||||||8||2000-08-25 00:00:00||||||||3||||0001||01||1999-06-01 16:26:16||||1999-06-01 17:57:10||||||||||300||||||PH||400||Yes||PH||0255097�`||400||||1||103520||||||1||4||10||||20||||||||||2||||0||S.OSARI||1961-10-05 00:00:00||1999-06-01 16:26:16||�o��������������||�o��������������||1||||����||||1||1994-01-24 00:00:00||2||||||75||1999-08-25 00:00:00||1999-08-25 00:00:00||0||1||||4||||||�l��������������||�o��������������||�l��������������||||�o��������������||NP||||�l��������������||�l��������������||||||5||19055||||||||||1||||8||1||NBV||||||||||||||U||||||||N||||||||||||||||||||||
文件大小:40gb
记录数:~1200000
字段数:112
现场sep:| |
行sep:\n
编码:sjis
我想使用pyspark(1.6和python3)在配置单元中加载这个文件。但我的工作不断失败。这是我的密码:
toProcessFileDF = sc.binaryFiles("MyFile")\
.flatMap(lambda x: x[1].split(b'\n'))\
.map(lambda x: x.decode('sjis'))\
.filter(lambda x: x.count('|')==sepCnt*2)\
.map(lambda x: x.split('||'))\
.toDF(schema=tableSchema) #tableSchema is the schema retrieved from hive
toProcessFileDF.write.saveAsTable(tableName, mode='append')
我收到了几个错误,但除此之外,jave143(内存错误)、心跳超时和内核已经死亡(如果您需要确切的日志错误,请告诉我)。
这样做对吗?也许有更聪明或更有效的方法。你能告诉我怎么做吗?
2条答案
按热度按时间m1m5dgzv1#
我发现databrick csv阅读器非常有用。
不幸的是,我只能使用分隔符选项拆分一个字符。因此,我的解决方案是用tab拆分,因为我确信我的文件中没有任何内容。然后我可以在我的线路上进行分割。
这是不完美的,但至少我有正确的编码,我没有把所有的记忆。
uqxowvwt2#
日志会有用的。
binaryFiles
将此作为二进制文件从hdfs读取为单个记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。spark文档中的注意事项:首选小文件,也允许使用大文件,但可能会导致性能下降。
如果你用它会更好
textFile
```toProcessFileDF = sc.textFile("MyFile")
.map(lambda line: line.split("||"))
....