如何在python中将kafka主题数据加载到spark数据流中

ldfqzlk8  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(326)

我将spark3.0.0与python结合使用。我有一个 test_topic 在Kafka,我生产从一个csv。
下面的代码正从这个主题消耗到spark中,但我在某个地方读到它需要在dstream中,然后才能对它执行任何ml。

import json
from json import loads
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "test")
ssc = StreamingContext(sc, 1)

consumer = KafkaConsumer('test_topic',
                    bootstrap_servers =['localhost:9092'],
                    api_version=(0, 10))

消费者退货 <kafka.consumer.group.KafkaConsumer at 0x13bf55b0> 如何编辑上述代码以获得数据流?
我是新来的,所以请指出我犯的任何愚蠢的错误。
编辑:以下是我的制片人代码:

import json
import csv
from json import dumps
from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
value_serializer=lambda x:dumps(x)

with open('test_data.csv') as file:
reader = csv.DictReader(file, delimiter=';')
for row in reader:
    producer.send('test_topic', json.dumps(row).encode('utf=8'))
    sleep(2)
    print ('Message sent ', row)
r7xajy2e

r7xajy2e1#

你需要使用org.apache。spark:spark-sql-kafka-0-10_2.12:用于运行它的3.0.0包。它将使用spark submit下载相关jar。

li9yvcax

li9yvcax2#

好久没做Spark了,让我来帮你!
首先,当您使用spark 3.0.0时,您可以使用spark结构化流媒体,api将更易于使用,因为它基于Dataframe。正如您在文档链接中看到的,这里有一个结构化流媒体模式下kafka与pyspark的集成指南。
它将与以下查询一样简单:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test_topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

然后您可以使用ml管道来使用这个Dataframe,以应用您需要的一些ml技术和模型。正如您在databricks笔记本中看到的,他们有一些用ml进行结构化流式处理的示例。这是用scala编写的,但这将是一个很好的灵感来源。您可以将它与mlpyspark文档结合起来,用python翻译它
编辑:为了使pyspark和Kafka之间的工作正常,需要遵循的实际步骤

1-Kafka装置

所以首先我要设置本地Kafka:

wget https://archive.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
tar -xzf kafka_2.11-0.10.2.0.tgz

我打开4个shell,运行zookeeper/server/create\u topic/write\u topic脚本:
Zookeeper

cd kafka_2.11-0.10.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties

服务器

cd kafka_2.11-0.10.2.0
bin/kafka-server-start.sh config/server.properties

创建主题并检查创建

cd kafka_2.11-0.10.2.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

主题中的测试消息(在shell中以交互方式编写它们以进行测试):

cd kafka_2.11-0.10.2.0
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2-Pypark设置

得到额外的jar

现在我们已经设置了kafka,我们将使用特定jars下载设置pyspark:
spark-streaming-kafka-0-10-assembly\ u 2.12-3.0.0.jar

wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar

spark-sql-kafka-0-10_2.12-3.0.0.jar

wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar

commons-pool2-2.8.0.jar

wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar

Kafka客户端-0.10.2.2.jar

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.2/kafka-clients-0.10.2.2.jar

运行pyspark shell命令

如果执行pyspark命令时不在jars文件夹中,请不要忘记为每个jar指定文件夹路径。

PYSPARK_PYTHON=python3 $SPARK_HOME/bin/pyspark --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,kafka-clients-0.10.2.2.jar,commons-pool2-2.8.0.jar

3-运行pyspark代码

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

干杯

vkc1a9a2

vkc1a9a23#

您需要使用kafkautils createdirectstream方法。
以下是spark官方文档中的代码示例:

from pyspark.streaming.kafka import KafkaUtils
 directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

相关问题