Merging Hive data based on a column in Spark

Asked by 8cdiaqws on 2021-06-28, in Hive

I have data in a Hive table in the following format:

    user | purchase | time_of_purchase

I want to transform the data into:

    user | list of purchases ordered by time

How can I do this in PySpark or HiveQL?

I have tried using collect_list in Hive, but it does not correctly preserve the ordering by timestamp.

Edit: Adding sample data as requested by kartikkannapur (one way to keep the ordering is sketched after the expected output below). Here is a sample of the data:

    94438fef-c503-4326-9562-230e78796f16 | Bread | Jul 7 20:48
    94438fef-c503-4326-9562-230e78796f16 | Shaving Cream | July 10 14:20
    a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk | July 7 3:48
    a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Bread | July 7 3:49
    a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Lotion | July 7 15:30

The output I want is:

    94438fef-c503-4326-9562-230e78796f16 | Bread, Shaving Cream
    a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk, Bread, Lotion
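
One way to get this ordering in PySpark (an editorial sketch, not part of the original question or answer) is to collect (timestamp, purchase) structs and sort them, since collect_list alone guarantees no order. It assumes a DataFrame purchaseDF with columns user, purchase, and an already-parsed timestamp column ts:

    from pyspark.sql import functions as F

    # collect_list gives no ordering guarantee, so collect (ts, purchase)
    # structs and sort them; sort_array orders structs field by field,
    # here chronologically by ts first.
    ordered = (
        purchaseDF.groupBy("user")
        .agg(F.sort_array(F.collect_list(F.struct("ts", "purchase"))).alias("tp"))
        .select("user", F.col("tp.purchase").alias("purchases"))
    )
    ordered.show(truncate=False)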

Answer #1, by 34gzjxbg

One way to do this is as follows.
First, create a HiveContext and read the table into an RDD:

    # HiveContext lives in pyspark.sql, not the top-level pyspark package
    from pyspark.sql import HiveContext

    purchaseList = HiveContext(sc).sql('select * from purchaseList')
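
On Spark 2.x and later, HiveContext is superseded by a Hive-enabled SparkSession; a rough sketch of the equivalent read (same table name as above):

    from pyspark.sql import SparkSession

    # A SparkSession with Hive support replaces HiveContext in Spark 2.x+
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    purchaseList = spark.sql('select * from purchaseList').rdd  # RDD of Row objects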

Then process the RDD:

    from datetime import datetime as dt

    # Parse each row into (user, [purchase, purchase_time])
    purchaseList = purchaseList.map(lambda x: (x[0], [x[1], dt.strptime(x[2], "%b %d %H:%M")]))
    # Group all of a user's purchases together
    purchaseByUser = purchaseList.groupByKey()
    # Sort each user's purchases by time, keeping only the purchase names
    purchaseByUser = purchaseByUser.map(lambda x: (x[0], [y[0] for y in sorted(x[1], key=lambda z: z[1])]))
    print(purchaseByUser.take(2))
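
One caveat with the snippet above: the sample rows mix abbreviated and full month names ("Jul 7 20:48" vs "July 10 14:20"), and strptime with "%b" alone will not parse the full form. A small sketch of a more tolerant parser (the helper name is hypothetical; note that without a year in the input, strptime defaults to 1900, which still sorts correctly as long as all rows fall in the same year):

    from datetime import datetime

    def parse_purchase_time(s):
        # Try the abbreviated month format first ("Jul 7 20:48"),
        # then the full month format ("July 10 14:20").
        for fmt in ("%b %d %H:%M", "%B %d %H:%M"):
            try:
                return datetime.strptime(s, fmt)
            except ValueError:
                continue
        raise ValueError("unrecognized timestamp: %s" % s)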

Output:

    [('94438fef-c503-4326-9562-230e78796f16', ['Bread', 'Shaving Cream']), ('a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad', ['Milk', 'Bread', 'Lotion'])]

Save the RDD as a new Hive table:

    # inferSchema and saveAsTable as used here are Spark 1.x APIs
    schema_rdd = HiveContext(sc).inferSchema(purchaseByUser)
    schema_rdd.saveAsTable('purchaseByUser')
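
On Spark 2.x+, inferSchema is no longer available; a hedged equivalent uses createDataFrame and the DataFrameWriter (assuming the spark session from the earlier sketch):

    # Build a DataFrame from the (user, purchases) RDD and save it as a Hive table
    df = spark.createDataFrame(purchaseByUser, ["user", "purchases"])
    df.write.mode("overwrite").saveAsTable("purchaseByUser")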

For more on reading and writing Hive tables, see the relevant Stack Overflow questions and the Spark documentation.

