在pyspark中执行大规模并行任务

wsxa1bj1  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(222)

我经常发现自己在spark中执行大量可并行的任务,但由于某些原因,spark一直在消亡。例如,现在我有两个表(都存储在s3上),它们本质上只是(唯一)字符串的集合。我想交叉连接,计算levenshtein距离,并将其作为一个新表写入s3。所以我的代码看起来像:

OUT_LOC = 's3://<BUCKET>/<PREFIX>/'

if __name__ == '__main__':
    from pyspark.sql import SparkSession, functions as F
    spark = SparkSession.builder \
        .appName('my-app') \
        .config("hive.metastore.client.factory.class",
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .enableHiveSupport() \
        .getOrCreate()
    tb0 = spark.sql('SELECT col0 FROM db0.table0')
    tb1 = spark.sql('SELECT col1 FROM db1.table1')

    spark.sql("set spark.sql.crossJoin.enabled=true")

    tb0.join(tb1).withColumn("levenshtein_distance",
                             F.levenshtein(F.col("col0"), F.col("col1"))) \
        .write.format('parquet').mode('overwrite') \
        .options(path=OUT_LOC, compression='snappy', maxRecordsPerFile=10000) \
        .saveAsTable('db2.new_table')

在我看来,这是大规模并行化的,spark应该能够在一次只读取少量数据的情况下快速完成这一过程。但由于某种原因,任务一直在进行。
所以我的问题是:
有什么我不知道的吗?或者更一般地说,这里发生了什么?
没有理由把所有的东西都存储在本地,对吧?
我应该考虑哪些最佳实践?
不管它值多少钱,我在谷歌上搜索了很多地方,但是找不到其他有这个问题的人。也许我的google foo不够强大,或者我只是在做一些蠢事。

编辑

听@egordoe的建议。。。我跑了那辆车 explain 然后又回来了。。。

== Parsed Logical Plan ==
'Project [col0#0, col1#3, levenshtein('col0, 'col1) AS levenshtein_distance#14]
+- Join Inner
   :- Project [col0#0]
   :  +- Project [col0#0]
   :     +- SubqueryAlias `db0`.`table0`
   :        +- Relation[col0#0] parquet
   +- Project [col1#3]
      +- Project [col1#3]
         +- SubqueryAlias `db1`.`table1`
            +- Relation[col1#3] parquet

== Analyzed Logical Plan ==
col0: string, col1: string, levenshtein_distance: int
Project [col0#0, col1#3, levenshtein(col0#0, col1#3) AS levenshtein_distance#14]
+- Join Inner
   :- Project [col0#0]
   :  +- Project [col0#0]
   :     +- SubqueryAlias `db0`.`table0`
   :        +- Relation[col0#0] parquet
   +- Project [col1#3]
      +- Project [col1#3]
         +- SubqueryAlias `db1`.`table1`
            +- Relation[col1#3] parquet

== Optimized Logical Plan ==
Project [col0#0, col1#3, levenshtein(col0#0, col1#3) AS levenshtein_distance#14]
+- Join Inner
   :- Relation[col0#0] parquet
   +- Relation[col1#3] parquet

== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [col0#0, col1#3, levenshtein(col0#0, col1#3) AS levenshtein_distance#14]
   +- BroadcastNestedLoopJoin BuildRight, Inner
      :- FileScan parquet db0.table0[col0#0] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://REDACTED], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col0:string>
      +- BroadcastExchange IdentityBroadcastMode
         +- FileScan parquet db1.table1[col1#3] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://REDACTED], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:string>
========== finished

对我来说似乎是合理的,但解释不包括数据的实际书写。我假设这是因为它喜欢在本地建立一个结果缓存,然后将整个结果作为一个表发送到s3?那就太差劲了。

编辑1

我还跑了 foreach 你用一个简单的 print 声明在里面。在我杀死它之前,它挂了40分钟没有打印任何东西。我现在用一个什么都不做的函数来运行这个作业(它只是一个 pass 声明)看它是否完成。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题