上下文
我正在运行一个简单的结构化流式处理作业,该作业从一个源kafka主题读取用户输入,检查允许该用户执行的操作的远程acl,如果允许,则将输出写入sink kafka主题,以便可以进一步处理授权的操作。简而言之,这个过程看起来是这样的: user input --> kafka source topic --> streaming job read --> query remote ACL (assume user is authorized) --> streaming job write --> kafka sink topic --> perform action
由于acl查找是由另一个微服务完成的,因此我需要通过http查询该服务,因此我需要使用foreach()write stream语义。应用程序需要执行i准实时模式,因此foreach()比foreachbatch()更好。
问题
我所面临的问题是,我无法从foreach()块内部找出如何在kafka主题中进行编写。此时,我正在使用read()流连接到源主题,然后在foreach.open()块中手动初始化kafkaproducer,以便在sink主题中发布输出消息。这是可行的,但我相信有一个更干净的解决方案。
期望的行为
查看foreachbatch()语义,似乎可以通过调用batchdf.write.format(“kafka”).save()将输出写入kafka写流。这很好:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
在foreach()writestream中也可能有这样的情况吗?如果没有,可能的替代方案是什么?
暂无答案!
目前还没有任何答案,快来回答吧!