我有几个csv文件存储在hdfs中,每个文件大约是500mo(因此大约有200k行和20列)。数据主要是字符串类型(90%)。
我正在对这些文件进行基本操作,比如过滤、Map、regexextract,以生成两个“干净文件”。我不要求spark去推断模式。
过滤器步骤:
# Filter with document type
self.data = self.data.filter('type=="1" or type=="3"')
# Remove message document without ID
self.data = self.data.filter('id_document!="1" and id_document!="7" and id_document is not NULL')
# Remove message when the Service was unavailable.
self.data = self.data.filter('message!="Service Temporarily Unavailable"')
# Chose message with the following actions
expression = '(.*receiv.*|.*reject.*|.*[^r]envoi$|creation)'
self.data = self.data.filter(self.data['action'].rlike(expression))
# Chose message with a valid date format
self.data = self.data.filter(self.data['date'].rlike('(\d{4}-\d{2}-\d{2}.*)'))
# Parse message
self.data = self.data.withColumn('message_logiciel', regexp_extract(col('message'), r'.*app:([A-Za-z\s-\déèàç
然后我根据一个词汇表做一个Map,我通过如下步骤完成:
self.log = self.log.withColumn('action_result', col('message')) \
.withColumn('action_result', regexp_replace('action_result',
'.*(^[Ee]rror.{0,15}422|^[Ee]rror.{0,15}401).*',
': error c')) \
.withColumn('action_result', regexp_replace('action_result',
'.*(^[Ee]rror.*failed to connect.*|^[Ee]rror.{0,15}502).*',
': error ops')) \
.withColumn('action_result', regexp_replace('action_resul', '.*(.*connector.*off*).*',
': connector off'))
最后我把清理过的数据放进了Hive
我的问题是过程的持续时间;每500兆文件需要15分钟。这对我来说不太好。我猜:
要么我的代码不遵循spark逻辑(对于许多regexextract?)
要么就是把长的放进Hive里。
我想知道为什么要花这么长时间;如果你有任何线索可以降低这个持续时间。
我的环境如下:用pyspark和jupyter笔记本进行编码(用livy连接到集群,集群是一个简单的3节点集群(4x3核,3x16gbram)。我试图增加分配的核心和内存,但持续时间仍然是一样的。
我用经典的python对panda做了同样的练习,效果很好。
谢谢你的帮助。
暂无答案!
目前还没有任何答案,快来回答吧!