How to convert a PySpark RDD to a DataFrame

yftpprvb · posted 2021-05-27 in Spark

I have a DataFrame df as follows:
df =

    +---+---+----+---+---+
    |  a|  b|   c|  d|  e|
    +---+---+----+---+---+
    |  1|  a|foo1|  4|  5|
    |  2|  b| bar|  4|  6|
    |  3|  c| mnc|  4|  7|
    |  4|  c| mnc|  4|  7|
    +---+---+----+---+---+

I want to achieve a result like df1 =

    +---+---+-----------------------------------------------+
    |  a|  b|                                              c|
    +---+---+-----------------------------------------------+
    |  1|  a|{'a': 1, 'b': 'a', 'c': 'foo1', 'd': 4, 'e': 5}|
    |  2|  b| {'a': 2, 'b': 'b', 'c': 'bar', 'd': 4, 'e': 6}|
    |  3|  c| {'a': 3, 'b': 'c', 'c': 'mnc', 'd': 4, 'e': 7}|
    |  4|  c| {'a': 4, 'b': 'c', 'c': 'mnc', 'd': 4, 'e': 7}|
    +---+---+-----------------------------------------------+

I really want to avoid a groupBy, so my idea was to first convert the DataFrame to an RDD and then turn that back into a DataFrame.
The code I wrote is:

    df2 = df.rdd.flatMap(lambda x: (x.a, x.b, x.asDict()))

When I do a foreach on df2, the results I get are still in RDD form, so I tried to create a DataFrame from it:

    df3 = df2.toDF()                         # 1st way
    df3 = sparkSession.createDataFrame(df2)  # 2nd way

But I get errors with both. Can someone explain what I am doing wrong here, and how I can achieve my goal?
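
For reference, the likely culprit in the snippet above is flatMap: it flattens the returned tuple, so df2 ends up as an RDD of loose scalars and dicts rather than one record per row, and neither toDF() nor createDataFrame() can infer a schema from that. A minimal sketch of the RDD route using map instead (json.dumps is my addition, to get the row dict as a single string column like the desired output):

    import json

    # map (not flatMap) keeps one output record per input row;
    # json.dumps turns the row dict into a single string value
    df2 = df.rdd.map(lambda x: (x.a, x.b, json.dumps(x.asDict())))
    df3 = df2.toDF(["a", "b", "c"])  # tuples carry no field names, so supply them
    df3.show(truncate=False)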


pepwfjgg 1#

You can do this with Spark SQL as follows:
Spark SQL

    data.createOrReplaceTempView("data")
    spark.sql("""
        select a, b, to_json(named_struct('a', a, 'b', b, 'c', c, 'd', d, 'e', e)) as c
        from data""").show(20, False)

Output

    # +---+---+----------------------------------------+
    # |a  |b  |c                                       |
    # +---+---+----------------------------------------+
    # |1  |a  |{"a":1,"b":"a","c":"foo1","d":"4","e":5}|
    # |2  |b  |{"a":2,"b":"b","c":"bar","d":"4","e":6} |
    # |3  |c  |{"a":3,"b":"c","c":"mnc","d":"4","e":7} |
    # |4  |c  |{"a":4,"b":"c","c":"mnc","d":"4","e":7} |
    # +---+---+----------------------------------------+

DataFrame API

    from pyspark.sql.functions import struct, to_json

    result = data \
        .withColumn('c', to_json(struct(data.a, data.b, data.c, data.d, data.e))) \
        .select("a", "b", "c")
    result.show(20, False)

Output

    # +---+---+----------------------------------------+
    # |a  |b  |c                                       |
    # +---+---+----------------------------------------+
    # |1  |a  |{"a":1,"b":"a","c":"foo1","d":"4","e":5}|
    # |2  |b  |{"a":2,"b":"b","c":"bar","d":"4","e":6} |
    # |3  |c  |{"a":3,"b":"c","c":"mnc","d":"4","e":7} |
    # |4  |c  |{"a":4,"b":"c","c":"mnc","d":"4","e":7} |
    # +---+---+----------------------------------------+
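
If you would rather not spell out every column, struct also accepts an unpacked column list; a small variant of the same idea, assuming the same data DataFrame:

    from pyspark.sql.functions import struct, to_json

    # pack all columns of the row into one struct, then serialize it to JSON
    result = data.withColumn("c", to_json(struct(*data.columns))).select("a", "b", "c")
    result.show(20, False)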

dgtucam1 2#

You can create the JSON column from a map-type column:

    import pyspark.sql.functions as F

    df = sqlContext.createDataFrame(
        [(0, 1, 23, 4, 8, 9, 5, "b1"), (1, 2, 43, 8, 10, 20, 43, "e1")],
        ("id", "a1", "b1", "c1", "d1", "e1", "f1", "ref")
    )
    # build an alternating [key literal, value column] list for create_map
    tst = [[F.lit(c), F.col(c)] for c in df.columns]
    tst_flat = [item for sublist in tst for item in sublist]
    map_coln = F.create_map(*tst_flat)
    df1 = df.withColumn("out", F.to_json(map_coln))

Result:

    df1.show(truncate=False)
    +---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
    |id |a1 |b1 |c1 |d1 |e1 |f1 |ref|out                                                                            |
    +---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
    |0  |1  |23 |4  |8  |9  |5  |b1 |{"id":"0","a1":"1","b1":"23","c1":"4","d1":"8","e1":"9","f1":"5","ref":"b1"}  |
    |1  |2  |43 |8  |10 |20 |43 |e1 |{"id":"1","a1":"2","b1":"43","c1":"8","d1":"10","e1":"20","f1":"43","ref":"e1"}|
    +---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
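
Note that create_map requires all its values to share one type, so the numeric columns are cast to string here, which appears to be why every value in out is quoted. If you later need the map back out of the JSON, a minimal round-trip sketch with from_json (reusing the F alias imported above; the string-to-string MapType matches the output shown):

    from pyspark.sql.types import MapType, StringType

    # parse the JSON string back into a map<string,string> column
    df2 = df1.withColumn("parsed", F.from_json("out", MapType(StringType(), StringType())))
    df2.select("parsed").show(truncate=False)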
