如何在Jupyter笔记本中使用PySpark时包含外部Spark库

klr1opcd  于 2022-11-16  发布在  Apache
关注(0)|答案(2)|浏览(169)

我尝试在Jupyter笔记本中运行以下PySpark-Kafka流example。下面是我在笔记本中使用的代码的第一部分:

  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. from pyspark.streaming.kafka import KafkaUtils
  4. sc = pyspark.SparkContext(master='local[*]',appName="PySpark streaming")
  5. ssc = StreamingContext(sc, 2)
  6. topic = "my-topic"
  7. brokers = "localhost:9092"
  8. kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

如果我运行该单元,我会收到以下错误/说明:

  1. Spark Streaming's Kafka libraries not found in class path. Try one of the following.
  2. 1. Include the Kafka library and its dependencies with in the
  3. spark-submit command as
  4. $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.3.0 ...
  5. 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
  6. Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.3.0.
  7. Then, include the jar in the spark-submit command as
  8. $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

我的问题是:我如何将--jars或--package参数传递给Jupyter Notebook?2或者,我是否可以下载这个包并将其永久链接到Python/Jupyter(可能通过.bashrc中的一个导出)?

kuhbmx9i

kuhbmx9i1#

至少有两种方法可以执行此操作,大致对应于错误消息中建议的两个选项:

第一种方法是相应地更新您各自的Jupyter内核(如果您还没有使用Jupyter内核,您应该--请参阅此答案,了解在Jupyter for Pyspark中使用内核的详细一般性信息)。

更具体地说,您应该使用env下的以下条目更新Pyspark的相应kernel.json配置文件(如果您使用--master local以外的内容,请进行相应修改):

  1. "PYSPARK_SUBMIT_ARGS": "--master local --packages org.apache.spark:spark-streaming-kafka-0-8:2.3.0 pyspark-shell"

第二种方法是将以下条目放入spark-defaults.conf文件:

  1. spark.jars.packages org.apache.spark:spark-streaming-kafka-0-8:2.3.0

在这两种情况下,您都不需要手动下载任何东西--第一次使用更新的配置运行Pyspark时,将下载必要的文件并将其放在适当的目录中。

vmjh9lq9

vmjh9lq92#

这是我如何配置运行PySpark(版本与scala 2.12 Spark 3.2.1)结构流与Kafka在jupyter实验室
首先,我下载了5个jar文件,并将它们放在我当前项目文件夹下的文件夹/jars中(我认为只是用于本地运行):

  • spark-sql-kafka-0-10_2.12-3.2.1.jar
  • kafka-clients-2.1.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar
  • commons-pool2-2.8.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.1.jar

jars的配置值如下所示:”
下面是实际代码:

  1. spark_jars = ("{},{},{},{},{}".format(os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",
  2. os.getcwd() + "/jars/kafka-clients-2.1.1.jar",
  3. os.getcwd() + "/jars/spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar",
  4. os.getcwd() + "/jars/commons-pool2-2.8.0.jar",
  5. os.getcwd() + "/jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar"))
  6. spark = SparkSession.builder.config("spark.jars", spark_jars).appName("Structured_Redpanda_WordCount").getOrCreate()
  7. spark.conf.set("spark.sql.shuffle.partitions", 1)
展开查看全部

相关问题