我对spark流媒体和Kafka有意见。当运行一个示例程序来使用kafka主题并将微批处理结果输出到终端时,当我设置以下选项时,我的工作似乎挂起了: df.option("startingOffsets", "earliest")
从最新的胶印开始工作很好,当每一个微批流通过时,结果被打印到终端。
我在想也许这是一个资源问题——我试着从一个有大量数据的主题中阅读。但是,我似乎没有内存/cpu问题(使用本地[*]集群运行此作业)。这项工作似乎从来没有真正开始过,但只是悬在一线: 19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A
```
val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()
val topic = "topic.with.alotta.data"
//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
我希望看到结果打印到控制台…但是,应用程序似乎就像我提到的那样挂起了。有什么想法吗?这感觉像是一个spark资源问题(因为我正在针对一个有大量数据的主题运行一个本地“集群”)。是否有一些关于流式Dataframe的性质我遗漏了?
1条答案
按热度按时间vlju58qv1#
写入控制台会导致在每个触发器的驱动程序内存中收集所有数据。由于当前没有限制批处理的大小,这意味着整个主题内容都在驱动程序中累积。看到了吗https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-Flume
设置批处理大小的限制应该可以解决您的问题。尝试添加
maxOffsetsPerTrigger
读Kafka的时候。。。看到了吗https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html 详情。