spark streaming | Writing different DataFrames to multiple tables in Synapse DW

mznpcxlj · published 2021-07-14 in Spark

I have several DataFrames extracted from a single JSON message coming from Azure Event Hubs. We want to push these DataFrames into separate tables in Synapse DW using a Spark Structured Streaming job.
This is my schema:

root
 |-- Name: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- EmpID: string (nullable = true)
 |-- Projects: struct (nullable = true)
 |    |-- ProjectID: string (nullable = true)
 |    |-- ProjectName: string (nullable = true)
 |    |-- Duration: string (nullable = true)
 |    |-- Location: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- City: string (nullable = true)
 |    |    |    |-- State: string (nullable = true)
 |    |-- Contact: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Phone: string (nullable = true)
 |    |    |    |-- email: string (nullable = true)
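
For reference, the stream is parsed roughly along these lines before the extraction (a simplified sketch: the Event Hubs connection details are placeholders and schema stands for the StructType shown above):

import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.functions.from_json
import spark.implicits._ // spark is the active SparkSession

// connection string elided; EventHubsConf comes from the azure-event-hubs-spark connector
val ehConf = EventHubsConf("<connection-string>")

val streamingDF = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()
  // Event Hubs delivers the payload as a binary body column
  .select(from_json($"body".cast("string"), schema).as("data"))
  .select("data.*")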

From the schema above I extract four different DataFrames:
Projects
Location
Contact
Employee
They should all be inserted into four different tables in Synapse.

ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

Please suggest how to apply a foreachBatch sink on top of this to insert into the tables.

ccgok5k5

If you plan to write four different DataFrames based on a single input streaming DataFrame, you can use foreachBatch in the following way:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

  // as you plan to use the batchDF to create multiple outputs it might be worth persisting it
  batchDF.persist()

  // create the four different DataFrames based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...) 
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)

  // then you can save those four DataFrames to the desired tables
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}.start()
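
The select(...) calls are placeholders. One possible way to fill them in, based on the schema from the question (carrying EmpID and ProjectID along as join keys is my assumption, not something the question specifies):

import org.apache.spark.sql.functions.explode
import spark.implicits._ // spark is the active SparkSession

val EmployeeDf = batchDF.select("Name", "Salary", "EmpID")
val ProjectDf  = batchDF.select($"EmpID", $"Projects.ProjectID", $"Projects.ProjectName", $"Projects.Duration")

// Location and Contact are arrays, so explode them into one row per element
val LocationDf = batchDF
  .select($"Projects.ProjectID", explode($"Projects.Location").as("Loc"))
  .select($"ProjectID", $"Loc.City", $"Loc.State")
val ContactDf = batchDF
  .select($"EmpID", explode($"Projects.Contact").as("C"))
  .select($"EmpID", $"C.Phone", $"C.email")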

This is described in the documentation on using foreach and foreachBatch.
If you run into the exception "overloaded method foreachBatch with alternatives", have a look at the release notes for Databricks Runtime 7.0, which state:
To fix the compilation error, change foreachBatch { (df, id) => myFunc(df, id) } to foreachBatch(myFunc _) or use the Java API explicitly: foreachBatch(new VoidFunction2 ...)
That means your code would look like this:

def myFunc(batchDF: DataFrame, batchId: Long): Unit = {
  // as you plan to use the batchDF to create multiple outputs it might be worth persisting it
  batchDF.persist()

  // create the four different DataFrames based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...) 
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)

  // then you can save those four DataFrames to the desired tables
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)

  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}

streamingDF.writeStream.foreachBatch(myFunc _).[...].start()
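
For the writes themselves, here is a sketch of what the Synapse connector options could look like, assuming the Databricks com.databricks.spark.sqldw connector (the JDBC URL, tempDir and checkpoint location are placeholders you need to replace):

import org.apache.spark.sql.DataFrame

// helper that writes one DataFrame to one Synapse table
def writeToSynapse(df: DataFrame, table: String): Unit =
  df.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://<server>.sql.azuresynapse.net:1433;database=<db>")
    .option("tempDir", "abfss://<container>@<account>.dfs.core.windows.net/tempdir") // staging area used by the connector
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", table)
    .mode("append")
    .save()

// inside myFunc you would then call, e.g., writeToSynapse(ProjectDf, "dbo.Project")

streamingDF.writeStream
  .foreachBatch(myFunc _)
  .option("checkpointLocation", "/mnt/checkpoints/synapse-sink") // placeholder path
  .start()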
