如何在pyspark中正确并行多个json文件聚合

ldfqzlk8  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(441)

我在s3上有一大组json_列表文件,其中包含一些我想聚合的日志(基本上只是按路径、位置等计算请求的数量)。我一直在做以下工作,但从日志来看,我不确定它是否真正并行化了。。首先,一个接一个地下载单个s3文件需要大约3分钟的时间,然后其余的似乎仍被拆分为1000次执行。。我以为spark会把它分解成一种map-reduce方法,但也许我完全误解了它做什么和不做什么。有人能给个提示吗。

df = (
    spark.read
    .json(test_paths, schema=schema)
    .filter(col('method') == 'GET')
    .filter((col('status_code') == 200) | (col('status_code') == 206))
    .withColumn('date', from_unixtime('timestamp').cast(DateType()))
    .groupBy('path', 'client_country_code', 'date', 'file_size')
    .count()
)

以下是1000个URL的驱动程序日志:

20/11/15 19:15:23 INFO InMemoryFileIndex: Listing leaf files and directories in parallel under 1000 paths. The first several paths are: s3n://bucket../10004.json_lines.gz.
20/11/15 19:15:23 INFO SparkContext: Starting job: json at NativeMethodAccessorImpl.java:0
20/11/15 19:15:23 INFO DAGScheduler: Got job 49 (json at NativeMethodAccessorImpl.java:0) with 1000 output partitions
20/11/15 19:15:23 INFO DAGScheduler: Final stage: ResultStage 75 (json at NativeMethodAccessorImpl.java:0)
20/11/15 19:15:23 INFO DAGScheduler: Parents of final stage: List()
20/11/15 19:15:23 INFO DAGScheduler: Missing parents: List()
20/11/15 19:15:23 INFO DAGScheduler: Submitting ResultStage 75 (MapPartitionsRDD[206] at json at NativeMethodAccessorImpl.java:0), which has no missing parents
20/11/15 19:15:23 INFO MemoryStore: Block broadcast_77 stored as values in memory (estimated size 84.3 KiB, free 2.2 GiB)
20/11/15 19:15:23 INFO MemoryStore: Block broadcast_77_piece0 stored as bytes in memory (estimated size 29.9 KiB, free 2.2 GiB)
20/11/15 19:15:23 INFO BlockManagerInfo: Added broadcast_77_piece0 in memory on e05e979b7108:34999 (size: 29.9 KiB, free: 2.2 GiB)
20/11/15 19:15:23 INFO SparkContext: Created broadcast 77 from broadcast at DAGScheduler.scala:1223
20/11/15 19:15:23 INFO DAGScheduler: Submitting 1000 missing tasks from ResultStage 75 (MapPartitionsRDD[206] at json at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/11/15 19:15:23 INFO TaskSchedulerImpl: Adding task set 75.0 with 1000 tasks
20/11/15 19:15:23 INFO TaskSetManager: Starting task 0.0 in stage 75.0 (TID 33224, e05e979b7108, executor driver, partition 0, PROCESS_LOCAL, 7473 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 1.0 in stage 75.0 (TID 33225, e05e979b7108, executor driver, partition 1, PROCESS_LOCAL, 7473 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 2.0 in stage 75.0 (TID 33226, e05e979b7108, executor driver, partition 2, PROCESS_LOCAL, 7474 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 3.0 in stage 75.0 (TID 33227, e05e979b7108, executor driver, partition 3, PROCESS_LOCAL, 7475 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 4.0 in stage 75.0 (TID 33228, e05e979b7108, executor driver, partition 4, PROCESS_LOCAL, 7476 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 5.0 in stage 75.0 (TID 33229, e05e979b7108, executor driver, partition 5, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 6.0 in stage 75.0 (TID 33230, e05e979b7108, executor driver, partition 6, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO TaskSetManager: Starting task 7.0 in stage 75.0 (TID 33231, e05e979b7108, executor driver, partition 7, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:23 INFO Executor: Running task 0.0 in stage 75.0 (TID 33224)
20/11/15 19:15:23 INFO Executor: Running task 1.0 in stage 75.0 (TID 33225)
20/11/15 19:15:23 INFO Executor: Running task 2.0 in stage 75.0 (TID 33226)
20/11/15 19:15:23 INFO Executor: Running task 5.0 in stage 75.0 (TID 33229)
20/11/15 19:15:23 INFO Executor: Running task 3.0 in stage 75.0 (TID 33227)
20/11/15 19:15:23 INFO Executor: Running task 7.0 in stage 75.0 (TID 33231)
20/11/15 19:15:23 INFO Executor: Running task 6.0 in stage 75.0 (TID 33230)
20/11/15 19:15:23 INFO Executor: Running task 4.0 in stage 75.0 (TID 33228)
20/11/15 19:15:24 INFO Executor: Finished task 1.0 in stage 75.0 (TID 33225). 2025 bytes result sent to driver
20/11/15 19:15:24 INFO Executor: Finished task 0.0 in stage 75.0 (TID 33224). 2025 bytes result sent to driver
20/11/15 19:15:24 INFO TaskSetManager: Starting task 8.0 in stage 75.0 (TID 33232, e05e979b7108, executor driver, partition 8, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 1.0 in stage 75.0 (TID 33225) in 567 ms on e05e979b7108 (executor driver) (1/1000)
20/11/15 19:15:24 INFO Executor: Running task 8.0 in stage 75.0 (TID 33232)
20/11/15 19:15:24 INFO Executor: Finished task 6.0 in stage 75.0 (TID 33230). 2033 bytes result sent to driver
20/11/15 19:15:24 INFO TaskSetManager: Starting task 9.0 in stage 75.0 (TID 33233, e05e979b7108, executor driver, partition 9, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Starting task 10.0 in stage 75.0 (TID 33234, e05e979b7108, executor driver, partition 10, PROCESS_LOCAL, 7477 bytes)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 0.0 in stage 75.0 (TID 33224) in 570 ms on e05e979b7108 (executor driver) (2/1000)
20/11/15 19:15:24 INFO Executor: Running task 9.0 in stage 75.0 (TID 33233)
20/11/15 19:15:24 INFO Executor: Running task 10.0 in stage 75.0 (TID 33234)
20/11/15 19:15:24 INFO TaskSetManager: Finished task 6.0 in stage 75.0 (TID 33230) in 571 ms on e05e979b7108 (executor driver) (3/1000)
....
20/11/15 19:15:43 INFO TaskSetManager: Finished task 998.0 in stage 75.0 (TID 34222) in 158 ms on e05e979b7108 (executor driver) (999/1000)
20/11/15 19:15:43 INFO Executor: Finished task 999.0 in stage 75.0 (TID 34223). 2033 bytes result sent to driver
20/11/15 19:15:43 INFO TaskSetManager: Finished task 999.0 in stage 75.0 (TID 34223) in 175 ms on e05e979b7108 (executor driver) (1000/1000)
20/11/15 19:15:43 INFO TaskSchedulerImpl: Removed TaskSet 75.0, whose tasks have all completed, from pool
20/11/15 19:15:43 INFO DAGScheduler: ResultStage 75 (json at NativeMethodAccessorImpl.java:0) finished in 19.850 s
20/11/15 19:15:43 INFO DAGScheduler: Job 49 is finished. Cancelling potential speculative or zombie tasks for this job
20/11/15 19:15:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 75: Stage finished
20/11/15 19:15:43 INFO DAGScheduler: Job 49 finished: json at NativeMethodAccessorImpl.java:0, took 19.890458 s
20/11/15 19:15:43 INFO InMemoryFileIndex: It took 19936 ms to list leaf files for 1000 paths.
ecbunoof

ecbunoof1#

安装开销很大,特别是对于许多小文件。json也是一种非常低效的存储格式,因为每次都需要读取整个文件。理想情况下,每个文件应为64+mb,以便spark workers能够有效地处理足够的数据。
您是否考虑过让工作流的第一步只是读入json文件,然后以类似parquet的列格式保存到少量文件中。?

相关问题