Spark/Hive - grouping data into a "pivot table" format

20jt8wwn posted on 2021-06-26 in Hive

I have a rather annoying set of files structured like this:

  userId string,
  eventType string,
  source string,
  errorCode string,
  startDate timestamp,
  endDate timestamp

Each file may contain any number of records per eventId, with varying event types and sources, and the error codes and start/end dates also differ from file to file.
Is there a way in Hive or Spark to group all of this on userId, somewhat like a key-value setup where the value is a list of all the fields associated with that userId? Specifically, I'd like it keyed by eventType and source. Basically I want to trade the table's length for width, a bit like a pivot table. My goal is to eventually store this in the Apache Parquet or Avro file format for faster analysis later on.
For example:
Source data:

  userId, eventType, source, errorCode, startDate, endDate
  552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'
  284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'
  552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'
  552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'
  284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
  552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'
  284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'

Target:

  userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch
  552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL
  284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'

The field names and their order don't matter, as long as I can tell them apart.
I've already tried two approaches to accomplish this:
Manually selecting each combination from the table and joining it back onto a master dataset. This works well and parallelizes nicely, but it doesn't allow an arbitrary number of values for the key fields and it requires a predefined schema.
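For illustration, a join-based version of this might look roughly like the following sketch, assuming the source data is loaded as a DataFrame `df` with the columns above; the `combos` list and the `add_combo` helper are placeholders of mine, and every combination has to be known in advance, which is exactly the limitation described:

  from functools import reduce
  from pyspark.sql import functions as F

  # Every (eventType, source) combination has to be enumerated up front.
  combos = [('ACK', 'PROVIDER'), ('TRADE', 'MERCH'), ('CHARGE', 'MERCH'),
            ('CLOSE', 'PROVIDER'), ('REFUND', 'MERCH')]

  base = df.select('userId').distinct()

  def add_combo(acc, combo):
      ev, src = combo
      suffix = ev.capitalize() + src.capitalize()        # e.g. 'AckProvider'
      sub = (df.filter((F.col('eventType') == ev) & (F.col('source') == src))
               .select('userId',
                       F.lit(ev).alias('eventType' + suffix),
                       F.lit(src).alias('source' + suffix),
                       F.col('errorCode').alias('errorCode' + suffix),
                       F.col('startDate').alias('startDate' + suffix),
                       F.col('endDate').alias('endDate' + suffix)))
      return acc.join(sub, on='userId', how='left')      # left join keeps users that lack this combination

  wide = reduce(add_combo, combos, base)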
Using Spark to create key:value records where each value is a dictionary. Basically, loop over the dataset, add a new key to the dictionary if it doesn't already exist, and for that entry add a new field to the value dictionary if it doesn't already exist. It works beautifully, but it is very slow and doesn't parallelize well. I'm also not sure whether the result is an Avro/Parquet-compatible format.
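The dictionary idea roughly corresponds to something like the RDD sketch below; this is only an approximation of the approach being described, not the exact loop, and the tuple keys and `reduceByKey` merge are my own choices:

  # One {(eventType, source): {field: value}} dictionary per userId.
  pairs = df.rdd.map(lambda r: (r['userId'],
                                {(r['eventType'], r['source']): {'errorCode': r['errorCode'],
                                                                 'startDate': r['startDate'],
                                                                 'endDate':   r['endDate']}}))

  def merge(a, b):
      a.update(b)
      return a

  per_user = pairs.reduceByKey(merge)   # (userId, dict of all that user's events)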
Is there an alternative to these two approaches, or a better structure than the one I'm aiming for?

mwngjboj1#

Could you try this and let me know how it works for you?

  >>> from functools import reduce  # needed for the column-renaming step below (a builtin in Python 2, functools in Python 3)
  >>> from pyspark.sql import SparkSession
  >>> from pyspark.sql import functions as F
  >>> from pyspark.sql.types import *
  >>> spark = SparkSession.builder.getOrCreate()
  >>> l = [(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')]
  >>> df = spark.createDataFrame(l, ['userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'])
  >>> df.show(10, False)
  +------+---------+--------+---------+-----------------------+-----------------------+
  |userId|eventType|source  |errorCode|startDate              |endDate                |
  +------+---------+--------+---------+-----------------------+-----------------------+
  |552113|ACK      |PROVIDER|0        |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|
  |284723|ACK      |PROVIDER|0        |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|
  |552113|TRADE    |MERCH   |0        |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|
  |552113|CHARGE   |MERCH   |0        |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|
  |284723|REFUND   |MERCH   |1        |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
  |552113|CLOSE    |PROVIDER|0        |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|
  |284723|CLOSE    |PROVIDER|0        |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|
  +------+---------+--------+---------+-----------------------+-----------------------+
  >>> myudf = F.udf(lambda *cols: cols, ArrayType(StringType()))  # pack the non-key columns into one row-wise list
  >>> df1 = df.select('userId', myudf('eventType', 'source', 'errorCode', 'startDate', 'endDate').alias('val_list'))
  >>> df2 = df1.groupby('userId').agg(F.collect_list('val_list').alias('agg_list'))  # group on userId; alias the collected column so it can be referenced below
  >>> eventtypes = ['ACK', 'TRADE', 'CHARGE', 'CLOSE', 'REFUND']  # event types, in the order required in the output
  >>> def f(vals):
  ...     aggVals = [typ for x in eventtypes for typ in vals if typ[0] == x]  # order the grouped rows by the eventtypes list above
  ...     if len(aggVals) == 5:
  ...         return aggVals
  ...     present = [row[0] for row in aggVals]
  ...     missngval = [(idx, val) for idx, val in enumerate(eventtypes) if val not in present]  # missing event types with their index, so nulls can be inserted
  ...     for idx, val in missngval:
  ...         aggVals.insert(idx, [None] * 5)
  ...     return aggVals
  ...
  >>> myudf2 = F.udf(f, ArrayType(ArrayType(StringType())))
  >>> df3 = df2.select('userId', myudf2('agg_list').alias('values'))
  >>> df4 = df3.select(['userId'] + [df3['values'][i][x] for i in range(5) for x in range(5)])  # flatten the Array[Array] into individual columns
  >>> oldnames = df4.columns
  >>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch']
  >>> finalDF = reduce(lambda d, idx: d.withColumnRenamed(oldnames[idx], destnames[idx]), range(len(oldnames)), df4)  # rename all columns to the target names
  >>> finalDF.show()
  +------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
  |userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch |
  +------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
  |284723|ACK |PROVIDER |0 |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null |null |null |null |null |null |null |null |null |null |CLOSE |PROVIDER |0 |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND |MERCH |1 |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
  |552113|ACK |PROVIDER |0 |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE |MERCH |0 |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE |MERCH |0 |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE |PROVIDER |0 |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null |null |null |null |null |
  +------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
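If this produces the layout you need, the wide DataFrame can then be saved in the Parquet format the question mentions; a minimal sketch (the output path below is just a placeholder):

  >>> finalDF.write.mode('overwrite').parquet('/tmp/events_wide.parquet')  # placeholder output path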

jckbn6z72#

Is this the kind of thing you want?

  from pyspark.sql.functions import struct, col, create_map, collect_list

  # Assumes a SparkContext `sc` from the pyspark shell.
  df = sc.parallelize([
      ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'],
      ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'],
      ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'],
      ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'],
      ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'],
      ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'],
      ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777']
  ]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'))
  df.show()

  # Pack the key fields and the value fields into two structs,
  # then collect one map per event, keyed by (eventType, source).
  new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])) \
             .withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')]))
  new_df = new_df.groupBy('userId').agg(
      collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail'))
  new_df.show()
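For reference, `event_detail` here ends up as an array of single-entry maps whose keys are (eventType, source) structs and whose values are (errorCode, startDate, endDate) structs; the nested layout can be inspected with:

  new_df.printSchema()
  new_df.show(truncate=False)   # show full cell contents instead of truncating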