我对spark结构化流媒体还比较陌生,我正在尝试将来自kafka主题(spark 2.3.2,kafka 2.0)的几个流媒体连接在一起
连接在流上工作得很好,我可以对键进行简单的等连接。在两个主题的一个特定连接上,我必须进行一些数据转换,因为在一个主题上,连接键是用十六进制编码的,而在另一个主题上,连接键是用base64编码的。
经过大量调试,我得到了下面的代码,我正在Pypark齐柏林飞艇笔记本上测试。这两个主题流存储在python dict中
debug = (topicStreams['invprop']
.where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.outputMode("append").format("memory")
.queryName("debug").start()
)
正如我所料,这只返回第一个主题的一条消息
debug2 = (topicStreams['hca']
.where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.....
第二个流还返回一条消息,这当然是我试图加入的两条消息。我想我可以假设钥匙确实匹配。
debug3 = (topicStreams['invprop']
.join(topicStreams['hca'],
expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
.writeStream...
这个连接不会返回任何东西。什么会导致此联接失败?我想我一定忽略了一些基本的东西。
2条答案
按热度按时间pepwfjgg1#
我不熟悉pyspark,但我可以看到python中compare操作符是
==
相反=
就像你的密码一样。请重新检查什么
=
内部操作员doexpr()
功能sg2wtvxw2#
像往常一样,写一个stackoverflow问题总是能给出答案。这一次以一种出乎意料的方式。。。
输入上面的问题花了我几分钟,然后我再次检查我的齐柏林飞艇笔记本。瞧,我现在找到了我要找的单曲唱片。
连接速度非常慢,但它可以工作—它花了5分钟才产生结果。我从来没有等太久。不,主题不是很大,只有10到几千条信息。
好吧,现在我知道连接基本上起作用了。我得弄清楚它为什么这么慢,我怎么能加快速度。