PySpark Structured Streaming exception from a Kafka source

rryofs0p  posted on 2023-10-15  in Spark

**Environment:** Spark 3.5.0, Scala 2.12.18, OpenJDK 11.0.20.1

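I am trying to stream data from a Kafka source, but Spark throws an exception. This kind of error usually seems to come from mismatched dependency versions, but my package versions match my environment.
The Kafka server is running and I can see the data (kafka_2.12-3.5.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stockPrices).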

Bash command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ./test.py

PySpark script (test.py):

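from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaStreamToRDD") \
    .getOrCreate()

# Subscribe to the stockPrices topic, starting from the earliest offset.
# The exception below is raised by the .load() call.
spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stockPrices") \
    .option("startingOffsets", "earliest") \
    .load()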

Exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:601)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
        at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        ... 22 more

I have already checked my dependency versions and reinstalled Spark.
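
One way to narrow down a `ClassNotFoundException` like the one in the trace is to check which jars on disk actually contain the missing class. The following is only a rough sketch: the helper name `jars_containing` is made up for illustration, and the directories to scan (for example `$SPARK_HOME/jars`, or `~/.ivy2/jars` where `--packages` downloads usually end up by default) are assumptions that may differ on another setup.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, jars_dir):
    """Return the names of jars in jars_dir that contain the given class.

    class_name is a fully qualified Java class name, e.g.
    "org.apache.kafka.common.serialization.ByteArraySerializer".
    """
    # Inside a jar, classes are stored as path-like entries ending in .class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(jars_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives (e.g. truncated downloads).
            continue
    return hits
```

Calling `jars_containing("org.apache.kafka.common.serialization.ByteArraySerializer", "/path/to/spark/jars")` would list the jars in that folder that ship the class. An empty list for every directory on the classpath suggests the jar is missing; more than one hit suggests conflicting copies.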

ldxq2e6h  #1

I needed to clean up my Spark jars folder. I had multiple versions of several jars (spark-avro_2.12-3.5.0 and spark-avro_2.12:3.4.1). Spark was picking the wrong one.
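
A quick way to spot this kind of duplication is to group the jar file names by artifact and list every artifact that appears with more than one version. This is a minimal sketch, not a definitive tool: the function name `find_duplicate_jars` is hypothetical, the `SPARK_HOME` lookup assumes the jars live in `$SPARK_HOME/jars`, and the file-name pattern is a heuristic that can misreport jars with platform classifiers in their names.

```python
import os
import re
from collections import defaultdict
from pathlib import Path

# Heuristic: "<artifact>-<version>.jar", where the version starts with a digit,
# e.g. "spark-avro_2.12-3.5.0.jar" -> artifact "spark-avro_2.12", version "3.5.0".
JAR_NAME = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def find_duplicate_jars(jar_names):
    """Return {artifact: [versions]} for artifacts present in several versions."""
    versions = defaultdict(set)
    for name in jar_names:
        match = JAR_NAME.match(name)
        if match:
            versions[match.group("artifact")].add(match.group("version"))
    return {a: sorted(v) for a, v in versions.items() if len(v) > 1}


if __name__ == "__main__":
    # Assumption: Spark's bundled jars are under $SPARK_HOME/jars.
    jars_dir = Path(os.environ.get("SPARK_HOME", ".")) / "jars"
    names = [p.name for p in jars_dir.glob("*.jar")]
    for artifact, found in find_duplicate_jars(names).items():
        print(f"{artifact}: {', '.join(found)}")
```

Any artifact this prints is a candidate for cleanup: keep the version that matches the installed Spark release and remove the others.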
