We are currently facing this problem, and none of the "similar questions" shown have helped us solve it. We are new to both Docker and Spark.
We set up the containers with the following docker-compose file:
networks:
  spark_net:
volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock
We also created two Python files to test whether Kafka streaming works:
Producer:
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['twitter-streaming_kafka_1:9093'],
                         api_version=(0, 11, 5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number': e}
    producer.send('corona', value=data)
    time.sleep(0.5)
Consumer:
import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
    bootstrap_servers=['twitter-streaming_kafka_1:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

print('printing messages')
for message in consumer:
    message = message.value
    print(message)
Both scripts worked when we ran them in separate CLIs inside the jupyterlab container. But when we tried to connect to the producer's stream via PySpark with the following code, we got the error shown in the stack trace below.
import random
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "corona").load()
We also executed the following command in the spark-master CLI:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
Stack trace:
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
6
7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()
/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
418 return self._df(self._jreader.load(path))
419 else:
--> 420 return self._df(self._jreader.load())
421
422 @since(2.0)
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
1 Answer
Your Kafka container needs to be on the spark_net network so that the Spark containers can resolve it by name. The same goes for the jupyterlab container if you want Jupyter to be able to launch jobs on the Spark cluster; see the sketch below.
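A minimal sketch of the corresponding compose changes, assuming the service names from your file (only the added networks keys are shown; merge them into the existing service definitions):

services:
  jupyterlab:
    networks:
      - spark_net
  zookeeper:
    networks:
      - spark_net
  kafka:
    networks:
      - spark_net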
You also need to add the Kafka package. Note that passing --packages to spark-shell only affects that one shell session; the PySpark session started from Jupyter needs the connector as well.
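One way to do that from Jupyter is to attach the connector to the SparkSession itself; a minimal sketch, assuming Spark 3.0.1 built against Scala 2.12 (the artifact version must match your actual Spark build):

from pyspark.sql import SparkSession

# Pull the Structured Streaming Kafka connector in when the session starts.
# The coordinate assumes Spark 3.0.1 / Scala 2.12; adjust it to your build.
spark = (SparkSession.builder
         .appName('KafkaStreaming')
         .config('spark.jars.packages',
                 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')
         .getOrCreate())

# Once the kafka container is on spark_net, it is reachable by service name
# on its INSIDE listener (kafka:9093).
df = (spark.readStream.format('kafka')
      .option('kafka.bootstrap.servers', 'kafka:9093')
      .option('subscribe', 'corona')
      .load())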