将apache spark结果发布到另一个应用程序/kafka

mspsb9vt  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(385)

我目前正在设计一个快速数据聚合模块,它接收事件并将它们发布到kafka集群。然后我们把Kafka和Spark流结合起来。spark streaming从kafka读取流并执行一些计算。当计算完成后,我们需要将结果发送到另一个应用程序。此应用程序可以是web服务或kafka群集。
我想知道我们怎么做?据我所知,spark stream将数据推送到下游,如数据库和文件系统。
你如何设计这样一个应用程序?我是否应该用storm替换spark stream,以便能够将结果发布到另一个应用程序?

dxxyhpgq

dxxyhpgq1#

我想知道我们怎么做?据我所知,spark stream将数据推送到下游,如数据库和文件系统。
spark不局限于HDF或数据库,您可以自由初始化到任何可用外部资源的连接。它可以返回到kafka、rabbitmq或webservice。
如果你在做简单的变换,比如 map , filter , reduceByKey 等等,然后使用 DStream.foreachRDD 会很好的。如果你要做有状态的计算 DStream.mapWithState ,然后一旦处理完状态,就可以简单地将数据发送到任何外部服务。
例如,我们使用kafka作为数据的输入流,使用rabbitmq和执行一些有状态计算后的输出。

41ik7eoe

41ik7eoe2#

请参阅 dstream.foreachRDD ,这是一个强大的原语,允许将数据发送到外部系统。
使用foreachrdd的设计模式
下面是我的kafka集成代码供您参考(没有优化,只是为了poc,kafkaproducer对象可以在foreachrdd中重用):

DStream.foreachRDD(rdd => {
      rdd.foreachPartition { partitionOfRecords =>
        val kafkaProps = new Properties()
        kafkaProps.put("bootstrap.servers", props("bootstrap.servers"))
        kafkaProps.put("client.id", "KafkaIntegration Producer");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        val producer = new KafkaProducer[String, String](kafkaProps);

        partitionOfRecords.foreach(record => {
        val message = new ProducerRecord[String, String]("hdfs_log_test", record.asInstanceOf[String])
          producer.send(message)
        })
        producer.close()
      }
    })

相关问题