使用kafka的spark流媒体确保处理过程无损失

jvidinwx  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(204)

我有一个非常简单的spark+kafka应用程序。我在读Kafka的书,在控制台里打印。我有两条线在下面,即好线和坏线
一开始我用好的行处理,然后我切换到坏的行一段时间,当我切换回好的行时,我希望从它停止的地方处理。令人惊讶的是,它从最新开始。
1 2 3失踪7 8 9
在下面的代码中,如何确保我阅读了所有消息。我没有找到可以控制偏移量的代码或位置。即使有重复处理我也很好。。因为我的信息中会有唯一的id

public static void main(String[] args) throws Exception {

    String brokers = "quickstart:9092";
    String topics = "simple_topic_1";
    String master = "local[*]";

    SparkSession sparkSession = SparkSession
            .builder().appName(SimpleKafkaProcessor.class.getName())
            .master(master).getOrCreate();
    SQLContext sqlContext = sparkSession.sqlContext();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");

    Dataset<Row> rawDataSet = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            //.option("enable.auto.commit", "false")
            .option("auto.offset.reset", "earliest")
            .option("group.id", "safe_message_landing_app_2")
            .option("subscribe", topics).load();
    rawDataSet.printSchema();
    rawDataSet.createOrReplaceTempView("basicView");

    // Good-Line
    sqlContext.sql("select string(Value) as StrValue from basicView").writeStream()
    // Bad-Line
    //sqlContext.sql("select fieldNotFound as StrValue from basicView").writeStream()
            .format("console")
            .option("checkpointLocation", "cp/" + UUID.randomUUID().toString())
            .trigger(ProcessingTime.create("15 seconds"))
            .start()
            .awaitTermination();

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题