spark结构化流连接不工作

vu8f3i0k  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(279)

我对spark结构化流媒体还比较陌生,我正在尝试将来自kafka主题(spark 2.3.2,kafka 2.0)的几个流媒体连接在一起
连接在流上工作得很好,我可以对键进行简单的等连接。在两个主题的一个特定连接上,我必须进行一些数据转换,因为在一个主题上,连接键是用十六进制编码的,而在另一个主题上,连接键是用base64编码的。
经过大量调试,我得到了下面的代码,我正在Pypark齐柏林飞艇笔记本上测试。这两个主题流存储在python dict中

debug = (topicStreams['invprop']
   .where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.outputMode("append").format("memory")
   .queryName("debug").start()
)

正如我所料,这只返回第一个主题的一条消息

debug2 = (topicStreams['hca']
   .where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.....

第二个流还返回一条消息,这当然是我试图加入的两条消息。我想我可以假设钥匙确实匹配。

debug3 = (topicStreams['invprop']
   .join(topicStreams['hca'], 
         expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
   .writeStream...

这个连接不会返回任何东西。什么会导致此联接失败?我想我一定忽略了一些基本的东西。

pepwfjgg

pepwfjgg1#

我不熟悉pyspark,但我可以看到python中compare操作符是 == 相反 = 就像你的密码一样。
请重新检查什么 = 内部操作员do expr() 功能

sg2wtvxw

sg2wtvxw2#

像往常一样,写一个stackoverflow问题总是能给出答案。这一次以一种出乎意料的方式。。。
输入上面的问题花了我几分钟,然后我再次检查我的齐柏林飞艇笔记本。瞧,我现在找到了我要找的单曲唱片。
连接速度非常慢,但它可以工作—它花了5分钟才产生结果。我从来没有等太久。不,主题不是很大,只有10到几千条信息。
好吧,现在我知道连接基本上起作用了。我得弄清楚它为什么这么慢,我怎么能加快速度。

相关问题