source.getbatch和schema对qe.analysed.schema?

brtdzjyr  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(190)

我一直在为spark结构化流媒体开发一个带有源和汇的数据源。
我有一个关于source.getbatch的问题:

def getBatch(start: Option[Offset], end: Offset): DataFrame
``` `getBatch` 返回偏移量之间的流Dataframe,因此习惯用法(?)具有如下代码:

val relation = new MyRelation(...)(sparkSession)
val plan = LogicalRelation(relation, isStreaming = true)
new Dataset[Row](sparkSession, plan, RowEncoder(schema))

注意schema的使用,它是 `Source` 抽象:

def schema: StructType

这就是我问题的“源头”。在streaming sink/source.getbatch中,上述操作是否正常?
因为没有可以更改属性(模式)的临时操作符,所以我认为这是可以的。
我看到了下面的代码,这让我怀疑它是否比上面的解决方案更好:

val relation = new MyRelation(...)(sparkSession)
val plan = LogicalRelation(relation, isStreaming = true)

// When would we have to execute plan?
val qe = sparkSession.sessionState.executePlan(plan)
new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))

我们什么时候用 `qe.analyzed.schema` vs简单 `schema` ? 这个可以吗 `qe.analyzed.schema` 有助于避免一些边缘情况,是首选方法吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题