Failed to find data source: kafka (Docker environment)

yyyllmsg · posted 2021-05-17 in Spark

We are currently facing this issue, and none of the suggested "similar questions" help us solve it. We are new to both Docker and Spark.
We set up our containers with the following docker-compose:

networks:
  spark_net:

volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock

We also created two Python files to test whether Kafka streaming works.
Producer:

import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers = ['twitter-streaming_kafka_1:9093'],
                         api_version=(0,11,5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
for e in range(1000):
    data = {'number' : e}
    producer.send('corona', value=data)
    time.sleep(0.5)

Consumer:

import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
     bootstrap_servers=['twitter-streaming_kafka_1:9093'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

print('printing messages')
for message in consumer:
    message = message.value
    print(message)

When we run the two scripts in separate CLIs inside the jupyterlab container, everything works. But when we try to connect to the producer stream via PySpark with the code below, we get the error mentioned in the title.

import random
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "corona").load()

We also executed the following command in the spark-master CLI:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

Stack trace:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
      6 
      7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()

/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema,**options)
    418             return self._df(self._jreader.load(path))
    419         else:
--> 420             return self._df(self._jreader.load())
    421 
    422     @since(2.0)

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a,**kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

mm5n2pyu1#

Your Kafka container needs to be on the spark_net network so that the Spark containers can resolve it by name.
The same goes for jupyterlab if you want Jupyter to be able to launch jobs on the Spark cluster.
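A minimal sketch of the compose changes, assuming the rest of the services stay exactly as in the file above; zookeeper, kafka and jupyterlab just need a networks entry pointing at spark_net:

  jupyterlab:
    image: jupyterlab
    networks:
      - spark_net        # lets Jupyter reach spark-master and kafka by name
    # ...ports and volumes unchanged

  zookeeper:
    image: wurstmeister/zookeeper
    networks:
      - spark_net
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    networks:
      - spark_net        # makes "kafka" resolvable from the Spark containers
    # ...ports, environment and volumes unchanged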
You also need to add the Kafka package.
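One way to do that from the Jupyter notebook is to request the connector when the SparkSession is created. This is only a sketch: spark.jars.packages takes effect only if no session/JVM is already running, and the connector version has to match your Spark/Scala build (3.0.1 / Scala 2.12, as in the spark-shell command above):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('KafkaStreaming')
         # pull in the Structured Streaming Kafka connector at session start
         .config('spark.jars.packages',
                 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')
         .getOrCreate())

# with kafka attached to spark_net, the INSIDE listener from the compose file
# ("kafka:9093") is reachable from the Spark containers
df = (spark.readStream
      .format('kafka')
      .option('kafka.bootstrap.servers', 'kafka:9093')
      .option('subscribe', 'corona')
      .load())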
