在hadoop中加载大型日语文件

wnvonmuf  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(237)

我在hdfs上有一个巨大的文件,它是我数据库的摘录。例如。:

1||||||1||||||||||||||0002||01||1999-06-01 16:18:38||||2999-12-31 00:00:00||||||||||||||||||||||||||||||||||||||||||||||||||||||||2||||0||W.ISHIHARA||||1999-06-01 16:18:38||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||19155||||||||||||||1||1||NBV||||||||||||||U||||||||N||||||||||||||||||||||
1||||||8||2000-08-25 00:00:00||||||||3||||0001||01||1999-06-01 16:26:16||||1999-06-01 17:57:10||||||||||300||||||PH||400||Yes||PH||0255097�`||400||||1||103520||||||1||4||10||||20||||||||||2||||0||S.OSARI||1961-10-05 00:00:00||1999-06-01 16:26:16||�o��������������||�o��������������||1||||����||||1||1994-01-24 00:00:00||2||||||75||1999-08-25 00:00:00||1999-08-25 00:00:00||0||1||||4||||||�l��������������||�o��������������||�l��������������||||�o��������������||NP||||�l��������������||�l��������������||||||5||19055||||||||||1||||8||1||NBV||||||||||||||U||||||||N||||||||||||||||||||||

文件大小:40gb
记录数:~1200000
字段数:112
现场sep:| |
行sep:\n
编码:sjis
我想使用pyspark(1.6和python3)在配置单元中加载这个文件。但我的工作不断失败。这是我的密码:

toProcessFileDF = sc.binaryFiles("MyFile")\
    .flatMap(lambda x: x[1].split(b'\n'))\
    .map(lambda x: x.decode('sjis'))\
    .filter(lambda x: x.count('|')==sepCnt*2)\
    .map(lambda x: x.split('||'))\
    .toDF(schema=tableSchema) #tableSchema is the schema retrieved from hive
toProcessFileDF.write.saveAsTable(tableName, mode='append')

我收到了几个错误,但除此之外,jave143(内存错误)、心跳超时和内核已经死亡(如果您需要确切的日志错误,请告诉我)。
这样做对吗?也许有更聪明或更有效的方法。你能告诉我怎么做吗?

m1m5dgzv

m1m5dgzv1#

我发现databrick csv阅读器非常有用。

toProcessFileDF_raw = sqlContext.read.format('com.databricks.spark.csv')\
                                        .options(header='false',
                                                 inferschema='false',
                                                 charset='shift-jis',
                                                 delimiter='\t')\
                                        .load(toProcessFile)

不幸的是,我只能使用分隔符选项拆分一个字符。因此,我的解决方案是用tab拆分,因为我确信我的文件中没有任何内容。然后我可以在我的线路上进行分割。
这是不完美的,但至少我有正确的编码,我没有把所有的记忆。

uqxowvwt

uqxowvwt2#

日志会有用的。 binaryFiles 将此作为二进制文件从hdfs读取为单个记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
spark文档中的注意事项:首选小文件,也允许使用大文件,但可能会导致性能下降。
如果你用它会更好 textFile ```
toProcessFileDF = sc.textFile("MyFile")
.map(lambda line: line.split("||"))
....

另外,一个选项是在读取初始文本文件(例如sc.textfile(path,200000))时指定更多的分区,而不是在读取后重新分区。
另一个重要的事情是确保您的输入文件是可拆分的(一些压缩选项使其不可拆分,在这种情况下,spark可能必须在一台机器上读取它,从而导致OOM)。

相关问题