I am trying to generate some random data with a Kafka producer written in Java in the Eclipse IDE. I receive the same data with a Kafka consumer, also written in Java in the same IDE. My work depends on streaming data, so I need Spark Streaming to receive the random data generated by Kafka. For the Spark Streaming side I am using Python code in a Jupyter notebook. To integrate Kafka with Spark, the "spark-streaming-kafka-0-10_2.12-3.0.0.jar" file must be added to the Spark jars. I also tried adding the jar file in PySpark. Here is my code:
import time

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 3
topic = "generate"

spark = SparkSession.builder.master("local[*]") \
    .appName("kafkaStreaming") \
    .config("spark.jars", "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.12-3.0.0.jar") \
    .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, n_secs)

kStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'auto.offset.reset': 'latest'})

lines = kStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
print(words)

ssc.start()
time.sleep(100)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
In the code above, I added the jar file using the SparkSession config() method. After creating the StreamingContext, I tried to receive data from Kafka with KafkaUtils.createDirectStream() by providing the topic name, bootstrap servers, and so on. After that, I transform the data into an RDD and print the result. That is the whole flow of my work. First I execute the Kafka producer code in Java; it generates some data, which is consumed by the Kafka consumer. Up to that point everything works fine. When I execute the Spark Streaming code in Python, it shows an error like the one below:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Py4JError Traceback (most recent call last)
<ipython-input-17-873ece723182> in <module>
36 'bootstrap.servers':'localhost:9092',
37 'group.id':'test-group',
---> 38 'auto.offset.reset':'latest'})
39
40 lines = kStream.map(lambda x: x[1])
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
144 func = funcWithoutMessageHandler
145 jstream = helper.createDirectStreamWithoutMessageHandler(
--> 146 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
147 else:
148 ser = AutoBatchedSerializer(PickleSerializer())
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
61 def deco(*a,**kw):
62 try:
---> 63 return f(*a,**kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
334 raise Py4JError(
335 "An error occurred while calling {0}{1}{2}".
--> 336 format(target_id, ".", name))
337 else:
338 type = answer[1]
Py4JError: An error occurred while calling o270.createDirectStreamWithoutMessageHandler
Can anyone please help me get rid of this problem...
1 Answer
A few things I can see from the code itself:
Your jar artifact is built for Spark 3.0, while you are running Spark version 2.4.6. (Hint: the version at the end of the file name is the Spark version, so spark-streaming-kafka-0-10_2.12-3.0.0.jar is built against Scala 2.12 and Spark 3.0.0.)
You have added the jar file via a config option. I suggest first verifying that the jar file you are using is the one you actually need, by passing it to the spark-submit command with
--jars <jar-file-path>
Try printing your direct stream first, before applying any transformations to it. You can do: kStream.pprint()
Once you confirm that you are receiving data, use foreachRDD, transform, or other APIs to process your data.
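To make the version check in the first point concrete, here is a small illustrative helper (not part of Spark or PySpark; the function names are my own) that parses the Scala and Spark versions out of a spark-streaming-kafka connector jar's file name and compares the Spark part against the Spark release you are running:

```python
import re

def parse_kafka_connector_jar(filename):
    """Extract (scala_version, spark_version) from a
    spark-streaming-kafka-0-10_<scala>-<spark>.jar file name."""
    m = re.match(r"spark-streaming-kafka-0-10_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$", filename)
    if not m:
        raise ValueError("unrecognized jar name: " + filename)
    return m.group(1), m.group(2)

def matches_spark(jar_spark_version, installed_spark_version):
    """The connector should be built for the same major.minor Spark release."""
    return jar_spark_version.split(".")[:2] == installed_spark_version.split(".")[:2]

scala_ver, spark_ver = parse_kafka_connector_jar("spark-streaming-kafka-0-10_2.12-3.0.0.jar")
print(scala_ver, spark_ver)               # 2.12 3.0.0
print(matches_spark(spark_ver, "2.4.6"))  # False: the jar targets Spark 3.0, the cluster runs 2.4.6
```

Running this against the jar from the question immediately shows the mismatch that point 1 describes: the connector targets Spark 3.0.0, so a jar such as spark-streaming-kafka-0-10_2.12-2.4.6.jar (matching your installed Spark) would be the one to look for instead.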