KafkaSpark流:如何生产和消费的最后一批记录

wfveoks0  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(238)

我有一个kafka spark作业,它是一个日常批处理作业,它从hadoop位置顺序读取hdfs部分文件,并按顺序生成它们。
在最后一批中,我选取批中的最后一条记录,并在其上创建一个新的Dataframe(df2),其余记录转到另一个Dataframe(df1)。
我首先生产df1,然后生产df2,因为我希望使用者(这是一个流作业)在从producer发送的所有批中的最后一批中使用df2。但这并没有发生。在消费端,df2中的记录不是在最后一批中被消费的,而是在最后第二批或最后第三批中被消费的,但不是在最后一批中。
下面是示例代码:

class XYZ{

create new Kafka Instance
.
                    ABC.getInstance(topic,<required Kafka Parameters>);
.
.
f(!(df2.toJavaRDD().isEmpty()) && !(df1.toJavaRDD().isEmpty())) {
                    calling produceDF method(df1, <required Kafka Parameters>);
                    calling produceDF method(df2, <required Kafka Parameters>);
                } 

    public void produceDF(Dataset<Row> df, Broadcast<ABC> abc>){
        df.foreachPartition(partitionOfRecords ->{
                    final ABC kafkaProducer = abc.value();
            while (partitionOfRecords.hasNext()) {
                Row row = partitionOfRecords.next();
                kafkaProducer.produceDF(<key>, <value>);
            }
            kafkaProducer.flush();
        });
}
}

class ABC implements Serializable{
        private transient KafkaProducer<String,String> kafkaProducer = null;
                private static ABC abcInstance;

                public static ABC getInstance(String topic, Map<String,Object> kafkaParams) {
        if(abcInstance==null){
            synchronized (ABC.class){
                if(abcInstance==null){
                    abcInstance = new ABC(topic,kafkaParams);
                }
            }
        }
        return abcInstance;
    }

public void produceDF(String key,String val) {
        if( null==kafkaProducer) {
            kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    kafkaProducer.close();
                }
            });

        }
        if(key!=null) {
    kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val));
}
        public void flush(){
        if(null!=kafkaProducer)
            kafkaProducer.flush();
    }
}
}

请告知我在最后一批中使用df2需要采用什么方法

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题