if __name__ == "__main__":
print("PySpark Structured Streaming with Kafka Demo Application Started ...")
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka Demo") \
.master("local[*]") \
.config("spark.jars","file:///C://Users//User//Downloads//spark-sql-kafka-0-10_2.11-2.4.0.jar,file:///C://Users//User//Downloads//kafka-clients-1.1.0.jar") \
.config("spark.executor.extraClassPath", "file:///C://Users//User//Downloads//spark-sql-kafka-0-10_2.11-2.4.0.jar:file:///C://Users//User//Downloads//kafka-clients-1.1.0.jar") \
.config("spark.executor.extraLibrary", "file:///C://Users//User//Downloads//spark-sql-kafka-0-10_2.11-2.4.0.jar:file:///C://Users//User//Downloads//kafka-clients-1.1.0.jar") \
.config("spark.driver.extraClassPath", "file:///C://Users//User//Downloads//spark-sql-kafka-0-10_2.11-2.4.0.jar:file:///C://Users//User//Downloads//kafka-clients-1.1.0.jar") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
transaction_detail_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "latest") \
.load()
print("Printing Schema of transaction_detail_df: ")
>Py4JJavaError: An error occurred while calling o61.load.
: java.lang.NoClassDefFoundError:
org/apache/spark/sql/sources/v2/writer/StreamWriteSupport
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:14`
暂无答案!
目前还没有任何答案,快来回答吧!