我想加入 3 tables
使用 spark rdd
. 我使用sparksql实现了我的目标,但是当我尝试使用rdd加入它时,我没有得到期望的结果。下面是我的查询使用 spark SQL
以及 output
:
scala> actorDF.as("df1").join(movieCastDF.as("df2"),$"df1.act_id"===$"df2.act_id").join(movieDF.as("df3"),$"df2.mov_id"===$"df3.mov_id").
filter(col("df3.mov_title")==="Annie Hall").select($"df1.act_fname",$"df1.act_lname",$"df2.role").show(false)
+---------+---------+-----------+
|act_fname|act_lname|role |
+---------+---------+-----------+
|Woody |Allen |Alvy Singer|
+---------+---------+-----------+
现在我创造了 pairedRDDs
对于三个数据集,如下所示:
scala> val actPairedRdd=actRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3))))
scala> actPairedRdd.take(5).foreach(println)
(101,(James,Stewart,M))
(102,(Deborah,Kerr,F))
(103,(Peter,OToole,M))
(104,(Robert,De Niro,M))
(105,(F. Murray,Abraham,M))
scala> val movieCastPairedRdd=movieCastRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2))))
movieCastPairedRdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[318] at map at <console>:29
scala> movieCastPairedRdd.foreach(println)
(101,(901,John Scottie Ferguson))
(102,(902,Miss Giddens))
(103,(903,T.E. Lawrence))
(104,(904,Michael))
(105,(905,Antonio Salieri))
(106,(906,Rick Deckard))
scala> val moviePairedRdd=movieRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3),p(4),p(5),p(6))))
moviePairedRdd: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, String))] = MapPartitionsRDD[322] at map at <console>:29
scala> moviePairedRdd.take(2).foreach(println)
(901,(Vertigo,1958,128,English,1958-08-24,UK))
(902,(The Innocents,1961,100,English,1962-02-19,SW))
在这里 actPairedRdd
以及 movieCastPairedRdd
彼此相连 movieCastPairedRdd
以及 moviePairedRdd
是链接的,因为它们有公共列。
现在,当我加入所有三个数据集时,我没有得到任何数据
scala> actPairedRdd.join(movieCastPairedRdd).join(moviePairedRdd).take(2).foreach(println)
我得到的是空白记录。那我哪里做错了??提前谢谢
1条答案
按热度按时间ldxq2e6h1#
像这样连接RDD是痛苦的,这是dfs更好的另一个原因。
由于rdd=k,v没有上一个rdd的k部分的公共数据,因此没有得到任何数据。101、102的k将加入,但901、902没有共同点。你需要改变一些事情,像这样,我更有限的例子:
退货:
你需要通过Map把数据去掉,我留给你。每个默认值的内部联接。