I am very new to Spark and Kafka, and I am trying to run some sample code in Python (Jupyter), using docker-compose to download, configure, and run the Docker images. I am running Ubuntu 20.04 via Windows 10 WSL.
These are the steps I have followed so far:
Using the docker-compose.yml file shown below, I ran `docker-compose up -d`
All containers started successfully
I opened Jupyter at http://127.0.0.1:8888/?token=
I execute the Python script line by line, and it fails at
```
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "cloud") \
  .option("startingOffsets", "earliest") \
  .load()
```
with the error: error preamble... AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
I have clearly missed a step, and I have searched for a solution without success. Other answers to this problem suggest adding:

```
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
```

This did not work for me. What step am I missing?
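For completeness, the builder-based form of that same suggestion is sketched below. This is only my reading of it, with the same package coordinates as above; the 3.1.1 artifact version is assumed to match the Spark version reported by `spark.version`:

```
from pyspark.sql import SparkSession

# Sketch only: same Kafka connector package as above, but resolved via the
# SparkSession builder instead of PYSPARK_SUBMIT_ARGS. The artifact version
# (3.1.1) is assumed to match the Spark version inside the notebook image.
spark = (
    SparkSession.builder
    .appName("My App")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .getOrCreate()
)
```

As I understand it, this only takes effect when the session is created, so any already-running SparkSession in the notebook would need a kernel restart before the package is picked up.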
docker-compose.yml
```
---
version: '2'
services:

  spark:
    image: jupyter/all-spark-notebook:latest
    ports:
      - "8888:8888"
    working_dir: /home/$USER/work
    volumes:
      - $PWD/work:/home/$USER/work

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect=DEBUG
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        curl http://repo.odysseusinc.com/artifactory/community-libs-release-local/org/netezza/nzjdbc/1.0/nzjdbc-1.0.jar -o nzjdbc-1.0.jar
        /etc/confluent/docker/run &
        sleep infinity
```
Python code
```
#!/usr/bin/env python
# coding: utf-8

# In[ ]:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'

# In[ ]:

# Spark
import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Spark Streaming
from pyspark.streaming import StreamingContext

# json parsing
import json

# In[ ]:

findspark.init()

# In[ ]:

spark = SparkSession \
    .builder \
    .appName("My App") \
    .getOrCreate()
#   .config("spark.jars", "/path/to/jar.jar,/path/to/another/jar.jar") \

# In[ ]:

spark.version

# In[ ]:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "cloud") \
    .option("startingOffsets", "earliest") \
    .load()
```
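For reference, a minimal sketch of how I would consume the stream once `load()` succeeds (the console sink and the cast to strings are assumptions for testing only, not part of the original script):

```
# Sketch only: dump the raw Kafka records to the console to confirm that
# messages arrive, once the readStream above stops failing.
query = (
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()
)

query.awaitTermination()
```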