改进pyspark作业分析数据

ntjbwcob  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(337)

我在google存储中有一些json文件,其中包含大量数据(500gib到1tib)。这些文件每行包含1个json对象,格式如下:

{"country":"US", "col1":"val1", "col2":"val2", "col3":"val3"}
{"country":"CA", "col1":"val4", "col2":"val5", "col3":"val6"}

我的目标是在bigquery中为我能在这些数据中找到的10个国家创建不同的表。因此,我将得到10个表,例如一个将命名为 data_us 使用模式: col1,col2,col3 .
我目前的做法是使用pyspark并在google dataproc上的计算机集群上运行作业:

data = spark.read.json(bucket_source)
    data.createOrReplaceTempView('data')

    for c in country_list:
        table_name = "data_{}".format(c)
        query = "select col1, col2, col3, from data where language = '{}'".format(c)
        result_folder = "result_{}".format(c)
        result = spark.sql(query)
        push_bigquery(bucket_dest, cluster_name, project_name, dataset_name, result, result_folder, table_name)

所以基本上,我只是加载数据,创建一个视图,然后让pyspark为每个国家运行一个请求。然后我打电话给 push_bigquery 函数,它只是将结果转储到csv文件并将它们加载到bigquery中。这个解决方案是可行的,但对于大数据量来说似乎有点慢(对于接近1tib的数据大小,大约12小时)。
我有两个问题:
我目前在每个国家运行一个查询,所以每次查询都会对整个数据进行分析。有没有一种方法可以“分析”每一行并立即将其写入正确的结果/文件中?我觉得它会更快,但因为我不是很熟悉Spark/pyspark,但我不知道这个解决方案是否有意义。
有没有一个完全不同的更好的方法来完成这个任务比使用Spark?
谢谢你的帮助

qnzebej0

qnzebej01#

我错过了.cache,但基于n country<->n表要求,以下将是我的第一次尝试:
没有缓存。
读取、重新分区并通过 df.repartition(country).write...partitionBy(country)... 有适当的选择。
然后我会把你的for循环应用到每个依赖分区的国家,对你来说确实需要单独的表,对此不得不说,这并不是完全必要的。i、 e.一般来说,第2步就足够了。

oalqel3c

oalqel3c2#

您可以使用@thebluephantom提供的代码对其进行优化,并通过这样做进行更多优化。

df.repartition(country).write...partitionBy(country)

保存Dataframe后,现在每个国家都有单独的文件夹。因此,在运行bq load命令时,可以在这些文件夹上创建国家级表。这样,您就不需要做任何进一步的处理,并且可以将数据从所有大查询表中分离出来。

相关问题