I'm putting together a prototype for streaming data from MySQL through Debezium and Kafka into Spark, all running under Docker.
My docker-compose.yml file looks like this:
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  spark-master:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
  spark-worker:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    links:
      - kafka
  jupyter:
    image: jupyter/pyspark-notebook
    environment:
      - GRANT_SUDO=yes
      - JUPYTER_ENABLE_LAB=yes
      - JUPYTER_TOKEN=mysecret
    ports:
      - "8888:8888"
    volumes:
      - /Users/eugenegoldberg/jupyter_notebooks:/home/eugene
    depends_on:
      - spark-master
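For context, the dbserver1.inventory.customers topic read later in the notebook only exists once a Debezium MySQL connector has been registered with the connect service. A rough sketch of that step, using the Kafka Connect REST API on the published port 8083 (the connector config below follows the standard Debezium MySQL tutorial for the example-mysql image and is an assumption here; Debezium 2.x renames some of these properties, e.g. database.server.name becomes topic.prefix):

import requests

# Hypothetical connector registration; adjust the config to your
# DEBEZIUM_VERSION and database before using it.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",  # topics become dbserver1.<schema>.<table>
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
print(resp.json())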
My Jupyter notebook (served by the same docker-compose file) contains the following PySpark code, which tries to connect Spark to Kafka:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
spark_version = '3.3.1'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
packages = [
    f'org.apache.kafka:kafka-clients:3.3.1'
]
# Create SparkSession
spark = SparkSession.builder \
    .appName("Kafka Streaming Example") \
    .config("spark.driver.host", "host.docker.internal") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()
# Define the Kafka topic and Kafka server/port
topic = "dbserver1.inventory.customers"
kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'
# Read data from kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaServer) \
    .option("subscribe", topic) \
    .load()
I get this error:
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[9], line 32
24 kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'
26 # Read data from kafka topic
27 df = spark \
28 .readStream \
29 .format("kafka") \
30 .option("kafka.bootstrap.servers", kafkaServer) \
31 .option("subscribe", topic) \
---> 32 .load()
File /usr/local/spark/python/pyspark/sql/streaming.py:469, in DataStreamReader.load(self, path, format, schema, **options)
467 return self._df(self._jreader.load(path))
468 else:
--> 469 return self._df(self._jreader.load())
File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /usr/local/spark/python/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
What changes do I need to make to fix this?
1 Answer
tl;dr Neither spark-master nor spark-worker has the necessary libraries on its CLASSPATH, hence the infamous AnalysisException: Failed to find data source: kafka.
See Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)? for some background.
My guess is that you should add the jars to jupyter (so that when it executes the Spark code it knows where to find the required classes).
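For example, a minimal sketch of the notebook code with the Kafka source package declared up front (assumptions: the notebook image runs Spark 3.3.x built against Scala 2.12; adjust the artifact versions to whatever spark.version reports in your kernel):

from pyspark.sql import SparkSession

# spark-sql-kafka-0-10 provides the "kafka" data source;
# kafka-clients alone is not enough.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    "org.apache.kafka:kafka-clients:3.3.1",
]

spark = (
    SparkSession.builder
    .appName("Kafka Streaming Example")
    # Resolves the packages with Ivy and puts them on the driver classpath;
    # if you later point the session at the standalone cluster with
    # .master("spark://spark-master:7077"), the same setting also ships the
    # jars to the executors on spark-worker.
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.inventory.customers")
    .load()
)

Note that spark.jars.packages is only honored when the JVM behind the SparkSession is first started, so restart the kernel before re-running; re-applying the config to an already-running session has no effect.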
There are also the Jupyter Docker Stacks build manifests, straight from the Jupyter project.