set offsetrange()函数

jckbn6z7  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(520)

我成功地融合了Kafka和Spark。我想把Kafka的流发送到spark。现在我可以把流发送到spark了。我想把这个流放在rdd中,所以我用createrdd()函数来创建rdd。但我在rdd只有Kafka的一些按摩师。因为它取决于偏移范围。所以请任何人告诉我如何在kafka spark createrdd()函数中设置offsetrange()。

c2e8gylq

c2e8gylq1#

只需在代码片段中使用

// Import dependencies and create kafka params as in Create Direct Stream

    val offsetRanges = Array(
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange("test", 0, 0, 100),
      OffsetRange("test", 1, 0, 100)
    )

    val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

spark kafka集成指南
据文件记载:皮斯帕克·Kafka
Kafka主题分区的第一组偏移范围使用

pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)

初始化看起来像:

fromOffset = 0
untilOffset = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, fromOffset, untilOffset)
offsets = [offset]

然后你就可以创建你的rdd了

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
nzkunb0c

nzkunb0c2#

请查找Kafka偏移处理的代码段。

topicpartion = TopicAndPartition(var_topic_src_name, var_partition)
fromoffset = {topicpartion: var_offset}
print(fromoffset)

kvs = KafkaUtils.createDirectStream(ssc,\
                                   [var_topic_src_name],\
                                   var_kafka_parms_src,\
                                   valueDecoder=serializer.decode_message,\
                                   fromOffsets = fromoffset)

相关问题