kafka代理(0.10.0或更高版本)作为python中spark流的数据流源

vsikbqxv  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(207)

具体地说,我正在寻找一个替代品或工作的周围 KafkaUtils.createStream() kafka 0.8.0中pyspark.streaming.kafka的api调用。
在kafka 0.10.0中尝试使用此(已折旧)函数会产生错误。我正在考虑创建一个自定义接收器,但这里也没有任何pyspark支持。似乎也没有解决办法。
下面是我试图构建的应用程序的摘要。应用程序希望从不同的生产线资源创建一个实时的(聚合的) Jmeter 板,这些资源被输入kafka。同时,处理后的数据将进入永久存储器。目标是从这些永久性数据创建一个异常检测系统。
我可以通过在发送数据之前批处理数据来解决永久存储的这个问题。但这显然不适用于流媒体。
下面是脚本的伪代码:

  1. sc = SparkContext(appName='abc')
  2. sc.setLogLevel('WARN')
  3. ssc = StreamingContext(sc, 2)
  4. ## Create Dstream object from Kafka (This is where I'm stuck)
  5. ## Transform and create aggregated windows
  6. ssc.start()
  7. ## Catch output and send back to Kafka as producer

所有的建议和解决方案都是非常受欢迎的。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题