I am monitoring a Spark Streaming application with Prometheus. This is how I start the application:

./bin/spark-submit --master spark://127.0.0.1:7077 --deploy-mode cluster --driver-cores 4 --name "TestStreamCombineByKey" --conf "spark.driver.extraJavaOptions=-javaagent:/home/flink/spark-2.4.6-bin-hadoop2.7/jars/jmx_prometheus_javaagent-0.13.0.jar=8082:/home/flink/spark-2.4.6-bin-hadoop2.7/conf/spark.yml" /home/felipe/workspace-idea/explore-spark/target/scala-2.11/explore-spark_2.11-0.1.jar
I based the spark.yml file on an answer I got from here.
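For reference, the file passed to the javaagent is a YAML config of JMX exporter rules; a minimal catch-all variant (a sketch only, not necessarily the spark.yml from that answer) would be:

lowercaseOutputName: true
rules:
  # export every MBean attribute the agent can see
  - pattern: ".*"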
Then I configured the /etc/prometheus/prometheus.yml file to scrape metrics from Spark:
  - job_name: "spark_streaming_app"
    scrape_interval: "5s"
    static_configs:
      - targets: ['localhost:8082']
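For context, that job entry sits under the top-level scrape_configs key; a minimal complete prometheus.yml would look like this (the global interval is my assumption):

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "spark_streaming_app"
    scrape_interval: "5s"
    static_configs:
      - targets: ['localhost:8082']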
I can see some metrics from Spark on the Prometheus dashboard. I can also view metrics at http://localhost:4040/metrics/json/.
However, I want to monitor the throughput of specific operators of the Spark application, for instance the number of records flowing in and out of the source, the map, and the combineByKey transformations. How can I monitor this? Here is the application code:
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("TaxiRideCountCombineByKey")
  .setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// custom receiver emitting TaxiRide events
val stream = ssc.receiverStream(new TaxiRideSource())
// one (driverId, 1) pair per ride
val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
val countStream = driverStream.combineByKey(
  (v: Int) => (v, 1), // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
  new HashPartitioner(3)
)
countStream.print()
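The closest workaround I can think of is counting records manually with a LongAccumulator and reporting the value once per micro-batch, but that only logs on the driver instead of feeding Prometheus. A rough sketch, replacing the plain map above (the accumulator name and the foreachRDD wiring are mine, not part of the job):

// Sketch: count records emitted by the map operator and report per batch.
val mapOutRecords = ssc.sparkContext.longAccumulator("mapOutRecords")
val driverStream = stream.map { taxiRide =>
  mapOutRecords.add(1) // one record out of this operator
  (taxiRide.driverId, 1)
}
countStream.foreachRDD { rdd =>
  rdd.foreach(_ => ()) // force evaluation of the batch
  println(s"records out of map so far: ${mapOutRecords.value}")
}

The accumulator is cumulative across batches, so per-batch throughput would be the difference between successive readings.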