抱歉,如果这是个愚蠢的问题。我对所有这些管道的东西都是陌生的:)
我使用kafkapython客户端创建了一个producer来发送csv(一个csv行=一个kafka消息)。注意,我通过json将其序列化为一个字符串,并将其编码为utf-8字节。然后我创建了一个consumer来解码消息(一个csv行现在是一个字符串),并将它们打印到终端。现在我需要使用hive将这些数据保存在hdfs中。我想把每条消息插入一个配置单元表,然后我想做一个巨大的选择来获取一个文件中的所有数据。
使用python最好的方法是什么?
以下是我所做的:
首先,我启动zookeeper服务器:
bin/zookeeper-server-start.sh config/zookeeper.properties
然后我启动Kafka服务器:
bin/kafka-server-start.sh config/server.properties
然后我开始我的消费者和生产者:
我的制作人:
from kafka import KafkaClient, KafkaProducer, KafkaConsumer
import csv
import json
client = KafkaClient("localhost:9092")
producer = KafkaProducer(bootstrap_servers='localhost:9092')
with open("train.csv") as file:
reader = csv.reader(file)
for row in reader:
producer.send('the_topic', json.dumps(row).encode('utf-8'))
我的消费者:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('the_topic')
for msg in consumer:
decoded_msg = msg.value.decode("utf-8")
print(decoded_msg)
暂无答案!
目前还没有任何答案,快来回答吧!