我有一个非常简单的spark+kafka应用程序。我在读Kafka的书,在控制台里打印。我有两条线在下面,即好线和坏线
一开始我用好的行处理,然后我切换到坏的行一段时间,当我切换回好的行时,我希望从它停止的地方处理。令人惊讶的是,它从最新开始。
1 2 3失踪7 8 9
在下面的代码中,如何确保我阅读了所有消息。我没有找到可以控制偏移量的代码或位置。即使有重复处理我也很好。。因为我的信息中会有唯一的id
public static void main(String[] args) throws Exception {
String brokers = "quickstart:9092";
String topics = "simple_topic_1";
String master = "local[*]";
SparkSession sparkSession = SparkSession
.builder().appName(SimpleKafkaProcessor.class.getName())
.master(master).getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
Dataset<Row> rawDataSet = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
//.option("enable.auto.commit", "false")
.option("auto.offset.reset", "earliest")
.option("group.id", "safe_message_landing_app_2")
.option("subscribe", topics).load();
rawDataSet.printSchema();
rawDataSet.createOrReplaceTempView("basicView");
// Good-Line
sqlContext.sql("select string(Value) as StrValue from basicView").writeStream()
// Bad-Line
//sqlContext.sql("select fieldNotFound as StrValue from basicView").writeStream()
.format("console")
.option("checkpointLocation", "cp/" + UUID.randomUUID().toString())
.trigger(ProcessingTime.create("15 seconds"))
.start()
.awaitTermination();
}
暂无答案!
目前还没有任何答案,快来回答吧!