我尝试用pyspark把spark和kafka集成到jupyter笔记本中。这是我的工作环境。
spark版本:spark 2.2.1Kafka版本:kafka_2.11-0.8.2.2 spark streamingKafkajar:spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
我在spark-defaults.conf文件中添加了一个spark-streaming-kafka程序集jar文件。
当我为pyspark流启动streamingcontext时,此错误显示为无法从manifest.mf读取kafka版本。
这是我的密码。
from pyspark import SparkContext, SparkConf
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os
from kafka import KafkaProducer
# Receive data handler
def handler(message):
records = message.collect()
for record in records:
print(record)
#producer.send('receive', str(res))
#producer.flush()
producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)
# Create Kafka streaming with argv
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)
ssc.start()
1条答案
按热度按时间ddhy6vgd1#
很抱歉我在斯卡拉发帖了
scala 2.11和kafka 0.10的spark 2.2.1可以完成所有的工作,尽管它们被标记为实验性的
如果使用上述库,创建流的正确方法是
注意依赖关系,例如kafka有特定于kafka客户机版本和spark版本的jar文件。