Big data: Spark Streaming ignores the defined window and fetches the complete set of messages from Kafka at once

a14dhokn posted on 2021-06-08 in Kafka

Kafka has already been loaded with 10 million messages. When I run the code below, I expect it to print the count of tuples/messages processed in each 2-second window. Instead, the very first batch returns the full message count, i.e. 10 million, and every batch after that returns 0, 0, 0..., even though printing that first count takes well over 2 seconds.

The source code is:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import kafka.serializer.StringDecoder;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class Test implements Serializable {

    private static final long serialVersionUID = -5863692754478476225L;
    private static final String KEY_SPARK_MASTER = "spark://machine1-1467:7077";
    private static final String KEY_APP_NAME = "SQLWordCount";

    private static final String KEY_TOPIC = "add104";

    private static JavaStreamingContext streamingContext = null;

    private Test() {
        disableLogging();
        doInit();
        process();
    }

    public static void main(String[] params) {
        System.out.println("------------Started---------------" + new Date().toString());
        new Test();
    }

    private void disableLogging() {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
    }

    private void doInit() {
        SparkConf sparkConf = new SparkConf().setMaster(KEY_SPARK_MASTER).setAppName(KEY_APP_NAME);
        // 500 ms batch interval; the window below must be a multiple of this.
        streamingContext = new JavaStreamingContext(sparkConf, new Duration(500));
        streamingContext.checkpoint("/home/impadmin/checkpoint");
    }

    private HashMap<String, String> getKafkaParams() {
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("group.id", "id7");
        return kafkaParams;
    }

    private HashSet<String> getTopicSet() {
        HashSet<String> topic = new HashSet<String>(Arrays.asList(KEY_TOPIC));
        return topic;
    }

    private void process() {
        try {
            JavaPairInputDStream<String, String> messages = KafkaUtils
                    .createDirectStream(streamingContext, String.class,
                            String.class, StringDecoder.class,
                            StringDecoder.class, getKafkaParams(),
                            getTopicSet());

            // 2-second window, sliding every 2 seconds.
            JavaPairDStream<String, String> windowedStream = messages.window(
                    new Duration(2000), new Duration(2000));

            JavaDStream<String> lines = windowedStream
                    .map(new Function<Tuple2<String, String>, String>() {

                        public String call(Tuple2<String, String> tuple2) {
                            return tuple2._2();
                        }
                    });

            lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

                public Void call(JavaRDD<String> rdd) throws Exception {
                    System.out.println(new Date().toString() + " In the Call method " + rdd.count());
                    // Stock is an application-specific POJO (definition not shown in the post);
                    // the mapping below is a placeholder that returns null.
                    JavaRDD<Stock> rowRDD = rdd
                            .map(new Function<String, Stock>() {
                                @Override
                                public Stock call(String arg0) throws Exception {
                                    return null;
                                }
                            });
                    return null;
                }
            });

            streamingContext.start();
            streamingContext.awaitTermination();
        } catch (Exception e) {
            System.out.println("Exception: " + e.toString());
        }
    }
}

Answer 1 (lc8prwob)

Because you are using kafkaParams.put("auto.offset.reset", "smallest");, the job goes back and reads all existing messages.
Change it to kafkaParams.put("auto.offset.reset", "largest"); to consume only new messages.
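A minimal sketch of that change, keeping the rest of the question's getKafkaParams() as it is:

    private HashMap<String, String> getKafkaParams() {
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        // Start from the latest offsets: only messages produced after the
        // job starts are consumed, so the 10M-message backlog is skipped.
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("group.id", "id7");
        return kafkaParams;
    }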
If your expectation is that the streaming context will split all the existing messages into 2-second batches, I doubt it can do that. You can, however, set offset ranges and read all the existing data across multiple batches; see the sketch below.
New messages, on the other hand, will be batched every 2 seconds, or whatever batch interval you set.
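The answer does not include code for the offset-range approach, but a sketch of one way to do it with KafkaUtils.createRDD from the same Kafka 0.8 integration is shown below. The offsets (0 to 500,000), the single partition, and the idea of looping over chunks are illustrative assumptions; a JavaSparkContext is assumed to be available, e.g. via streamingContext.sparkContext().

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // Read one bounded slice of partition 0 of topic "add104" as a batch RDD.
    // fromOffset/untilOffset here are hypothetical; in practice you would loop,
    // advancing the range until the existing backlog is drained.
    JavaSparkContext sc = streamingContext.sparkContext();
    OffsetRange[] ranges = new OffsetRange[] {
            OffsetRange.create(KEY_TOPIC, 0, 0L, 500000L)
    };
    JavaPairRDD<String, String> slice = KafkaUtils.createRDD(
            sc, String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            getKafkaParams(), ranges);
    System.out.println("Messages in this range: " + slice.count());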
