在我的应用程序中,有一个webui应用程序在完成文件上传过程后向kafka发送文件路径。
我有一个spark流应用程序,它使用 JavaSparkContext
以及 JavaPairInputDStream
(因此它接收文件路径,但也可能有多个文件路径)。
我必须并行处理这些文件,并需要将结果发送到另一个Kafka流:
SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("topic1");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
rdd.collect().forEach((t) -> {
sendMessage(sc, t._2());
});
});
ssc.start();
ssc.awaitTermination();
``` `sendMessage` 将发送文件中的数据。
在上面的实现中,我在foreachrdd方法中使用javasparkcontext,这不是最佳实践。我想并行处理这个文件。
2条答案
按热度按时间tp5buhyn1#
例如:
sqyvllje2#
我会创建一个函数
sendMessage
这将是一个纯粹的Kafka生产者(没有Spark的依赖,特别是。JavaSparkContext
)它将向Kafka主题发送一条消息,或者接收所有消息的迭代器。请参阅apache kafka的官方文档。
纯粹的Kafka制作人
sendMessage
在spark streaming的转换中,我将执行以下操作(内联的注解应该会给您一些关于每行发生什么的提示):我相信代码会变得更清晰,但那将是scala,你使用java,所以我就到此为止。
我强烈推荐使用sparksql的结构化流媒体,因为它很快就会取代spark流媒体,成为spark中的流媒体api。