我有多个Dataframe,这些Dataframe是从来自azure事件中心的一条json消息中提取出来的。我们希望使用spark streaming job将这些df推送到synapsedw中的各个表中。
这是我的模式-
root
|-- Name: string (nullable = true)
|-- Salary: string (nullable = true)
|-- EmpID: string (nullable = true)
|-- Projects: struct (nullable = true)
| |-- ProjectID: string (nullable = true)
| |-- ProjectName: string (nullable = true)
| |-- Duration: string (nullable = true)
| |-- Location: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- City: string (nullable = true)
| | | |-- State: string (nullable = true)
| |-- Contact: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Phone: string (nullable = true)
| | | |-- email: string (nullable = true)
我从上述模式中提取了4个不同的Dataframe-
项目
位置
接触
雇员
它们都应该插入synapse的4个不同的表中
ProjectDf.write.format("spark.sqldw").options(.dbo.Project).save(...)
LocationDf.write.format("spark.sqldw").options(.dbo.Loc).save(...)
ContactDf.write.format("spark.sqldw").options(.dbo.Contact).save(...)
EmployeeDf.write.format("spark.sqldw").options(.dbo.Emp).save(...)
请建议如何在此基础上应用foreachbatch sink来插入表。
1条答案
按热度按时间ccgok5k51#
如果计划基于单个输入流Dataframe编写四个不同的Dataframe,则可以使用
foreachBatch
按以下方式:有关使用foreach和foreachbatch的文档中对此进行了描述
如果遇到异常“overloaded method foreachbatch with alternatives”,可以查看databricks runtime 7.0的发行说明,其中说明:
要修复编译错误,请更改
foreachBatch { (df, id) => myFunc(df, id) }
至foreachBatch(myFunc _)
或者显式使用java api:foreachbatch(new voidfunction2…)也就是说,您的代码如下所示: