我刚接触apachespark,使用scala。我可以使用以下命令将表连接到流:
Updated_DF = Inbound_DF.join(colToAdd, colToAdd("key") <=> Inbound_DF("key"), "left")
.withColumnRenamed("Data_DF","site").drop("Id","key")
现在我想看看 colToAdd("key")
以及 Inbound_DF("key")
匹配并加入是否成功。例如,coltoadd:
Id key Data_DF
S31 S3 {"name":"nick","region":"IN"}
S21 S2 {"name":"john","region":"CA"}
S11 S1 {"name":"ashley","region":"CA"}
S51 S5 {"name":"bella","region":"UK"}
S41 S4 {"name":"kumar","region":"In"}
S6 S6 {"name":"ben","region":"US"}
P11 P1 {"name":"MKD","region":"UAE"}
P21 P2 {"name":"ahmad","region":"UAE"}
来自传入流的消息如下所示:
cusId key item price
1897 S2 book 54
加入后,更新的消息应如下所示:
cusId key item price site
1897 S2 book 54 {"name":"john","region":"CA"}
但是如果我收到一条流消息 key = S9
,连接将不会发生,然后我要记录一条消息:
------- join failed, key not found ---------
据我所知,这可以通过使用 filter
方法,但我不知道如何实现。请帮助我如何做到这一点,或者有没有更好的方法来做同样的事情。
1条答案
按热度按时间xkrw2x1b1#
有多种方法可以做到这一点。我只是给你提供了一个如何做到这一点的想法,你可以根据你的用例进行调整。
首先,左连接的方式不正确,需要交换Dataframe。流dataframe应保留为dataframe。