Apache Spark cannot read Kafka message content

pkwftd7m · published 2021-06-07 in Kafka

I am trying to create an Apache Spark job that consumes Kafka messages submitted to a topic. The messages are submitted to the topic with the Kafka console producer, as shown below.

./kafka-console-producer.sh --broker-list kafka1:9092 --topic my-own-topic
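Assuming the same broker and topic names as above, delivery of these messages can be verified independently of Spark with the bundled console consumer (this command is not from the original post, just the standard counterpart of the producer command):

```shell
# Read everything on the topic from the beginning, to confirm the
# messages actually reach the broker before involving Spark.
./kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
    --topic my-own-topic --from-beginning
```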

To read the messages I use the spark-streaming-kafka-0-10_2.11 library. With this library I managed to read the total count of messages received on the topic, but I cannot read the ConsumerRecord objects in the stream: when I try to read one, the whole application blocks and nothing is printed to the console. Note that I am running Kafka, ZooKeeper, and Spark in Docker containers. Any help would be greatly appreciated.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class SparkKafkaStreamingJDBCExample {

  public static void main(String[] args) {

    // Start a spark instance and get a context
    SparkConf conf =
        new SparkConf().setAppName("Study Spark").setMaster("spark://spark-master:7077");

    // Setup a streaming context.
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(3));

    // Create a map of Kafka params
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    // List of Kafka brokers to listen to.
    kafkaParams.put("bootstrap.servers", "kafka1:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    // Do you want to start from the earliest record or the latest?
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);

    // List of topics to listen to.
    Collection<String> topics = Arrays.asList("my-own-topic");

    // Create a Spark DStream with the kafka topics.
    final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    System.out.println("Study Spark Example Starting ....");

    stream.foreachRDD(rdd -> {

      if (rdd.isEmpty()) {
        System.out.println("RDD Empty " + rdd.count());
        return;
      } else {
        System.out.println("RDD not empty " + rdd.count());

        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        System.out.println("Partition Id " + TaskContext.getPartitionId());
        OffsetRange o = offsetRanges[TaskContext.getPartitionId()];
        System.out.println("Topic " + o.topic());

        System.out.println("Creating RDD !!!");
        JavaRDD<ConsumerRecord<String, String>> r =
            KafkaUtils.createRDD(streamingContext.sparkContext(), kafkaParams, offsetRanges,
                LocationStrategies.PreferConsistent());
        System.out.println("Count " + r.count());
        // Application stuck from here onwards ...
        ConsumerRecord<String, String> first = r.first();
        System.out.println("First taken");
        System.out.println("First value " + first.value());

      }
    });

    System.out.println("Stream context starting ...");
    // Start streaming.
    streamingContext.start();
    System.out.println("Stream context started ...");

    try {
      System.out.println("Stream context await termination ...");
      streamingContext.awaitTermination();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

  }

}

A sample of the output is given below.

Study Spark Example Starting ....
Stream context starting ...
Stream context started ...
Stream context await termination ...
RDD Empty 0
RDD Empty 0
RDD Empty 0
RDD Empty 0
RDD not empty 3
Partition Id 0
Topic my-own-topic
Creating RDD !!!
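The output stops right after "Creating RDD !!!", i.e. at the r.first() call on the RDD built by the second KafkaUtils.createRDD. One plausible cause (not confirmed in the post) is that this second createRDD spawns another Kafka consumer with the same group.id inside the foreachRDD action, conflicting with the stream's own cached consumers. As a hedged sketch, the records can instead be read directly from the batch RDD that foreachRDD already delivers, with no second createRDD; the variable names match the post's code:

```java
// Sketch only: a replacement body for the post's foreachRDD lambda.
// Reads ConsumerRecord values straight from the batch RDD instead of
// building a second RDD with KafkaUtils.createRDD.
stream.foreachRDD(rdd -> {
  if (rdd.isEmpty()) {
    System.out.println("RDD Empty " + rdd.count());
    return;
  }
  System.out.println("RDD not empty " + rdd.count());
  // collect() is acceptable for small test batches; for real workloads
  // prefer rdd.foreachPartition(...) so records stay on the executors.
  for (ConsumerRecord<String, String> record : rdd.collect()) {
    System.out.println("Offset " + record.offset()
        + " key " + record.key() + " value " + record.value());
  }
});
```

With this shape, offset bookkeeping via HasOffsetRanges is still available on rdd.rdd() if manual commits are needed later.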
