I'm new to Spark Streaming and I'm trying to read data from a Kafka broker.
Here is my code:
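from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Methods of the class instantiated as `objss` (class definition not shown)
def __init__(self):
    self.spark = SparkSession \
        .builder \
        .appName("TestApp") \
        .config("k1", "v1") \
        .getOrCreate()
    # 1-second batch interval
    self.ssc = StreamingContext(self.spark.sparkContext, 1)

def StreamingObject(self):
    kafkaParams = {'metadata.broker.list': 'localhost:9092'}
    topic = "Topic2"
    topicpartion = TopicAndPartition(topic, 0)
    fromoffset = {topicpartion: 0}  # start partition 0 at offset 0
    kvs = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets = fromoffset)
    # Records arrive as (key, value) tuples: take the value before splitting it
    words = kvs.map(lambda kv: kv[1]).flatMap(lambda line: line.split(","))
    words.pprint()
    self.ssc.start()
    self.ssc.awaitTermination()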
The last step should print whatever I fetch from the broker, but instead I get the error below:
Traceback (most recent call last):
File "C:/Users/<user>/PycharmProjects/GCPProject/SStreaming.py", line 72, in <module>
objss.StreamingObject()
File "C:/Users/<user>/PycharmProjects/GCPProject/SStreaming.py", line 40, in StreamingObject
kvs = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets = fromoffset)
File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 130, in createDirectStream
File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
at scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
at scala.collection.AbstractMap.toBuffer(Map.scala:59)
at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
at scala.collection.AbstractMap.toSeq(Map.scala:59)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
19/09/18 23:23:43 INFO SparkContext: Invoking stop() from shutdown hook
19/09/18 23:23:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.6:4040
19/09/18 23:23:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/09/18 23:23:43 INFO MemoryStore: MemoryStore cleared
19/09/18 23:23:43 INFO BlockManager: BlockManager stopped
19/09/18 23:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
19/09/18 23:23:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/09/18 23:23:43 INFO SparkContext: Successfully stopped SparkContext
19/09/18 23:23:43 INFO ShutdownHookManager: Shutdown hook called
19/09/18 23:23:43 INFO ShutdownHookManager: Deleting directory C:\Users\<user>\AppData\Local\Temp\spark-4ac3750b-cdf3-4d1d-823c-2b60f62db15a
19/09/18 23:23:43 INFO ShutdownHookManager: Deleting directory C:\Users\<user>\AppData\Local\Temp\spark-4ac3750b-cdf3-4d1d-823c-2b60f62db15a\pyspark-e791b26d-bacb-47ab-b7ae-2ae66a811158
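The decisive line in the trace is `java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long`, thrown inside `KafkaUtilsPythonHelper.createDirectStream` while it converts the `fromOffsets` map. One plausible reading, offered as an assumption rather than a confirmed diagnosis: Py4J, the bridge PySpark uses to call into the JVM, boxes a Python 3 `int` that fits in 32 bits (such as the offset `0`) as `java.lang.Integer`, while the Scala helper expects `java.lang.Long` values. The sketch below only models that boxing rule in plain Python; the constants and `boxed_java_type` are illustrative names, not part of Py4J's API, and nothing here calls Spark.

```python
# Illustrative model (NOT Py4J's own code) of how Py4J boxes a Python 3 int
# passed to the JVM: values within the 32-bit signed range are sent as
# java.lang.Integer, anything larger as java.lang.Long.
JAVA_MIN_INT = -2**31
JAVA_MAX_INT = 2**31 - 1


def boxed_java_type(value: int) -> str:
    """Return the Java wrapper class this model predicts for `value`."""
    if JAVA_MIN_INT <= value <= JAVA_MAX_INT:
        return "java.lang.Integer"
    return "java.lang.Long"


# Offset values from the question's map: {TopicAndPartition("Topic2", 0): 0}
fromoffset_values = [0]
print([boxed_java_type(v) for v in fromoffset_values])  # ['java.lang.Integer']
```

Under this model, an offset of `0` is sent as an `Integer`, which is consistent with the cast that fails in the trace.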
The data in the Kafka broker is in CSV format. I'm not sure where the problem lies. Please help me fetch messages from the Kafka broker.
I'm working with Spark 2.2.0 and spark-streaming-kafka 0.9.0, and I set this environment up on Windows.
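Since the messages are CSV and `createDirectStream` delivers each record as a `(key, value)` pair, the value has to be extracted before it is split. A minimal sketch, assuming each Kafka message value is a single CSV line already decoded to `str` (the default UTF-8 value decoder does this); `parse_csv_record` is a hypothetical helper name. It uses Python's `csv` module instead of `str.split(",")` so that quoted fields containing commas stay intact.

```python
import csv
from typing import List, Optional, Tuple


def parse_csv_record(record: Tuple[Optional[str], str]) -> List[str]:
    """Split the value of one (key, value) Kafka record into CSV fields.

    The key is ignored; the value is assumed to hold exactly one CSV line.
    """
    _key, value = record
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    rows = list(csv.reader([value]))
    return rows[0] if rows else []


# Hypothetical records shaped like the tuples the direct stream yields
records = [(None, "a,b,c"), (None, '1,"x, y",3')]
for r in records:
    print(parse_csv_record(r))
# ['a', 'b', 'c']
# ['1', 'x, y', '3']
```

In the streaming job it could take the place of the `split(",")` step, for example `words = kvs.flatMap(parse_csv_record)`, which emits every field of every message as a separate element.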
1 answer
bvjveswy (#1)
The error
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
probably occurs because your local Scala version does not match the Scala version Spark depends on. Please check your Scala version: Spark 2.2.0 uses Scala 2.11.
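One way to follow this advice is to compare the Scala version of the running Spark JVM with the Scala suffix of the Kafka connector jar (Spark's Scala artifacts carry a suffix such as `_2.11` or `_2.12`). Scala 2.12 no longer generates the `$class` helper classes that 2.11-compiled code refers to, which is why such a mismatch typically surfaces as this kind of error. A minimal sketch, assuming an active `SparkSession` named `spark`; note that `spark.sparkContext._jvm` is a private PySpark attribute, and the helper names and example jar name are hypothetical.

```python
import re
from typing import Optional


def scala_binary_version(version_string: str) -> Optional[str]:
    """Reduce a Scala version string such as 'version 2.11.12' to '2.11'."""
    m = re.search(r"(\d+)\.(\d+)", version_string)
    return f"{m.group(1)}.{m.group(2)}" if m else None


def artifact_scala_suffix(artifact: str) -> Optional[str]:
    """Extract '2.11' from a name like 'spark-streaming-kafka-0-8_2.11-2.4.0.jar'."""
    m = re.search(r"_(\d+\.\d+)", artifact)
    return m.group(1) if m else None


def spark_scala_version(spark) -> str:
    """Ask the driver JVM for its Scala version through Py4J (private _jvm handle)."""
    return spark.sparkContext._jvm.scala.util.Properties.versionString()


def versions_match(spark_scala: str, artifact: str) -> bool:
    """True when Spark's Scala binary version equals the jar's Scala suffix."""
    return scala_binary_version(spark_scala) == artifact_scala_suffix(artifact)
```

With a live session this would be called as `versions_match(spark_scala_version(spark), "<connector jar name>")`; a `False` result points at the kind of Scala mismatch described in the answer.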