java—如何使用javasparkcontext处理kafka记录中具有文件名的文件?

lpwwtiir  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(279)

在我的应用程序中,有一个webui应用程序在完成文件上传过程后向kafka发送文件路径。
我有一个spark流应用程序,它使用 JavaSparkContext 以及 JavaPairInputDStream (因此它接收文件路径,但也可能有多个文件路径)。
我必须并行处理这些文件,并需要将结果发送到另一个Kafka流:

SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
    sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    Set<String> topics = Collections.singleton("topic1");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
            String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {

        rdd.collect().forEach((t) -> {
            sendMessage(sc, t._2());
        });
    });

    ssc.start();
    ssc.awaitTermination();
``` `sendMessage` 将发送文件中的数据。
在上面的实现中,我在foreachrdd方法中使用javasparkcontext,这不是最佳实践。我想并行处理这个文件。
tp5buhyn

tp5buhyn1#

例如:

directKafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
        stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
            public void call(Iterator<String> stringIterator) throws Exception {
                sendMessage(stringIterator);
            }
        });
    }
sqyvllje

sqyvllje2#

我会创建一个函数 sendMessage 这将是一个纯粹的Kafka生产者(没有Spark的依赖,特别是。 JavaSparkContext )它将向Kafka主题发送一条消息,或者接收所有消息的迭代器。
请参阅apache kafka的官方文档。
纯粹的Kafka制作人 sendMessage 在spark streaming的转换中,我将执行以下操作(内联的注解应该会给您一些关于每行发生什么的提示):

def sendMessage(message: String) = {
  println(s"Sending $message to Kafka")
}
dstream.map(_.value).foreachRDD { rdd =>
  println(s"Received rdd: $rdd with ${rdd.count()} records")
  // take paths from RDD that contains Kafka records with the file names
  val files = rdd.collect()
  files.foreach { f =>
    // read a file `f` using Spark Core's RDD API
    rdd.sparkContext.textFile(f).map { line =>
      // do something with line
      // this is the place for a pure Spark transformation
      // it's as if you were outside Spark Streaming
      println(line)
      line
    }.foreachPartition { linesAfterProcessingPerPartition =>
      // send lines to Kafka
      // they have been processed using Spark
      linesAfterProcessingPerPartition.foreach { line =>
        sendMessage(message = line)
      }
    }
  }
}

我相信代码会变得更清晰,但那将是scala,你使用java,所以我就到此为止。
我强烈推荐使用sparksql的结构化流媒体,因为它很快就会取代spark流媒体,成为spark中的流媒体api。

相关问题