DStream[String].foreachRDD on a Spark cluster

u5rb5r59 · posted 2021-06-08 in Kafka

I'm new to Spark; can someone help me with this?

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def streamStart() {
  val sparkConf = new SparkConf()
    .setAppName("kafkaStreamingNew!!")
    .setMaster("spark://husnain:7077")
    // ship the assembled application jar to the executors
    .setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar"))
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  ssc.checkpoint("checkpoint")

  // receiver-based Kafka stream: ZooKeeper at localhost:2181, consumer group "spark",
  // one receiver thread for the topic; keep only the message value
  val topics = "test"
  val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map(topics -> 1)).map(_._2)
  lines.print()
  println("*****************************************************************************")
  lines.foreachRDD(
    rdd => rdd.foreach(
      x => println(x + "\n***-------------------------------------------------------***\n")))
  println("-----------------------------------------------------------------------------")
  ssc.start()
  ssc.awaitTermination()
}

On a standalone Spark cluster this code does not produce the expected output, but with master local[*] it works fine:

lines.foreachRDD(
  rdd => rdd.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n")
  )
)

ocebsuys · answer #1

I assume you call it "working fine" because you see the println output on the console.
When you submit the same code to the cluster, each println call happens locally on the executor that processes that partition, so if everything else is working, the missing output on your console is simply a consequence of distributed execution.
Look for the println output in the executors' stdout logs (for example, via the Executors tab of the Spark web UI).
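
If you need to see the records on the driver's console while testing, one option is to bring each batch back to the driver before printing. A minimal sketch, assuming the same lines stream as above; note that collect() ships every record in the batch to the driver, so this only makes sense for small test topics:

lines.foreachRDD { rdd =>
  // runs on the driver once per batch; collect() pulls the records back
  // from the executors, so println writes to the driver's console
  // (for anything beyond a tiny test topic, prefer rdd.take(n))
  rdd.collect().foreach(x => println(x + "\n***---***\n"))
}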
