我成功地融合了Kafka和Spark。我想把Kafka的流发送到spark。现在我可以把流发送到spark了。我想把这个流放在rdd中,所以我用createrdd()函数来创建rdd。但我在rdd只有Kafka的一些按摩师。因为它取决于偏移范围。所以请任何人告诉我如何在kafka spark createrdd()函数中设置offsetrange()。
c2e8gylq1#
只需在代码片段中使用
// Import dependencies and create kafka params as in Create Direct Stream val offsetRanges = Array( // topic, partition, inclusive starting offset, exclusive ending offset OffsetRange("test", 0, 0, 100), OffsetRange("test", 1, 0, 100) ) val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
spark kafka集成指南据文件记载:皮斯帕克·KafkaKafka主题分区的第一组偏移范围使用
pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)
初始化看起来像:
fromOffset = 0 untilOffset = 10 partition = 0 topic = 'topic' offset = OffsetRange(topic, partition, fromOffset, untilOffset) offsets = [offset]
然后你就可以创建你的rdd了
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
nzkunb0c2#
请查找Kafka偏移处理的代码段。
topicpartion = TopicAndPartition(var_topic_src_name, var_partition) fromoffset = {topicpartion: var_offset} print(fromoffset) kvs = KafkaUtils.createDirectStream(ssc,\ [var_topic_src_name],\ var_kafka_parms_src,\ valueDecoder=serializer.decode_message,\ fromOffsets = fromoffset)
2条答案
按热度按时间c2e8gylq1#
只需在代码片段中使用
spark kafka集成指南
据文件记载:皮斯帕克·Kafka
Kafka主题分区的第一组偏移范围使用
初始化看起来像:
然后你就可以创建你的rdd了
nzkunb0c2#
请查找Kafka偏移处理的代码段。