Spark Streaming: read JSON from Kafka and write the JSON to another Kafka topic

Asked by ctehm74n on 2021-06-07, tagged Kafka

I am writing a Spark Streaming application (Spark 2.1, Kafka 0.10).
I need to read data from the Kafka topic "input", select the matching records, and write the result to the topic "output".
I can read data from Kafka with KafkaUtils.createDirectStream.
I convert the RDD to JSON and prepare the filter:

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
      val PeopleDf = spark.read.schema(schema1).json(rdd)
      import spark.implicits._
      PeopleDf.show()
      val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))
      PeopleDfFilter.show()
    }

I can also load data from Kafka and write it back "as is" with a KafkaProducer:

    messages.foreachRDD( rdd => {
      rdd.foreachPartition( partition => {
        val kafkaTopic = "output"
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partition.foreach { record: ConsumerRecord[String, String] =>
          System.out.print("########################" + record.value())
          val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
          producer.send(messageResult)
        }
        producer.close()
      })
    })

But I cannot integrate the two operations: find the matching values in the JSON and write the result to Kafka, i.e. write PeopleDfFilter in JSON format to the "output" Kafka topic.
There are a lot of input messages in Kafka, which is why I create the Kafka producer inside foreachPartition.
Thank you very much for any advice.
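
For reference, a minimal sketch of one way to combine the two snippets above: filter inside the same foreachRDD, serialize the surviving rows back to JSON strings with toJSON, and send them per partition. It reuses schema1, spark and the producer settings shown above; the broker address and the "output" topic name are the same assumptions as before.

    messages.map(_.value).foreachRDD { rdd =>
      val peopleDf = spark.read.schema(schema1).json(rdd)
      import spark.implicits._
      val peopleDfFilter = peopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))

      // serialize the filtered rows to JSON strings and write them per partition
      peopleDfFilter.toJSON.rdd.foreachPartition { partition =>
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partition.foreach { json =>
          producer.send(new ProducerRecord[String, String]("output", json))
        }
        producer.close()
      }
    }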


cu6pst1q1#

Try Structured Streaming. Even on Spark 2.1 you can implement your own Kafka ForeachWriter, for example:
Kafka sink:

    import java.util.Properties
    // the "kafkashaded" prefix is Databricks-specific; elsewhere use org.apache.kafka.clients.producer._
    import kafkashaded.org.apache.kafka.clients.producer._
    import org.apache.spark.sql.ForeachWriter

    class KafkaSink(topic: String, servers: String) extends ForeachWriter[(String, String)] {

      val kafkaProperties = new Properties()
      kafkaProperties.put("bootstrap.servers", servers)
      // use getName so the config gets the plain class name (toString would prepend "class ")
      kafkaProperties.put("key.serializer",
        classOf[org.apache.kafka.common.serialization.StringSerializer].getName)
      kafkaProperties.put("value.serializer",
        classOf[org.apache.kafka.common.serialization.StringSerializer].getName)

      var producer: KafkaProducer[String, String] = _

      def open(partitionId: Long, version: Long): Boolean = {
        producer = new KafkaProducer(kafkaProperties)
        true
      }

      def process(value: (String, String)): Unit = {
        producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
      }

      def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
    }

Usage:

    import org.apache.spark.sql.streaming.ProcessingTime

    val topic = "<topic2>"
    val brokers = "<server:ip>"

    val writer = new KafkaSink(topic, brokers)

    val query =
      streamingSelectDF
        .writeStream
        .foreach(writer)
        .outputMode("update")
        .trigger(ProcessingTime("25 seconds"))
        .start()
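
Here streamingSelectDF stands for whatever streaming Dataset you want to write. For the question's use case it could be built roughly as below; this is a sketch that reuses schema1 and the filter from the question and maps every row to a (String, String) pair, since KafkaSink is a ForeachWriter[(String, String)] (the writer above concatenates the two parts as key:value into the message body). The broker address and the key literal are placeholders.

    import org.apache.spark.sql.functions.{from_json, to_json}
    import spark.implicits._

    val streamingSelectDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker address
      .option("subscribe", "input")
      .load()
      .select(from_json($"value".cast("string"), schema1).alias("value"))
      .filter($"value.value1".rlike("1") || $"value.value2" === 2)
      .select(to_json($"value").alias("value"))
      .as[String]
      .map(json => ("key", json))   // the first element becomes the "key:" prefix in KafkaSink.process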

sdnqo3pr2#

The pipeline is very simple, so why not use Structured Streaming all the way through?

    import org.apache.spark.sql.functions.{from_json, to_json}

    spark
      // Read the data
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", inservers)
      .option("subscribe", intopic)
      .load()
      // Transform / filter
      .select(from_json($"value".cast("string"), schema).alias("value"))
      .filter(...)  // Add the condition
      .select(to_json($"value").alias("value"))
      // Write back (the built-in "kafka" sink requires Spark 2.2+; on 2.1 use the ForeachWriter approach above)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", outservers)
      .option("topic", outtopic)                          // the sink takes "topic", not "subscribe"
      .option("checkpointLocation", "/path/to/checkpoint") // required for the kafka sink; placeholder path
      .start()
