Reading a Spark Streaming checkpoint after a failure

dldeef67  posted on 2021-06-07  in  Kafka

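I am trying to implement Spark Streaming with Kafka in my application, including fault tolerance. When I restart the application, it re-reads messages that it had already read before the restart, so my calculations come out wrong. Please help me solve this problem.
Below is the code, written in Java.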

public static JavaStreamingContext createContextFunc() {

    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();

    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    JavaStreamingContext streamingContext =  app.getStreamingContext(checkpointDir);

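    // Define the input stream inside the factory: getOrCreate only runs this
    // code when no checkpoint exists yet.
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);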

    return streamingContext;
}

public static void main(String[] args) throws InterruptedException {

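    // conf was not declared in main in the original snippet
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();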

    Function0<JavaStreamingContext> createContextFunc = () -> createContextFunc();
    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc);

    streamingContext.start();
    streamingContext.awaitTermination();

}

public JavaStreamingContext getStreamingContext(String checkpointDir) {

    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String master = conf.getMaster();
    int duration = conf.getDuration();

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
    streamingContext.checkpoint(checkpointDir);

    return streamingContext;
}

public SparkSession getSession() {

    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String hiveConf = conf.getHiveConf();
    String thriftConf =  conf.getThriftConf();
    int shufflePartitions = conf.getShuffle();

    SparkSession spark = SparkSession
            .builder()
            .appName(appName)
            .config("spark.sql.warehouse.dir", hiveConf)
            .config("hive.metastore.uris", thriftConf)
            .enableHiveSupport()
            .getOrCreate();

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions);
    return spark;

}

public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) {

    KafkaConfig kafkaConfig = new KafkaConfig();
    Set<String> topicsSet = kafkaConfig.getTopicSet();
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams();

    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value);

    return logdata;
}

Here is the link to the GitHub project: https://github.com/thisast/spark-fault-tolerance

Answer 1 (mwecs4sa)

I overcame this problem by adding the following configuration to the code.

sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
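
A graceful shutdown lets in-flight batches finish before the context stops, which helps on a planned restart. After a hard crash, a checkpointed stream may still replay the last batch, since Spark Streaming output operations are at-least-once. One complementary approach is to make the summary itself idempotent. The following is only a minimal sketch of that idea in plain Java, not the code from the linked project: `IdempotentSummary`, its method names, and the topic name `transactions` are all hypothetical, and it assumes each record carries its topic, partition, and offset (as `ConsumerRecord` does).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Spark or the original project): keeps a
 * running total and ignores records whose offset was already applied.
 */
class IdempotentSummary {
    // Next expected offset per "topic-partition" key; anything below it is a replay.
    private final Map<String, Long> nextOffset = new HashMap<>();
    private double total = 0.0;

    /** Applies the record once; returns false if it was already counted. */
    boolean apply(String topic, int partition, long offset, double amount) {
        String key = topic + "-" + partition;
        long expected = nextOffset.getOrDefault(key, 0L);
        if (offset < expected) {
            return false; // delivered again after a restart: skip it
        }
        total += amount;
        nextOffset.put(key, offset + 1);
        return true;
    }

    double total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentSummary summary = new IdempotentSummary();
        summary.apply("transactions", 0, 0L, 10.0);
        summary.apply("transactions", 0, 1L, 5.0);
        // Simulated restart: the last record is delivered a second time.
        summary.apply("transactions", 0, 1L, 5.0);
        System.out.println(summary.total()); // prints 15.0
    }
}
```

In a real job the offset map and the total would have to be stored durably and updated together with the results (for example in one database transaction); kept only in memory, as here, they would be lost on restart.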
