pyspark 如何在将大型JSON文件转换为JSON之前使用AWS glueContext拆分/分块?

qzwqbdag  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(142)

我正在尝试使用AWS Glue将20 GB JSON gzip文件转换为parquet。
我已经设置了一个作业使用Pyspark与下面的代码。
我收到了这个日志警告消息:
第一个月
我想知道是否有一种方法来分割/分块文件?我知道我可以用pandas来做,但不幸的是这需要太长的时间(12+小时)。

  1. import sys
  2. from awsglue.transforms import *
  3. from awsglue.utils import getResolvedOptions
  4. from pyspark.context import SparkContext
  5. import pyspark.sql.functions
  6. from pyspark.sql.functions import col, concat, reverse, translate
  7. from awsglue.context import GlueContext
  8. from awsglue.job import Job
  9. glueContext = GlueContext(SparkContext.getOrCreate())
  10. test = glueContext.create_dynamic_frame_from_catalog(
  11. database="test_db",
  12. table_name="aws-glue-test_table")
  13. # Create Spark DataFrame, remove timestamp field and re-name other fields
  14. reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')
  15. # Create pyspark DF
  16. spark_df = reconfigure.toDF()
  17. # Filter and only return 'a' record types
  18. spark_df = spark_df.where("record_type == 'a'")
  19. # Once filtered, remove the record_type column
  20. spark_df = spark_df.drop('record_type')
  21. spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
  22. spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))
  23. spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")

字符串

mi7gmzs6

mi7gmzs61#

Spark不会并行阅读单个gzip文件。但是,您可以将其拆分为块。
另外,Spark在阅读gzip文件时非常慢(因为它没有并行化)。你可以这样做来加速它:

  1. file_names_rdd = sc.parallelize(list_of_files, 100)
  2. lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())

字符串

ufj5ltwl

ufj5ltwl2#

我有一个单一的大不可分割的CSV文件,这是Gzip压缩这个问题。我相信接受的答案只适用于一个文件列表。
我使用新的AWS Glue for ray和AWS Wrangler批量读取分区,如下所示:

  1. import awswrangler as wr
  2. import ray
  3. import pandas
  4. from ray import data
  5. import pandas as pd
  6. ray.init('auto')
  7. large_dataset = wr.s3.read_csv(
  8. 's3://path_to_large_csv.gz,
  9. sep=',',
  10. header = True,
  11. chunksize = 1e5,
  12. )
  13. @ray.remote
  14. def read_batch(batch):
  15. #print (batch.shape)
  16. return batch
  17. futures = [read_batch.remote(part) for part in large_dataset]
  18. large_distributed_dataset = data.from_pandas(ray.get(futures))
  19. large_distributed_dataset.write_parquet(
  20. "s3://path_to_output/"
  21. )

字符串
awswrangler[modin]必须按照glue文档中的指导使用--pip-install添加到作业中。

展开查看全部

相关问题