无法在hdfs中使用spark流存储数据

jv4diomz  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(178)

我正在尝试使用java中的spark流将kafka中的数据存储到hdfs中。这是我的代码。

JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
directKafkaStream.foreachRDD(rdd -> {
            rdd.saveAsTextFile("hdfs://.../sampleTest.txt");
            rdd.foreach(record -> {
                System.out.println("Got the record : ");
            });
        });
        ssc.start();
        ssc.awaitTermination();

以下是我正在使用的sbt库依赖项:

"org.apache.kafka" % "kafka-clients" % "0.8.2.0",
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0",

在消费者策略中,我订阅了主题列表和Kafka配置。但是当我使用kafka发送数据时,hdfs中没有生成任何文件。另外,当我运行jar文件时,它会显示sparkstreamingcontext started,但之后不会打印任何确认消息。我是遗漏了什么还是Kafka依赖不匹配的问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题