spark流:打印javainputdstream

rsaldnfx  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(401)

目前,我正在与Spark流和可能性阅读Kafka的消息。与Kafka生产者我发送消息到一个主题,并希望阅读这个主题与Spark流的帮助。
我使用以下java代码来查询消息:

package apache_spark_streaming;

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class Spark_Kafka_Example {

private static final String BOOTSTRAP_SERVERS_CONNECTION = "XXXXX";
private static final String SPARK_CONNECTION = "spark://XXXXX:7077";
private static final String TOPIC_NAME = "KafkaTesting1";
private static final Set<String> TOPIC_1 = new HashSet<>(Arrays.asList(TOPIC_NAME.split(",")));

public static Map<String, Object> getProperties() {

    try {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS_CONNECTION);
        kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaParams.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaParams.put("group.id", "Stream Testing");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    } 
    catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
public static void main(String[] args) throws Exception {

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("Kafka Example").setMaster(SPARK_CONNECTION);   
    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
           sc
           , LocationStrategies.PreferConsistent()
           , ConsumerStrategies.Subscribe(TOPIC_1, getProperties())
    );    

    stream.print();
    sc.start(); 
    sc.awaitTermination();
  }
}

我的问题是我不知道如何在命令行上输出消息。也许我对如何正确使用javainputdStream只有一个理解上的问题。
目前,我仅通过print()函数将其作为输出:
2010年7月17日16:59:20信息jobscheduler:为时间14996987600毫秒添加了作业
我希望你能帮我解决这个“问题”。
我试着

stream.foreachRDD(consumerRecordJavaRDD -> {
   consumerRecordJavaRDD.foreach(stringStringConsumerRecord -> {
   //.to get topic name: stringStringConsumerRecord.topic()
   //To get value : stringStringConsumerRecord.value()
  } }
jdgnovmf

jdgnovmf1#

您直接绑定到打印输入流,这是我无法打印流的实际值。
使用下面的代码打印您的输入流,它是由您的Kafka生产者产生的。

JavaDStream<String> data = stream.map(v -> {
        return v.value();    // mapping to convert into spark D-Stream 
  });

  data.print();
w9apscun

w9apscun2#

创建流之后,您将提取内容并使用过滤器或Map对其进行处理。
查看spark/java的流示例目录:https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming
javaqueuestream给出了一个例子。

相关问题