我正在准备spark流媒体的应用程序(spark 2.1,kafka 0.10)
我需要从Kafka主题“输入”中读取数据,找到正确的数据并将结果写入主题“输出”
我可以基于kafkautils.createdirectstream方法从kafka读取数据。
我将rdd转换为json并准备过滤器:
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
val PeopleDf=spark.read.schema(schema1).json(rdd)
import spark.implicits._
PeopleDf.show()
val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2))
PeopleDfFilter.show()
}
我可以从kafka加载数据,并使用kafkaproducer将“原样”写入kafka:
messages.foreachRDD( rdd => {
rdd.foreachPartition( partition => {
val kafkaTopic = "output"
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
partition.foreach{ record: ConsumerRecord[String, String] => {
System.out.print("########################" + record.value())
val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
producer.send(messageResult)
}}
producer.close()
})
})
但我不能将这两个操作集成到json中,即在json中找到合适的值并将结果写入kafka:以json格式编写peopledfilter以“输出”kafka主题。
我在kafka中有很多输入消息,这就是为什么我要使用foreachpartition来创建kafka producer的原因。
非常感谢你的建议。
2条答案
按热度按时间cu6pst1q1#
尝试使用结构化流媒体。即使您使用了spark 2.1,也可以实现您自己的kafka foreachwriter,如下所示:
KafkaFlume:
用法:
sdnqo3pr2#
这个过程非常简单,为什么不一直使用结构化流媒体呢?