如何有效地读取pyspark中嵌套的json?

jgzswidk  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(377)

在pyspark中,我很难有效地读取和解析大量流文件!

上下文

这是我正在用json读取的流文件的模式。空格是为了保密而编辑的。

  1. root
  2. |-- location_info: array (nullable = true)
  3. | |-- element: struct (containsNull = true)
  4. | | |-- restaurant_type: string (nullable = true)
  5. | | |
  6. | | |
  7. | | |-- other_data: array (nullable = true)
  8. | | | |-- element: struct (containsNull = true)
  9. | | | | |-- other_data_1 string (nullable = true)
  10. | | | | |-- other_data_2: string (nullable = true)
  11. | | | | |-- other_data_3: string (nullable = true)
  12. | | | | |-- other_data_4: string (nullable = true)
  13. | | | | |-- other_data_5: string (nullable = true)
  14. | | |
  15. | | |-- latitude: string (nullable = true)
  16. | | |-- longitude: string (nullable = true)
  17. | | |-- timezone: string (nullable = true)
  18. |-- restaurant_id: string (nullable = true)

当前的读取和解析方法(虽然有效,但耗时太长)

尽管下面的方法是有效的,而且它本身就是一个解决方案,甚至可以开始读取文件,但是当文件数量增加到数千个时,这个方法会花费很长时间
每个文件大小约为10mb
这些文件是基本的“流”文件,名称如下 s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4 . 因此,我在pyspark中将它作为json读入(不确定我还将以其他方式读入什么?)
如果您注意到我要求用'\n{“restaurant\u id”替换restaurant\u id',这是因为如果我不这样做,那么read操作只读取文件中的第一条记录,而忽略其他内容。。。

  1. # Reading multiple files in the dir
  2. source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
  3. .values()
  4. .flatMap(lambda x: x
  5. .replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))
  6. # explode here to have restaurant_id, and nested data
  7. exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
  8. explode(col('location_info')).alias('location_info') )
  9. # Via SQL operation : this will solve the problem for parsing
  10. exploded_source_df_1.createOrReplaceTempView('result_1')
  11. subset_data_1 = spark.sql(
  12. '''
  13. SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timezone
  14. from result_1
  15. '''
  16. )

我想帮忙的事情:

(1) 有没有更快的方法让我读懂这个?
(2) 如果我尝试缓存/持久化Dataframe,我什么时候才能做到,因为它看起来像 .values().flatMap(lambda x: x.replace('{"restaurant_id','\n{"restaurant_id' ) 它本身就是一个操作,所以如果我在最后调用persist(),它似乎会重做整个读取过程?
你可以参考这篇文章来了解我是如何首先得到这个解决方案的:link。非常感谢您抽出时间

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题