从星火般的工作中被派往Kafka花了太多时间

py49o6xq  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(338)

我有一个Spark流的工作,消费数据从Kafka和发送回Kafka后,做了一些数据处理。为此,我对数据进行了一些Map操作,

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicNameMap, StorageLevel.MEMORY_AND_DISK)
var ad = ""
val abc = lines.map(_._2).map { x =>
  val jsonObj = new JSONObject(x)
  val data = someMethod(schema, jsonObj)
  data
}

然后我对它执行每个操作,我没有在这里收集所有的数据给驱动程序,因为我想在执行器内部发送这些记录。

abc.foreachRDD(rdd => {
  rdd.foreach { toSend =>
    val producer = KafkaProducerUtils.getKafkaProducer(kafkaBrokers)
    println("toSend---->" + toSend)
    producer.send(new ProducerRecord[String, String](topicToSend, toSend))
  }

我用这段代码对1405个数据进行了10秒的测试,但完成这项工作大约花了2.5分钟。我知道创造 KafkaProducer 是昂贵的,有没有其他方法来减少处理时间。出于测试目的,我使用2个执行器,每个执行器有2个内核和1gm。

anauzrmj

anauzrmj1#

处理如此多的消息时出现如此巨大的延迟,肯定有几个原因:
问题可能存在于你的消费阶段。如果您使用“createstream”,至少spark的次要版本使用高级使用者实现,它需要zookeeper来存储属于特定组的使用者的偏移量。所以我´d检查此通信,因为在提交阶段可能会花费太多时间。如果由于任何原因,使承诺为每一个一个你的消费率可能会受到损害。所以首先,检查这个。
另一个原因是文件系统的预写日志。尽管您的配置指示内存为磁盘,但正如您在spark文档中看到的:
效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这进一步复制了数据。这实际上是低效的,因为数据被有效地复制了两次—一次是由kafka复制的,另一次是由写前日志复制的。第二种方法消除了这个问题,因为没有接收器,因此不需要写前日志。只要您有足够的Kafka保留,就可以从Kafka恢复邮件
为了更好的消费率,我会使用createdirectstream代替。

eaf3rand

eaf3rand2#

在搜索了很多之后,我找到了这篇关于Kafka辛的文章。这将给你的想法,生产数据Kafka内部Spark流以有效的方式。

相关问题