如何将spark流df写入kafka主题

j2cgzkjk  于 2021-06-08  发布在  Kafka
关注(0)|答案(8)|浏览(373)

我正在使用spark流来处理两个kafka队列之间的数据,但是我似乎找不到一个好的方法从spark写kafka。我试过这个:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

它按预期的方式工作,但是为每一条信息引用一个新的kafkaproducer在真实环境中显然是不可行的,我正在努力解决这个问题。
我想为每个进程保留对单个示例的引用,并在需要发送消息时访问它。我怎样才能从spark streaming写信给Kafka?

bzzcjhmw

bzzcjhmw1#

Spark>=2.2

使用结构化流api,kafka上的读写操作都是可能的

从Kafka主题构建流

// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

阅读键和值并应用两者的模式,为简单起见,我们正在将它们转换为 String 类型。

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

dsStruc 有了这个模式,它就可以接受所有类似sql的操作 filter , agg , select …等等。

将流写入Kafka主题

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

Kafka集成读写的更多配置

要添加到应用程序中的关键构件

"org.apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
vbopmzt1

vbopmzt12#

有一个由cloudera维护的流式kafka作家(实际上是从spark jira[1]衍生出来的)。它基本上为每个分区创建一个producer,它将创建“重”对象所花费的时间分摊到一个(希望是大的)元素集合上。
作者可以在这里找到:https://github.com/cloudera/spark-kafka-writer

holgip5t

holgip5t3#

我有同样的问题,找到了这个帖子。
作者通过为每个执行器创建一个生产者来解决这个问题。他只发送了一个“配方”,即如何通过广播在执行器中创建生产者,而不是发送生产者本身。

val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

他使用了一个 Package 器,懒洋洋地创建了生产者:

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }

    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

Package 器是可序列化的,因为kafka生产者是在第一次在执行器上使用之前初始化的。驱动程序保留对 Package 器的引用, Package 器使用每个执行器的生产者发送消息:

dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
ufj5ltwl

ufj5ltwl4#

这可能就是你想做的。基本上为每个记录分区创建一个生产者。

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
)

希望有帮助

x7yiwoj4

x7yiwoj45#

为什么不可行?基本上,每个rdd的每个分区都是独立运行的(可能在不同的集群节点上运行),因此您必须在每个分区的任务开始时重新进行连接(以及任何同步)。如果这样做的开销太大,那么您应该在 StreamingContext 直到它成为可接受的(obv.有一个延迟成本这样做)。
(如果您没有在每个分区中处理数千条消息,您确定需要spark流吗?使用独立应用程序会更好吗?)

z0qdvdin

z0qdvdin6#

是的,不幸的是,spark(1.x,2.x)并不能直接说明如何高效地给Kafka写信。
我建议采用以下方法:
使用(再使用)一个 KafkaProducer 每个执行进程/jvm的示例。
以下是此方法的高级设置:
首先,你必须“ Package ”Kafka的 KafkaProducer 因为,正如你提到的,它是不可序列化的。 Package 它可以让你把它“运送”给遗嘱执行人。这里的关键思想是使用 lazy val 因此,您可以将生产者的示例化延迟到它的第一次使用,这实际上是一种解决方法,因此您不需要担心 KafkaProducer 不可序列化。
通过使用广播变量将 Package 好的生产者“运送”给每个执行者。
在实际的处理逻辑中,通过broadcast变量访问 Package 的生产者,并使用它将处理结果写回kafka。
下面的代码片段从spark 2.0开始使用spark流。
第一步: Package KafkaProducer ```
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()

def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))

def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

import scala.collection.JavaConversions._

def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducerK, V

  sys.addShutdownHook {
    // Ensure that, on executor JVM shutdown, the Kafka producer sends
    // any buffered messages to Kafka before shutting down.
    producer.close()
  }

  producer
}
new MySparkKafkaProducer(createProducerFunc)

}

def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

步骤2:使用广播变量为每个执行器提供自己的包 `KafkaProducer` 示例

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducerArray[Byte], String)
}

第三步:从spark streaming到kafka,重复使用相同的代码 `KafkaProducer` 示例(针对每个执行者)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}

希望这有帮助。
j2qf4p5b

j2qf4p5b7#

我的第一个建议是尝试在foreachpartition中创建一个新示例,并测量它是否足够快以满足您的需要(官方文档建议在foreachpartition中示例化重对象)。
另一种选择是使用对象池,如本例所示:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/pooledkafkaproducerappfactory.scala
然而,我发现在使用检查点时很难实现。
另一个对我来说运行良好的版本是一个工厂,如下面的博客文章所述,您只需检查它是否为您的需求提供了足够的并行性(检查评论部分):
http://allegro.tech/2015/08/spark-kafka-integration.html

jckbn6z7

jckbn6z78#

Spark<2.2

因为没有直接的方式从spark流向kafka发送信息

创建KafkasinkWriter

import java.util.Properties
import org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter

 class  KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)] {
      val kafkaProperties = new Properties()
      kafkaProperties.put("bootstrap.servers", servers)
      kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val results = new scala.collection.mutable.HashMap[String, String]
      var producer: KafkaProducer[String, String] = _

      def open(partitionId: Long,version: Long): Boolean = {
        producer = new KafkaProducer(kafkaProperties)
        true
      }

      def process(value: (String, String)): Unit = {
          producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
      }

      def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
   }

使用sinkwriter编写消息

val topic = "<topic2>"
val brokers = "<server:ip>"

val writer = new KafkaSink(topic, brokers)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

参考链接

相关问题