我正在尝试使用java中的spark流将kafka中的数据存储到hdfs中。这是我的代码。
JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
directKafkaStream.foreachRDD(rdd -> {
rdd.saveAsTextFile("hdfs://.../sampleTest.txt");
rdd.foreach(record -> {
System.out.println("Got the record : ");
});
});
ssc.start();
ssc.awaitTermination();
以下是我正在使用的sbt库依赖项:
"org.apache.kafka" % "kafka-clients" % "0.8.2.0",
"org.apache.spark" %% "spark-streaming" % "2.2.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0",
在消费者策略中,我订阅了主题列表和Kafka配置。但是当我使用kafka发送数据时,hdfs中没有生成任何文件。另外,当我运行jar文件时,它会显示sparkstreamingcontext started,但之后不会打印任何确认消息。我是遗漏了什么还是Kafka依赖不匹配的问题?
暂无答案!
目前还没有任何答案,快来回答吧!