**Environment:** Spark 3.5.0, Scala 2.12.18, OpenJDK 11.0.20.1
I am trying to stream data from a Kafka source, but Spark throws an exception. This usually indicates a dependency version mismatch, yet my package versions match my environment.
The Kafka server is running and I can see the data (kafka_2.12-3.5.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stockPrices).
Bash command:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ./test.py
PySpark script (test.py):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaStreamToRDD") \
    .getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stockPrices") \
    .option("startingOffsets", "earliest") \
    .load()
Exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:601)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 22 more
I have already checked my dependency versions and reinstalled Spark.
1 Answer
I needed to clean up my Spark jars folder. I had multiple versions of several jars (spark-avro_2.12-3.5.0 and spark-avro_2.12-3.4.1), and Spark was picking up the wrong one.
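To spot this kind of conflict, you can group the jars in Spark's jar directory by artifact name and print only the names that appear with more than one version. This is a minimal sketch; it assumes `SPARK_HOME` is set and that your jars follow the usual `name-version.jar` convention:

```shell
# List artifacts that are present in more than one version in $SPARK_HOME/jars.
# The sed expression strips a trailing "-<version>.jar" so duplicates collapse
# to the same name; uniq -d then prints only the names seen more than once.
ls "$SPARK_HOME/jars" | sed -E 's/-[0-9][0-9.]*\.jar$//' | sort | uniq -d
```

Any artifact this prints (e.g. `spark-avro_2.12`) has duplicate versions on the classpath; remove the stale one so Spark cannot load the wrong class.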