我有一个Spark流的工作,消费数据从Kafka和发送回Kafka后,做了一些数据处理。为此,我对数据进行了一些Map操作,
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicNameMap, StorageLevel.MEMORY_AND_DISK)
var ad = ""
val abc = lines.map(_._2).map { x =>
val jsonObj = new JSONObject(x)
val data = someMethod(schema, jsonObj)
data
}
然后我对它执行每个操作,我没有在这里收集所有的数据给驱动程序,因为我想在执行器内部发送这些记录。
abc.foreachRDD(rdd => {
rdd.foreach { toSend =>
val producer = KafkaProducerUtils.getKafkaProducer(kafkaBrokers)
println("toSend---->" + toSend)
producer.send(new ProducerRecord[String, String](topicToSend, toSend))
}
我用这段代码对1405个数据进行了10秒的测试,但完成这项工作大约花了2.5分钟。我知道创造 KafkaProducer
是昂贵的,有没有其他方法来减少处理时间。出于测试目的,我使用2个执行器,每个执行器有2个内核和1gm。
2条答案
按热度按时间anauzrmj1#
处理如此多的消息时出现如此巨大的延迟,肯定有几个原因:
问题可能存在于你的消费阶段。如果您使用“createstream”,至少spark的次要版本使用高级使用者实现,它需要zookeeper来存储属于特定组的使用者的偏移量。所以我´d检查此通信,因为在提交阶段可能会花费太多时间。如果由于任何原因,使承诺为每一个一个你的消费率可能会受到损害。所以首先,检查这个。
另一个原因是文件系统的预写日志。尽管您的配置指示内存为磁盘,但正如您在spark文档中看到的:
效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这进一步复制了数据。这实际上是低效的,因为数据被有效地复制了两次—一次是由kafka复制的,另一次是由写前日志复制的。第二种方法消除了这个问题,因为没有接收器,因此不需要写前日志。只要您有足够的Kafka保留,就可以从Kafka恢复邮件
为了更好的消费率,我会使用createdirectstream代替。
eaf3rand2#
在搜索了很多之后,我找到了这篇关于Kafka辛的文章。这将给你的想法,生产数据Kafka内部Spark流以有效的方式。