结构化流度量理解

lokaqttq  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(243)

我对结构化流媒体还比较陌生,想更详细地了解spark的主要指标。
我在databricks中有一个结构化的流处理,它从一个eventhub读取事件,从这些事件读取值,创建一个新的df,并将这个新的df写入第二个eventhub。
来自第一个eventhub的事件是一个eventgrid事件,我从中读取一个url(当blob被添加到存储帐户时),在foreachbatch中,我创建一个新的df并将其写入第二个eventhub。
代码具有以下结构:

val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
    .select(($"body").cast("string"))

def get_func( batchDF:DataFrame, batchID:Long ) : Unit = {

  batchDF.persist()
  for (row <- batchDF.rdd.collect) {   //necessary to read the file with spark.read....

              val file_url = "/mnt/" + path

              // create df from readed url
              val df = spark
                  .read
                  .option("rowTag", "Transaction")
                  .xml(file_url)

                if (!(df.rdd.isEmpty)){

                  // some filtering
                  val eh_df = df.select(col(...).as(...),
                  val eh_jsoned = eh_df.toJSON.withColumnRenamed("value", "body")

                  // write to Eventhub
                  eh_jsoned.select("body")
                      .write
                      .format("eventhubs")
                      .options(eventHubsConfWrite.toMap)    
                      .save()    
                }      
      } 

    batchDF.unpersist()

}

val query_test= streamingSelectDF
                .writeStream
                .queryName("query_test")
                .foreachBatch(get_func _)
                .start()

我试过添加 maxEventsPerTrigger(100) 参数,但这会增加很多时间,从数据到达存储帐户到数据在databricks中消耗。
的值 maxEventsPerTrigger 为了测试行为而随机设置。
看到这些指标后,批处理时间增加这么多,处理率和输入率相似,这有什么意义呢?

我应该考虑采用什么方法来改进流程?我在databricks7.5笔记本、spark3.0.1和scala2.12上运行它。
事先非常感谢大家。
注:
xml文件大小相同
第一个eventhub有20个分区
第一个eventhub的数据输入速率为2个事件/秒

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题