pyspark上集成apachespark和kafka

mmvthczy  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(324)

这些是我集成Kafka和spark的开发环境。

  1. IDE : eclipse 2020-12
  2. python : Anaconda 2020.02 (Python 3.7)
  3. kafka : 2.13-2.7.0
  4. spark : 3.0.1-bin-hadoop3.2

我的eclipse配置参考站点在这里。所以下面的图片是eclipse pyspark配置的。

spark-pyspark的简单代码工作成功,没有错误。但是Kafka和spark结构化流媒体的融合带来了错误。这些是密码。

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.master("local[*]").appName("appName").getOrCreate()
  3. df = spark.read.format("kafka")\
  4. .option("kafka.bootstrap.servers", "localhost:9092")\
  5. .option("subscribe", "topicForMongoDB")\
  6. .option("startingOffsets", "earliest")\
  7. .load()\
  8. .selectExpr("CAST(value AS STRING) as column")
  9. df.printSchema()
  10. df.show()

抛出的错误是

  1. pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

所以我插入了绑定相关jar文件的python代码。

  1. import os
  2. os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.0,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.0'

但这次又出现了另一个错误。

  1. Error: Missing application resource.
  2. Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
  3. Usage: spark-submit --kill [submission ID] --master [spark://...]
  4. Usage: spark-submit --status [submission ID] --master [spark://...]
  5. Usage: spark-submit run-example [options] example-class [example args]
  6. Options:
  7. --master MASTER_URL spark://host:port, mesos://host:port, yarn,
  8. k8s://https://host:port, or local (Default: local[*]).
  9. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
  10. on one of the worker machines inside the cluster ("cluster")
  11. (Default: client).
  12. --class CLASS_NAME Your application's main class (for Java / Scala apps).
  13. --name NAME A name of your application.
  14. --jars JARS Comma-separated list of jars to include on the driver
  15. and executor classpaths.
  16. --packages Comma-separated list of maven coordinates of jars to include
  17. on the driver and executor classpaths. Will search the local
  18. maven repo, then maven central and any additional remote
  19. repositories given by --repositories. The format for the
  20. coordinates should be groupId:artifactId:version.

我被困在这里了。我的eclipse配置和pyspark代码有一些问题。但我不知道是什么导致了这些错误。请告诉我Kafka和spark-pyspark的集成配置。欢迎任何回复。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题