Spark Streaming with Kafka: cannot receive all data

carvr3hs posted on 2021-06-07 in Kafka
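In my Spark Streaming program, I try to receive data from Kafka. The Kafka producer sends 1 million messages, but Spark Streaming never receives all of them: some messages are always lost. I started the Kafka server with the default configuration. Here is my producer code: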

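import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// brokers, topic, loop_times and record_count are defined elsewhere (e.g. parsed from args)
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
// Total messages sent = loop_times * record_count
for (i <- 1 to loop_times.toInt) {
  var cnt = 0
  while (cnt < record_count.toInt) {
    // Null key, constant value "aaa"
    val message = new ProducerRecord[String, String](topic, null, "aaa")
    // send() is asynchronous; the returned Future is ignored here
    producer.send(message)
    cnt += 1
    if (cnt % 10000 == 0)
      println(s"send $cnt records")
  }
}
// close() blocks until previously sent requests complete
producer.close()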

Below is my Spark Streaming code (it comes from the Spark examples):

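import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

// StreamingExamples is a helper object from the Spark examples package
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    // 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is required by the windowed reduce with an inverse function
    ssc.checkpoint("checkpoint")

    // topic -> number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream (ZooKeeper-coordinated consumer); keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Count over the last 10 minutes, recomputed every 2 seconds, into 2 partitions
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    // Prints the first 10 elements of each batch
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}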
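When comparing the printed output with the number of messages sent, note that `reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)` does not print a running total: every 2 seconds it prints the count over the last 10 minutes, obtained by adding the batch that enters the window to the previous window and subtracting (with the inverse function `_ - _`) the batch that leaves it. The following is a minimal, Spark-free sketch of that incremental update, offered only as an illustration. The object and function names are hypothetical, batches are plain `Map`s rather than RDDs, and keys whose count drops to 0 are removed, which is what Spark's optional `filterFunc` argument can be used for.

```scala
// Spark-free sketch (hypothetical names): simulates the incremental update
// behind reduceByKeyAndWindow(_ + _, _ - _, window, slide) on plain Maps.
object WindowedCountSketch {
  // Word counts of one micro-batch (one 2-second batch in the question).
  type Counts = Map[String, Long]

  // Combine two count maps key by key; missing keys count as 0.
  def combine(a: Counts, b: Counts)(f: (Long, Long) => Long): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

  // Drop keys whose count fell to 0 (the role of Spark's optional filterFunc).
  private def dropZeros(c: Counts): Counts = c.filter { case (_, n) => n != 0L }

  // Incremental version: previous window + entering batch - leaving batch.
  // windowBatches = window length / slide, e.g. 10 minutes / 2 seconds = 300.
  def incrementalWindows(batches: IndexedSeq[Counts], windowBatches: Int): IndexedSeq[Counts] =
    batches.indices.scanLeft(Map.empty[String, Long]) { (window, i) =>
      val withNew = combine(window, batches(i))(_ + _)
      val updated =
        if (i >= windowBatches) combine(withNew, batches(i - windowBatches))(_ - _)
        else withNew
      dropZeros(updated)
    }.tail

  // Reference version: recompute each window from scratch.
  def naiveWindows(batches: IndexedSeq[Counts], windowBatches: Int): IndexedSeq[Counts] =
    batches.indices.map { i =>
      val inWindow = batches.slice(math.max(0, i - windowBatches + 1), i + 1)
      dropZeros(inWindow.foldLeft(Map.empty[String, Long])((acc, b) => combine(acc, b)(_ + _)))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: 5 batches of "aaa" counts, window of 3 batches.
    val batches = IndexedSeq(100L, 200L, 300L, 400L, 500L).map(n => Map("aaa" -> n))
    val windows = incrementalWindows(batches, 3)
    windows.foreach(w => println(w.getOrElse("aaa", 0L)))
  }
}
```

Running `main` prints 100, 300, 600, 900 and 1200. The last window (1200) is smaller than the 1500 records in all five batches because the first two batches have already left the window, so a windowed count is only comparable to the total sent if every message falls within the same 10-minute window.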

My Spark version is 1.6 and my Kafka version is 0.8.2.1.
