我正在使用spark流来处理两个kafka队列之间的数据,但是我似乎找不到一个好的方法从spark写kafka。我试过这个:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
它按预期的方式工作,但是为每一条信息引用一个新的kafkaproducer在真实环境中显然是不可行的,我正在努力解决这个问题。
我想为每个进程保留对单个示例的引用,并在需要发送消息时访问它。我怎样才能从spark streaming写信给Kafka?
8条答案
按热度按时间bzzcjhmw1#
Spark>=2.2
使用结构化流api,kafka上的读写操作都是可能的
从Kafka主题构建流
阅读键和值并应用两者的模式,为简单起见,我们正在将它们转换为
String
类型。自
dsStruc
有了这个模式,它就可以接受所有类似sql的操作filter
,agg
,select
…等等。将流写入Kafka主题
Kafka集成读写的更多配置
要添加到应用程序中的关键构件
vbopmzt12#
有一个由cloudera维护的流式kafka作家(实际上是从spark jira[1]衍生出来的)。它基本上为每个分区创建一个producer,它将创建“重”对象所花费的时间分摊到一个(希望是大的)元素集合上。
作者可以在这里找到:https://github.com/cloudera/spark-kafka-writer
holgip5t3#
我有同样的问题,找到了这个帖子。
作者通过为每个执行器创建一个生产者来解决这个问题。他只发送了一个“配方”,即如何通过广播在执行器中创建生产者,而不是发送生产者本身。
他使用了一个 Package 器,懒洋洋地创建了生产者:
Package 器是可序列化的,因为kafka生产者是在第一次在执行器上使用之前初始化的。驱动程序保留对 Package 器的引用, Package 器使用每个执行器的生产者发送消息:
ufj5ltwl4#
这可能就是你想做的。基本上为每个记录分区创建一个生产者。
希望有帮助
x7yiwoj45#
为什么不可行?基本上,每个rdd的每个分区都是独立运行的(可能在不同的集群节点上运行),因此您必须在每个分区的任务开始时重新进行连接(以及任何同步)。如果这样做的开销太大,那么您应该在
StreamingContext
直到它成为可接受的(obv.有一个延迟成本这样做)。(如果您没有在每个分区中处理数千条消息,您确定需要spark流吗?使用独立应用程序会更好吗?)
z0qdvdin6#
是的,不幸的是,spark(1.x,2.x)并不能直接说明如何高效地给Kafka写信。
我建议采用以下方法:
使用(再使用)一个
KafkaProducer
每个执行进程/jvm的示例。以下是此方法的高级设置:
首先,你必须“ Package ”Kafka的
KafkaProducer
因为,正如你提到的,它是不可序列化的。 Package 它可以让你把它“运送”给遗嘱执行人。这里的关键思想是使用lazy val
因此,您可以将生产者的示例化延迟到它的第一次使用,这实际上是一种解决方法,因此您不需要担心KafkaProducer
不可序列化。通过使用广播变量将 Package 好的生产者“运送”给每个执行者。
在实际的处理逻辑中,通过broadcast变量访问 Package 的生产者,并使用它将处理结果写回kafka。
下面的代码片段从spark 2.0开始使用spark流。
第一步: Package
KafkaProducer
```import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducerK, V
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducerArray[Byte], String)
}
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
j2qf4p5b7#
我的第一个建议是尝试在foreachpartition中创建一个新示例,并测量它是否足够快以满足您的需要(官方文档建议在foreachpartition中示例化重对象)。
另一种选择是使用对象池,如本例所示:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/pooledkafkaproducerappfactory.scala
然而,我发现在使用检查点时很难实现。
另一个对我来说运行良好的版本是一个工厂,如下面的博客文章所述,您只需检查它是否为您的需求提供了足够的并行性(检查评论部分):
http://allegro.tech/2015/08/spark-kafka-integration.html
jckbn6z78#
Spark<2.2
因为没有直接的方式从spark流向kafka发送信息
创建KafkasinkWriter
使用sinkwriter编写消息
参考链接