I am trying to consume messages from a Kafka topic and print them to the console. I can read the messages successfully through the reader, but when I try to print them to the console through the writer, I get the following error:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$
```
from pyspark.sql import SparkSession, Row
from pyspark.streaming import StreamingContext

# Wire the Kafka connector jar onto the driver and executor classpaths
spark = SparkSession.builder \
    .appName("Kafka Spark") \
    .config("spark.jars", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
    .config("spark.executor.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
    .config("spark.executor.extraLibrary", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
    .config("spark.driver.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
    .getOrCreate()

# Read key/value from the Kafka topic and echo each record to the console;
# note that start() returns a StreamingQuery, not a DataFrame
dataFrameRead = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()

dataFrameRead.awaitTermination()
```
Complete error:
```
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/06/13 21:23:03 ERROR Utils: Aborting task
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$
at org.apache.spark.sql.kafka010.KafkaContinuousPartitionReader.<init>(KafkaContinuousStream.scala:195)
at org.apache.spark.sql.kafka010.KafkaContinuousReaderFactory$.createReader(KafkaContinuousStream.scala:174)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:83)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD$$Lambda$2098/1825151985.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2019/307923369.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
20/06/13 21:23:03 ERROR Utils: Aborting task
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:/Users/Macaulay/PycharmProjects/Spark/KafkaSpark/KafkaTopic2CSV.py", line 39, in <module>
dataFrameRead.awaitTermination()
File "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\streaming.py", line 103, in awaitTermination
return self._jsq.awaitTermination()
File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
File "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py", line 102, in deco
raise converted
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 7f1ca9c7-6345-46a2-9c94-cf22c31c30ff, runId = f540fad6-8797-489e-8fd3-00581282689a]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToContinuousDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]
+- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6d216413, KafkaSource[Subscribe[Jim_Topic]]
Process finished with exit code 1
```
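
In case the dependency wiring is the problem: below is a minimal variant I could try instead, assuming that `spark.jars.packages` lets Spark resolve the connector together with its transitive dependencies (kafka-clients, commons-pool2) from Maven, and using the default micro-batch trigger instead of the experimental continuous one. This is only a sketch, not a confirmed fix.

```
from pyspark.sql import SparkSession

# Sketch, not a confirmed fix: let Spark resolve the Kafka connector *and its
# transitive dependencies* from Maven instead of pointing spark.jars at the
# single connector jar, which does not bring kafka-clients / commons-pool2 along.
spark = SparkSession.builder \
    .appName("Kafka Spark") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2") \
    .getOrCreate()

query = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()  # default micro-batch trigger instead of trigger(continuous=...)

query.awaitTermination()
```

The command-line equivalent would presumably be passing `--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2` to spark-submit.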