如何使用prometheus监控spark流中特定操作员的吞吐量?

iklwldmw  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(478)

我正在监视一个使用prometheus的spark流应用程序。这是我启动应用程序的方式: ./bin/spark-submit --master spark://127.0.0.1:7077 --deploy-mode cluster --driver-cores 4 --name "TestStreamCombineByKey" --conf "spark.driver.extraJavaOptions=-javaagent:/home/flink/spark-2.4.6-bin-hadoop2.7/jars/jmx_prometheus_javaagent-0.13.0.jar=8082:/home/flink/spark-2.4.6-bin-hadoop2.7/conf/spark.yml" /home/felipe/workspace-idea/explore-spark/target/scala-2.11/explore-spark_2.11-0.1.jar 根据这个答案我从这里得到的spark.yml文件。
然后我配置了/etc/prometheus/prometheus.yml文件来从spark中获取度量。

- job_name: "spark_streaming_app"
    scrape_interval: "5s"
    static_configs:
      - targets: ['localhost:8082']

我可以在普罗米修斯的 Jmeter 盘上看到一些来自spark的指标。我还可以使用地址查看度量http://localhost:4040/metrics/json/

但是,我想监视spark应用程序的特定操作符的吞吐量。例如,进出数据库的记录数 source , map ,和 combineByKey 转变。我该如何监控?

val sparkConf = new SparkConf()
  .setAppName("TaxiRideCountCombineByKey")
  .setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

val stream = ssc.receiverStream(new TaxiRideSource())
val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
val countStream = driverStream.combineByKey(
  (v) => (v, 1), //createCombiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
  new HashPartitioner(3)
)
countStream.print()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题