My plan is:
1. Use Spark Streaming to load data from Kafka at a fixed interval, e.g. every 1 minute.
2. Convert the data loaded in each 1-minute batch into a DataFrame.
3. Upsert the DataFrame into a Hive table (a table storing all the history data).
So far, I have successfully implemented steps 1 and 2.
I would like to know whether there is a practical way to implement step 3. Concretely:
1. Load the latest history table for a certain partition in Spark Streaming.
2. Join the batch DataFrame with the history table/DataFrame of that partition, producing a new DataFrame.
3. Save the new DataFrame to Hive, overwriting that partition of the history table (see the sketch below).
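What I have in mind for step 3 is roughly the following, done inside foreachRDD (just a sketch, not tested: the partition column ymd, the key column answer_id and the partition value are placeholders of mine, and it assumes the SparkSession was built with enableHiveSupport()):

    recordStream.foreachRDD(rdd -> {
        if (rdd.count() > 0) {
            Dataset<Row> batchDf = spark.read().json(rdd.rdd());

            // 1. Load the latest history data for the partition touched by this batch
            Dataset<Row> historyDf = spark.table(destTable)
                    .where(col("ymd").equalTo("20200518")); // placeholder partition value

            // 2. Keep history rows whose key does not appear in the batch,
            //    then append the new batch rows (the batch wins on key conflicts)
            Dataset<Row> merged = historyDf
                    .join(batchDf, historyDf.col("answer_id").equalTo(batchDf.col("answer_id")), "left_anti")
                    .unionByName(batchDf);

            // 3. Overwrite only that partition of the history table
            //    (insertInto is position-based, so column order must match the table)
            spark.conf().set("spark.sql.sources.partitionOverwriteMode", "dynamic");
            merged.write()
                    .mode(SaveMode.Overwrite) // requires org.apache.spark.sql.SaveMode
                    .insertInto(destTable);
        }
    });

I am not sure unionByName is safe here, since spark.read().json() infers the schema per batch and it may not match the table schema exactly.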
Here is my code:
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import static org.apache.spark.sql.functions.col;

public final class SparkConsumer {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        String brokers = "device1:9092,device2:9092,device3:9092";
        String groupId = "spark";
        String topics = "zhihu_comment";
        String destTable = "ods.zhihu_comment";

        // Create a streaming context with a 60-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
        sparkConf.set("spark.streaming.backpressure.enabled", "true");
        sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // key/value deserializers are required by the Kafka consumer
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("max.poll.records", "500");

        SparkSession spark = SparkSession.builder().appName(topics).getOrCreate();

        // Create a direct Kafka stream with the given brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        messages.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Commit offsets back to Kafka after the batch's outputs have completed
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });

        // Keep only the actual message value (JSON format)
        Column[] colList = { col("answer_id"), col("author"), col("content"), col("vote_count") };
        JavaDStream<String> recordStream = messages.flatMap(record -> Arrays.asList(record.value()).iterator());

        // Extract RDDs from the stream and convert each non-empty micro-batch to a DataFrame
        recordStream.foreachRDD(rdd -> {
            if (rdd.count() > 0) {
                Dataset<Row> df = spark.read().json(rdd.rdd());
                df.select(colList).show();
            }
        });

        // Stream of raw message values (currently unused)
        JavaDStream<String> lines = messages.map(ConsumerRecord::value);

        jssc.start();
        jssc.awaitTermination();
    }
}
I would like to know whether this approach is feasible. Any advice would be greatly appreciated.
1 Answer
Rather than reinventing the wheel, I would strongly suggest Kafka Connect. You only need the HDFS Sink connector, which replicates the data from a Kafka topic to Hive:
The Kafka Connect HDFS Sink connector allows you to export data from Kafka topics to HDFS files in a variety of formats, and integrates with Hive to make the data immediately available for querying with HiveQL.
For HDFS 2.x, use the HDFS 2 Sink connector.
For HDFS 3.x, use the HDFS 3 Sink connector.
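For reference, a connector configuration with Hive integration enabled might look roughly like this (a sketch based on the connector's documented options; the connector name, HDFS URL, metastore URI and database are placeholders you would adapt to your cluster):

    {
      "name": "zhihu-comment-hdfs-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": "zhihu_comment",
        "hdfs.url": "hdfs://device1:8020",
        "flush.size": "1000",
        "hive.integration": "true",
        "hive.metastore.uris": "thrift://device1:9083",
        "hive.database": "ods",
        "schema.compatibility": "BACKWARD"
      }
    }

With hive.integration enabled, the connector creates and updates the Hive table for the topic itself, so the data it writes to HDFS becomes queryable from Hive without a separate load step.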