如何使用带有kafkapython客户端的hive来使用hdfs中的数据?

t0ybt7op  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(171)

抱歉,如果这是个愚蠢的问题。我对所有这些管道的东西都是陌生的:)
我使用kafkapython客户端创建了一个producer来发送csv(一个csv行=一个kafka消息)。注意,我通过json将其序列化为一个字符串,并将其编码为utf-8字节。然后我创建了一个consumer来解码消息(一个csv行现在是一个字符串),并将它们打印到终端。现在我需要使用hive将这些数据保存在hdfs中。我想把每条消息插入一个配置单元表,然后我想做一个巨大的选择来获取一个文件中的所有数据。
使用python最好的方法是什么?
以下是我所做的:
首先,我启动zookeeper服务器:

bin/zookeeper-server-start.sh config/zookeeper.properties

然后我启动Kafka服务器:

bin/kafka-server-start.sh config/server.properties

然后我开始我的消费者和生产者:
我的制作人:

from kafka import KafkaClient, KafkaProducer, KafkaConsumer
import csv
import json

client = KafkaClient("localhost:9092")

producer = KafkaProducer(bootstrap_servers='localhost:9092')

with open("train.csv") as file:
    reader = csv.reader(file)
    for row in reader:
        producer.send('the_topic', json.dumps(row).encode('utf-8'))

我的消费者:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('the_topic')

for msg in consumer:
    decoded_msg = msg.value.decode("utf-8")
    print(decoded_msg)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题