pyspark代码改进思想(降低持续时间)

ilmyapht  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(268)

我有几个csv文件存储在hdfs中,每个文件大约是500mo(因此大约有200k行和20列)。数据主要是字符串类型(90%)。
我正在对这些文件进行基本操作,比如过滤、Map、regexextract,以生成两个“干净文件”。我不要求spark去推断模式。

过滤器步骤:


# Filter with document type

self.data = self.data.filter('type=="1" or type=="3"') 

# Remove message document without ID

self.data = self.data.filter('id_document!="1" and id_document!="7" and id_document is not NULL') 

# Remove message when the Service was unavailable.

self.data = self.data.filter('message!="Service Temporarily Unavailable"')

# Chose message with the following actions

expression = '(.*receiv.*|.*reject.*|.*[^r]envoi$|creation)'
self.data = self.data.filter(self.data['action'].rlike(expression))

# Chose message with a valid date format

self.data = self.data.filter(self.data['date'].rlike('(\d{4}-\d{2}-\d{2}.*)'))

# Parse message

self.data = self.data.withColumn('message_logiciel', regexp_extract(col('message'), r'.*app:([A-Za-z\s-\déèàç

然后我根据一个词汇表做一个Map,我通过如下步骤完成:

self.log = self.log.withColumn('action_result', col('message')) \
    .withColumn('action_result', regexp_replace('action_result',
                                                  '.*(^[Ee]rror.{0,15}422|^[Ee]rror.{0,15}401).*',
                                                  ': error c')) \
    .withColumn('action_result', regexp_replace('action_result',
                                                  '.*(^[Ee]rror.*failed to connect.*|^[Ee]rror.{0,15}502).*',
                                                  ': error ops')) \
    .withColumn('action_result', regexp_replace('action_resul', '.*(.*connector.*off*).*',
                                                  ': connector off'))

最后我把清理过的数据放进了Hive

我的问题是过程的持续时间;每500兆文件需要15分钟。这对我来说不太好。我猜:
要么我的代码不遵循spark逻辑(对于许多regexextract?)
要么就是把长的放进Hive里。
我想知道为什么要花这么长时间;如果你有任何线索可以降低这个持续时间。
我的环境如下:用pyspark和jupyter笔记本进行编码(用livy连接到集群,集群是一个简单的3节点集群(4x3核,3x16gbram)。我试图增加分配的核心和内存,但持续时间仍然是一样的。
我用经典的python对panda做了同样的练习,效果很好。
谢谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题