Failed to find data source: Kafka in a Docker environment

h9a6wy2h · posted 2023-01-25 in Apache

I'm putting together a prototype for streaming data ingestion from MySQL, through Debezium and Kafka, into Spark, all running in Docker.
My docker-compose.yml file is shown below:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
  spark-master:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
  spark-worker:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    links:
      - kafka
  jupyter:
    image: jupyter/pyspark-notebook
    environment:
      - GRANT_SUDO=yes
      - JUPYTER_ENABLE_LAB=yes
      - JUPYTER_TOKEN=mysecret
    ports:
      - "8888:8888"
    volumes:
      - /Users/eugenegoldberg/jupyter_notebooks:/home/eugene
    depends_on:
      - spark-master

My Jupyter notebook (served by the same docker-compose file) contains the following PySpark code, which tries to connect Spark to Kafka:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os

spark_version = '3.3.1'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

packages = [
    f'org.apache.kafka:kafka-clients:3.3.1'
]

# Create SparkSession
spark = SparkSession.builder \
    .appName("Kafka Streaming Example") \
    .config("spark.driver.host", "host.docker.internal") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

# Define the Kafka topic and Kafka server/port
topic = "dbserver1.inventory.customers"
kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'

# Read data from kafka topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafkaServer) \
  .option("subscribe", topic) \
  .load()

I get this error:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[9], line 32
     24 kafkaServer = "kafka:9092" # assuming kafka is running on a container named 'kafka'
     26 # Read data from kafka topic
     27 df = spark \
     28   .readStream \
     29   .format("kafka") \
     30   .option("kafka.bootstrap.servers", kafkaServer) \
     31   .option("subscribe", topic) \
---> 32   .load()

File /usr/local/spark/python/pyspark/sql/streaming.py:469, in DataStreamReader.load(self, path, format, schema, **options)
    467     return self._df(self._jreader.load(path))
    468 else:
--> 469     return self._df(self._jreader.load())

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

What changes do I need to make to fix this?

9jyewag0 1#

tl;dr Neither spark-master nor spark-worker has the required library on its CLASSPATH, hence the AnalysisException: Failed to find data source: kafka.

See Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)? for some background.
My guess is that you should add the jar to Jupyter (so that when it executes your Spark code, it knows where to find the required classes).
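A minimal sketch of what that could look like in the notebook, assuming the image ships Spark 3.3.1 built against Scala 2.12 (adjust the artifact version to whatever spark.version reports inside your container):

import os
from pyspark.sql import SparkSession

# The "kafka" source for Structured Streaming lives in the separate
# spark-sql-kafka-0-10 artifact; kafka-clients alone does not register it.
# The version below is an assumption - match it to the Spark/Scala build in the image.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
]

spark = (
    SparkSession.builder
    .appName("Kafka Streaming Example")
    # spark.jars.packages is only picked up when the driver JVM starts, so set it
    # before the first SparkSession is created (restart the kernel if one exists).
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.inventory.customers")
    .load()
)

Note that the notebook in the question passed only org.apache.kafka:kafka-clients to spark.jars.packages; that jar does not contain the Structured Streaming Kafka source, which is exactly what the AnalysisException complains about.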
There are also the Jupyter Docker Stacks build manifests, straight from the Jupyter project.
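Once the connector resolves, a quick sanity check (again a sketch, reusing df and the topic from the question) is to cast the Kafka payload to strings and stream it to the console:

# "df" is the streaming DataFrame created by spark.readStream above.
# Kafka delivers key and value as binary, so cast them for readable output.
query = (
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .writeStream
    .format("console")
    .option("truncate", "false")
    .start()
)

# Watch the notebook/driver logs for the console batches; call query.stop() when done.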
