使用pythonspark向kafka发送大型csv

kcugc4gi  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(465)

我想给Kafka发一个大的csv。基本结构是读取csv的一行,并用标题压缩它。

a = dict(zip(header, line.split(",")

然后将其转换为json:

message = json.dumps(a)

然后我使用kafka python库发送消息

from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)

使用pyspark,我已经很容易地从csv文件创建了一个消息的rdd

sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))

现在我想发送这些消息:我定义了一个函数

def sendkafka(message):
  kafka = KafkaClient("localhost:9092")
  producer = SimpleProducer(kafka)
  return producer.send_messages('topic',message)

然后我创建一个新的rdd来发送消息

sentRDD = messageRDD.map(lambda x: kafkasend(x))

然后我调用sentrdd.count()
开始搅动和发送信息
不幸的是,这是非常缓慢的。它每秒发送1000条信息。这是一个10节点的集群,每个集群有4个CPU和8gb内存。
相比之下,在一个1000万行的csv上创建消息大约需要7秒~大约2gb
我认为问题是我在函数中示例化了一个kafka生产者。然而,如果我没有那么Spark抱怨生产者不存在,即使我已经尝试定义它全球。
也许有人能阐明如何解决这个问题。
谢谢您,

kyks70gy

kyks70gy1#

您可以为每个分区创建一个生产者,并使用 mapPartitions 或者 foreachPartition :

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('topic', message)

sentRDD = messageRDD.mapPartitions(sendkafka)

如果以上这些都没有帮助,您可以尝试使用异步生产者来扩展它。
在spark 2.x中,也可以使用kafka数据源。你必须包括 spark-sql-kafka jar,匹配spark和scala版本(这里分别是2.2.0和2.11):

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

将数据转换为 DataFrame (如果不是 DataFrame 已经):

messageDF = spark.createDataFrame(messageRDD, "string")

并使用 DataFrameWriter :

(messageDF.write
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .save())

相关问题