Running a PySpark application on a Hadoop cluster produces java.lang.NoClassDefFoundError

2ic8powd · posted 2021-07-15 in Hadoop

Whenever I run it, this error shows up:

    Traceback (most recent call last):
      File "~/test-tung/spark_tf.py", line 69, in <module>
        'spark_tf').master('yarn').getOrCreate()
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/session.py", line 186, in getOrCreate
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 371, in getOrCreate
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 131, in __init__
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 193, in _do_init
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 310, in _initialize_context
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1569, in __call__
      File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.lang.NoClassDefFoundError: org/spark_project/guava/base/Preconditions

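From what I understand, org/spark_project/guava is where Spark 2.x shades Guava; Spark 3.x renamed the shaded package to org.sparkproject.guava (no underscore). So an error like this usually means some Spark 2.x jar is on the runtime classpath alongside the Spark 3.0.0 distribution. A minimal sketch to check which local jars actually carry the shaded class (the extra Hadoop lib directory is an assumption; point it at whatever directories end up on the YARN classpath on your nodes):

    # Sketch: report which jars contain the shaded Guava Preconditions class,
    # so a stray Spark 2.x jar shows up. Paths other than $SPARK_HOME/jars
    # are assumptions; adjust them to your cluster layout.
    import glob
    import os
    import zipfile

    SPARK2_CLASS = 'org/spark_project/guava/base/Preconditions.class'  # Spark 2.x shading
    SPARK3_CLASS = 'org/sparkproject/guava/base/Preconditions.class'   # Spark 3.x shading

    dirs = [
        os.path.join(os.environ.get('SPARK_HOME', '.'), 'jars'),
        '/usr/local/hadoop/share/hadoop/yarn/lib',  # assumed location of YARN libs
    ]
    for d in dirs:
        for jar in glob.glob(os.path.join(d, '*.jar')):
            try:
                names = zipfile.ZipFile(jar).namelist()
            except (OSError, zipfile.BadZipFile):
                continue
            if SPARK2_CLASS in names:
                print(jar, '-> contains Spark 2.x shaded Guava')
            if SPARK3_CLASS in names:
                print(jar, '-> contains Spark 3.x shaded Guava')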
Part of my Python application spark_tf.py:

    import tensorflow as tf
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, struct, to_json
    from pyspark.sql.types import StringType, StructType

    spark = SparkSession.builder.appName(
        'spark_tf').master('yarn').getOrCreate()

    # Load the Keras model on the driver and broadcast its pieces to the executors.
    model = tf.keras.models.load_model('./model/kdd_binary.h5')
    weights = model.get_weights()
    config = model.get_config()
    bc_weights = spark.sparkContext.broadcast(weights)
    bc_config = spark.sparkContext.broadcast(config)

    scheme = StructType().add('@timestamp', StringType()).add('@address', StringType())

    # Read from Kafka, score each record with online_predict (a UDF defined
    # elsewhere in the application, not shown in this excerpt), and write
    # the JSON result back to another Kafka topic.
    stream = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', 'my-host:9092') \
        .option('subscribe', 'dltest') \
        .load() \
        .selectExpr("CAST(value AS STRING)") \
        .select(from_json('value', scheme).alias('json'),
                online_predict('value').alias('result')) \
        .select(to_json(struct('result', 'json.@timestamp', 'json.@address'))
                .alias('value'))

    x = stream.writeStream \
        .format('kafka') \
        .option("kafka.bootstrap.servers", 'my-host:9092') \
        .option('topic', 'dlpred') \
        .option('checkpointLocation', './kafka_checkpoint') \
        .start()
    x.awaitTermination()

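online_predict is not shown in the excerpt; presumably it is a UDF that rebuilds the model from the broadcast config and weights on each executor. A sketch of what such a UDF could look like; everything here is an assumption rather than the actual code (a Sequential model, comma-separated numeric features, a string result):

    # Hypothetical sketch of an online_predict-style UDF (not the code from
    # the question): rebuild the Keras model from the broadcast pieces on
    # the executor and score one record.
    import numpy as np
    import tensorflow as tf
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def make_online_predict(bc_config, bc_weights):
        def predict(value):
            # Assumes a Sequential model; a real implementation should cache
            # the rebuilt model per executor instead of rebuilding per row.
            model = tf.keras.Sequential.from_config(bc_config.value)
            model.set_weights(bc_weights.value)
            feats = np.array([[float(x) for x in value.split(',')]])  # assumed CSV features
            return str(float(model.predict(feats)[0][0]))
        return udf(predict, StringType())

    online_predict = make_online_predict(bc_config, bc_weights)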
My submit command: spark-submit --deploy-mode client --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark_tf.py
I suspect it may be caused by an improper Spark setup, but I don't know what exactly is wrong.
Edit: I thought this code obviously runs on the client rather than on the Hadoop cluster, but running it on the cluster produces the same error.
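One thing worth noting either way: relative paths like './model/kdd_binary.h5' and './kafka_checkpoint' resolve against the driver's working directory, so in cluster mode, where the driver runs inside a YARN container, they will not point at the files on my machine. A sketch of one way to make the paths deploy-mode agnostic; the --files shipping and the HDFS checkpoint URI are assumptions, not from my setup:

    # Sketch (assumed approach): ship the model file with
    #   spark-submit --files ./model/kdd_binary.h5 ...
    # and keep the streaming checkpoint on shared storage such as HDFS.
    from pyspark import SparkFiles

    model_path = SparkFiles.get('kdd_binary.h5')             # resolved wherever the driver runs
    checkpoint_dir = 'hdfs:///user/tung/kafka_checkpoint'    # assumed shared HDFS location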

