在pyspark中,我很难有效地读取和解析大量流文件!
上下文
这是我正在用json读取的流文件的模式。空格是为了保密而编辑的。
root
|-- location_info: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- restaurant_type: string (nullable = true)
| | |
| | |
| | |-- other_data: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- other_data_1 string (nullable = true)
| | | | |-- other_data_2: string (nullable = true)
| | | | |-- other_data_3: string (nullable = true)
| | | | |-- other_data_4: string (nullable = true)
| | | | |-- other_data_5: string (nullable = true)
| | |
| | |-- latitude: string (nullable = true)
| | |-- longitude: string (nullable = true)
| | |-- timezone: string (nullable = true)
|-- restaurant_id: string (nullable = true)
当前的读取和解析方法(虽然有效,但耗时太长)
尽管下面的方法是有效的,而且它本身就是一个解决方案,甚至可以开始读取文件,但是当文件数量增加到数千个时,这个方法会花费很长时间
每个文件大小约为10mb
这些文件是基本的“流”文件,名称如下 s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4
. 因此,我在pyspark中将它作为json读入(不确定我还将以其他方式读入什么?)
如果您注意到我要求用'\n{“restaurant\u id”替换restaurant\u id',这是因为如果我不这样做,那么read操作只读取文件中的第一条记录,而忽略其他内容。。。
# Reading multiple files in the dir
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
.values()
.flatMap(lambda x: x
.replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))
# explode here to have restaurant_id, and nested data
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
explode(col('location_info')).alias('location_info') )
# Via SQL operation : this will solve the problem for parsing
exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timezone
from result_1
'''
)
我想帮忙的事情:
(1) 有没有更快的方法让我读懂这个?
(2) 如果我尝试缓存/持久化Dataframe,我什么时候才能做到,因为它看起来像 .values().flatMap(lambda x: x.replace('{"restaurant_id','\n{"restaurant_id' )
它本身就是一个操作,所以如果我在最后调用persist(),它似乎会重做整个读取过程?
你可以参考这篇文章来了解我是如何首先得到这个解决方案的:link。非常感谢您抽出时间
暂无答案!
目前还没有任何答案,快来回答吧!