我有多个spark结构化流式处理作业,我看到的通常行为是,只有当kafka中有任何新的偏移量时,才会触发新的批处理,kafka用作创建流式处理查询的源。
但是当我运行这个示例时,它演示了使用 mapGroupsWithState
,然后我看到即使流源中没有新数据,也会触发新的批处理。为什么会这样,可以避免吗?
update-1我修改了上面的示例代码并删除了与状态相关的操作,比如更新/删除它。函数只输出零。但仍然是每10秒触发一次批处理,netcat服务器上没有任何新数据。
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._
object Stateful {
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = "9999"
val spark = SparkSession
.builder
.appName("StructuredSessionization")
.master("local[2]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load()
// Split the lines into words, treat words as sessionId of events
val events = lines
.as[(String, Timestamp)]
.flatMap { case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
}
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {
case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
0
}
val query = sessionUpdates
.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
query.awaitTermination()
}
}
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(
numEvents: Int,
startTimestampMs: Long,
endTimestampMs: Long)
1条答案
按热度按时间nzrxty8p1#
出现空批的原因是mapgroupswithstate调用中使用了超时。
根据“learning spark 2.0”一书,它说:
“下一个微批处理将调用此超时键上的函数,即使该micro.batch中没有该键的数据。[…]由于超时是在微批处理过程中处理的,因此它们的执行时间不精确,并且在很大程度上取决于触发间隔[…]
因为你已经设置了超时时间
GroupStateTimeout.ProcessingTimeTimeout
它与查询的触发时间(10秒)一致。另一种方法是根据事件时间(即。GroupStateTimeout.EventTimeTimeout
).groupstate上的scaladocs提供了一些详细信息:
当某个组发生超时时,将为该组调用该函数(没有值),并且groupstate.hastinmedout()设置为true。