Problem connecting a Kafka cluster on Cloudera to Spark (Structured Streaming)

Posted by b09cbbtk on 2021-05-27 in Spark
from pyspark.sql import SparkSession

# KAFKA_BOOTSTRAP_SERVERS_CONS and KAFKA_TOPIC_NAME_CONS are assumed to be
# defined earlier in the script; the post does not show their values.

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Demo Application Started ...")

    # Connector JARs exactly as given in the post.
    kafka_sql_jar = "file:///C://Users//User//Downloads//spark-sql-kafka-0-10_2.11-2.4.0.jar"
    kafka_clients_jar = "file:///C://Users//User//Downloads//kafka-clients-1.1.0.jar"
    classpath = kafka_sql_jar + ":" + kafka_clients_jar

    spark = (
        SparkSession.builder
        .appName("PySpark Structured Streaming with Kafka Demo")
        .master("local[*]")
        .config("spark.jars", kafka_sql_jar + "," + kafka_clients_jar)
        .config("spark.executor.extraClassPath", classpath)
        # Kept from the post; "spark.executor.extraLibrary" is not a documented Spark property.
        .config("spark.executor.extraLibrary", classpath)
        .config("spark.driver.extraClassPath", classpath)
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")

    # Subscribe to the Kafka topic as a streaming source.
    # This load() call is where the error below is raised.
    transaction_detail_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
        .option("subscribe", KAFKA_TOPIC_NAME_CONS)
        .option("startingOffsets", "latest")
        .load()
    )

    print("Printing Schema of transaction_detail_df: ")

Py4JJavaError: An error occurred while calling o61.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/writer/StreamWriteSupport
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:14
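
The trace fails inside ClassLoader.defineClass with a NoClassDefFoundError for a class in Spark's own org.apache.spark.sql.sources.v2 package. One plausible cause, which the post does not confirm, is that the connector JAR (built for Spark 2.4.0 and Scala 2.11) does not match the Spark release that is actually installed. The sketch below only shows how a matching Maven coordinate for the Kafka source could be assembled for the spark.jars.packages setting. The helper name kafka_connector_coordinate is hypothetical, and the version strings are taken from the JAR file name in the post rather than from a known installation.

```python
def kafka_connector_coordinate(spark_version: str, scala_binary_version: str) -> str:
    """Build the Maven coordinate of the Spark Kafka source.

    Hypothetical helper: both arguments must describe the Spark build that
    is really installed; the post does not state them.
    """
    return (
        "org.apache.spark:spark-sql-kafka-0-10_"
        f"{scala_binary_version}:{spark_version}"
    )


# Example values read off the JAR name in the post (Spark 2.4.0, Scala 2.11).
coordinate = kafka_connector_coordinate("2.4.0", "2.11")
print(coordinate)
```

In a real session the installed version is available as pyspark.__version__, and the resulting string could be passed with .config("spark.jars.packages", coordinate) in place of the hand-listed JAR paths. The Scala binary version still has to be checked against the Spark distribution in use.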
