Kafka has already been loaded with 10 million messages, and then I run the code below. As expected, it should report the count of tuples/messages processed in each 2-second window, but instead the first batch returns the full message count of 10 million and every subsequent batch prints 0, 0, 0, ... even though printing that first count takes well over 2 seconds.
The inline source code is:
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import kafka.serializer.StringDecoder;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class Test implements Serializable {
    private static final long serialVersionUID = -5863692754478476225L;
    private static final String KEY_SPARK_MASTER = "spark://machine1-1467:7077";
    private static final String KEY_APP_NAME = "SQLWordCount";
    private static final String KEY_TOPIC = "add104";
    private static JavaStreamingContext streamingContext = null;

    private Test() {
        disableLogging();
        doInit();
        process();
    }

    public static void main(String[] params) {
        System.out.println("------------Started---------------" + new Date().toString());
        new Test();
    }

    private void disableLogging() {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
    }

    private void doInit() {
        SparkConf sparkConf = new SparkConf().setMaster(KEY_SPARK_MASTER).setAppName(KEY_APP_NAME);
        streamingContext = new JavaStreamingContext(sparkConf, new Duration(500));
        streamingContext.checkpoint("/home/impadmin/checkpoint");
    }

    private HashMap<String, String> getKafkaParams() {
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("group.id", "id7");
        return kafkaParams;
    }

    private HashSet<String> getTopicSet() {
        HashSet<String> topic = new HashSet<String>(Arrays.asList(KEY_TOPIC));
        return topic;
    }

    private void process() {
        try {
            JavaPairInputDStream<String, String> messages = KafkaUtils
                    .createDirectStream(streamingContext, String.class,
                            String.class, StringDecoder.class,
                            StringDecoder.class, getKafkaParams(),
                            getTopicSet());
            // 2-second window sliding every 2 seconds, on top of 500 ms batches
            JavaPairDStream<String, String> windowedStream = messages.window(
                    new Duration(2000), new Duration(2000));
            JavaDStream<String> lines = windowedStream
                    .map(new Function<Tuple2<String, String>, String>() {
                        public String call(Tuple2<String, String> tuple2) {
                            return tuple2._2();
                        }
                    });
            lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
                public Void call(JavaRDD<String> rdd) throws Exception {
                    System.out.println(new Date().toString() + " In the Call method" + rdd.count());
                    // Stock is a user-defined class (not shown in the question)
                    JavaRDD<Stock> rowRDD = rdd
                            .map(new Function<String, Stock>() {
                                @Override
                                public Stock call(String arg0) throws Exception {
                                    return null;
                                }
                            });
                    return null;
                }
            });
            streamingContext.start();
            streamingContext.awaitTermination();
        } catch (Exception e) {
            System.out.println("Exception: " + e.toString());
        }
    }
}
1 Answer
Because you are using kafkaParams.put("auto.offset.reset", "smallest");, the direct stream goes back to the beginning of the topic and pulls in all of the existing messages.
Change it to kafkaParams.put("auto.offset.reset", "largest"); to consume only new messages.
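A minimal sketch of that change, applied to the getKafkaParams() method from the question (nothing else needs to change):

    private HashMap<String, String> getKafkaParams() {
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        // "largest" starts at the latest offsets, so only messages produced after the
        // job starts are consumed; "smallest" replays the whole topic, which is why
        // the first window contained all 10 million messages.
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("group.id", "id7");
        return kafkaParams;
    }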
If your expectation is that the streaming context will split all the existing messages into 2-second batches, I doubt it can do that. However, you can set offset ranges and read all the existing data across multiple batches (see the sketch at the end of this answer).
New messages, however, will be batched every 2 seconds, or whatever interval you set.
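If you do want to drain the existing 10 million messages in controlled slices instead of one huge first window, a rough sketch using the batch (non-streaming) KafkaUtils.createRDD API could look like the helper below, assumed to live inside the Test class from the question. It additionally needs imports for JavaSparkContext, JavaPairRDD and OffsetRange, and the partition number and offsets are made-up placeholders; derive the real values from your topic first.

    // Hypothetical helper: reads one fixed slice of the topic as a plain RDD.
    // Partition 0 and the offsets 0..500000 are placeholders, not real values.
    private void readExistingMessagesInSlices() {
        JavaSparkContext sc = streamingContext.sparkContext();
        OffsetRange[] ranges = {
                // topic, partition, fromOffset (inclusive), untilOffset (exclusive)
                OffsetRange.create(KEY_TOPIC, 0, 0L, 500000L)
        };
        JavaPairRDD<String, String> slice = KafkaUtils.createRDD(
                sc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                getKafkaParams(), ranges);
        System.out.println("Messages in this slice: " + slice.count());
    }

You would call something like this repeatedly, moving fromOffset forward by the size of each slice, until you have caught up with the latest offset of the topic; only new messages then remain for the streaming job.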