我对结构化流媒体还比较陌生,想更详细地了解spark的主要指标。
我在databricks中有一个结构化的流处理,它从一个eventhub读取事件,从这些事件读取值,创建一个新的df,并将这个新的df写入第二个eventhub。
来自第一个eventhub的事件是一个eventgrid事件,我从中读取一个url(当blob被添加到存储帐户时),在foreachbatch中,我创建一个新的df并将其写入第二个eventhub。
代码具有以下结构:
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
.select(($"body").cast("string"))
def get_func( batchDF:DataFrame, batchID:Long ) : Unit = {
batchDF.persist()
for (row <- batchDF.rdd.collect) { //necessary to read the file with spark.read....
val file_url = "/mnt/" + path
// create df from readed url
val df = spark
.read
.option("rowTag", "Transaction")
.xml(file_url)
if (!(df.rdd.isEmpty)){
// some filtering
val eh_df = df.select(col(...).as(...),
val eh_jsoned = eh_df.toJSON.withColumnRenamed("value", "body")
// write to Eventhub
eh_jsoned.select("body")
.write
.format("eventhubs")
.options(eventHubsConfWrite.toMap)
.save()
}
}
batchDF.unpersist()
}
val query_test= streamingSelectDF
.writeStream
.queryName("query_test")
.foreachBatch(get_func _)
.start()
我试过添加 maxEventsPerTrigger(100)
参数,但这会增加很多时间,从数据到达存储帐户到数据在databricks中消耗。
的值 maxEventsPerTrigger
为了测试行为而随机设置。
看到这些指标后,批处理时间增加这么多,处理率和输入率相似,这有什么意义呢?
我应该考虑采用什么方法来改进流程?我在databricks7.5笔记本、spark3.0.1和scala2.12上运行它。
事先非常感谢大家。
注:
xml文件大小相同
第一个eventhub有20个分区
第一个eventhub的数据输入速率为2个事件/秒
暂无答案!
目前还没有任何答案,快来回答吧!