spark流媒体+Kafka集成

tcomlyy6  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(397)

我尝试用pyspark把spark和kafka集成到jupyter笔记本中。这是我的工作环境。
spark版本:spark 2.2.1Kafka版本:kafka_2.11-0.8.2.2 spark streamingKafkajar:spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
我在spark-defaults.conf文件中添加了一个spark-streaming-kafka程序集jar文件。
当我为pyspark流启动streamingcontext时,此错误显示为无法从manifest.mf读取kafka版本。

这是我的密码。

from pyspark import SparkContext, SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os

from kafka import KafkaProducer

# Receive data handler

def handler(message):
    records = message.collect()
    for record in records:
        print(record)
        #producer.send('receive', str(res))
        #producer.flush()

producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)

# Create Kafka streaming with argv

zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)

ssc.start()
ddhy6vgd

ddhy6vgd1#

很抱歉我在斯卡拉发帖了
scala 2.11和kafka 0.10的spark 2.2.1可以完成所有的工作,尽管它们被标记为实验性的
如果使用上述库,创建流的正确方法是

val kStrream =  KafkaUtils.createDirectStream(
          ssc, PreferConsistent,
          Subscribe[String, String](Array("weblogs-text"), kafkaParams, fromOffsets))

注意依赖关系,例如kafka有特定于kafka客户机版本和spark版本的jar文件。

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>

相关问题