使用hadoop parquet将大数据处理为csv输出

dw1jzc5e  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(405)

我有3个数据集,我想加入和分组,以获得一个包含聚合数据的csv。
数据作为Parquet文件存储在hadoop中,我使用zeppelin运行apachespark+scala进行数据处理。
我的数据集如下所示:

user_actions.show(10)
user_clicks.show(10)
user_options.show(10)

+--------------------+--------------------+
|                  id|             keyword|
+--------------------+--------------------+
|00000000000000000001|               aaaa1|
|00000000000000000002|               aaaa1|
|00000000000000000003|               aaaa2|
|00000000000000000004|               aaaa2|
|00000000000000000005|               aaaa0|
|00000000000000000006|               aaaa4|
|00000000000000000007|               aaaa1|
|00000000000000000008|               aaaa2|
|00000000000000000009|               aaaa1|
|00000000000000000010|               aaaa1|
+--------------------+--------------------+
+--------------------+-------------------+
|           search_id|   selected_user_id|
+--------------------+-------------------+
|00000000000000000001|               1234|
|00000000000000000002|               1234|
|00000000000000000003|               1234|
|00000000000000000004|               1234|
+--------------------+-------------------+

+--------------------+----------+----------+
|           search_id|   user_id|  position|
+--------------------+----------+----------+
|00000000000000000001|      1230|         1|
|00000000000000000001|      1234|         3|
|00000000000000000001|      1232|         2|
|00000000000000000002|      1231|         1|
|00000000000000000002|      1232|         2|
|00000000000000000002|      1233|         3|
|00000000000000000002|      1234|         4|
|00000000000000000003|      1234|         1|
|00000000000000000004|      1230|         1|
|00000000000000000004|      1234|         2|
+--------------------+----------+----------+

我试图实现的是为每个用户id获取一个带有关键字的json,因为我需要将它们导入mysql,并将用户id作为pk。

user_id,keywords
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}"

如果json不是现成的,我可以使用元组或任何字符串:

user_id,keywords
1234,"(aaaa1,0.58333),(aaaa2,1.5)"

到目前为止我所做的是:

val user_actions_data = user_actions
                                .join(user_options, user_options("search_id") === user_actions("id"))

val user_actions_full_data = user_actions_data
                                    .join(
                                            user_clicks,
                                            user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"),
                                            "left_outer"
                                        )

val user_actions_data_groupped = user_actions_full_data
                                        .groupBy("user_id", "search")
                                        .agg("search" -> "count", "selected_user_id" -> "count", "position" -> "avg")

def udfScoreForUser = ((position: Double, searches: Long) =>  ( position/searches ))

val search_log_keywords = user_actions_data_groupped.rdd.map({row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2)))}).groupByKey()

val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))

val search_log_keywords_df = sc.parallelize(search_log_keywords_array).toDF("user_id","keywords")
    .coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("hdfs:///Search_log_testing_keywords/")

虽然这在一个小数据集和我的输出csv文件中工作正常:

user_id,keywords
1234,"(aaaa1,0.58333), (aaaa2,0.5)"

我有问题时,它对200+gb的数据运行。
我对spark&scala还比较陌生,但我觉得我遗漏了一些东西,我不应该使用df到rdd,collect到array上Map,并将其并行化回df以导出到csv。
作为总结,我想对所有关键字应用评分,并按用户id对它们进行分组,然后将其保存到csv中。到目前为止,我所做的工作适用于一个小数据集,但当我将其应用于200gb+的数据时,apachespark失败了。

jrcvhitl

jrcvhitl1#

是的,任何依靠 collect 在spark中通常是错误的-除非您正在调试某些东西。当你打电话的时候 collect 所有数据都是在一个数组中的驱动程序中收集的,因此对于大多数大数据集来说,这甚至不是一个选项—您的驱动程序将抛出一个oom并死亡。
我不明白的是你为什么一开始就要收藏?为什么不简单地Map到分布式数据集上呢?

search_log_keywords
  .map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
  .toDF("user_id","keywords")
  .coalesce(1)
  .write.format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("hdfs:///Search_log_testing_keywords/")

这样,一切都是并行进行的。
关于在 dataframes 以及 rdds ,那我现在就不用太担心了。我知道社区大多提倡使用 dataframes ,但取决于spark的版本和您的用例, rdds 也许是个更好的选择。

00jrzges

00jrzges2#

hdfs的主要目标是将文件分割成块并冗余地存储它。最好将数据分区存储在hdfs中,除非您绝对需要一个大文件。

相关问题