Spark Streaming | Writing different DataFrames to multiple tables in Synapse DW

mznpcxlj · Posted 2021-07-14 in Spark
Follow (0) | Answers (1) | Views (575)

I have multiple DataFrames extracted from a single JSON message coming from Azure Event Hubs. We want to push these DataFrames into their respective tables in Synapse DW using a Spark Streaming job.
Here is my schema:

root
 |-- Name: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- EmpID: string (nullable = true)
 |-- Projects: struct (nullable = true)
 |    |-- ProjectID: string (nullable = true)
 |    |-- ProjectName: string (nullable = true)
 |    |-- Duration: string (nullable = true)
 |    |-- Location: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- City: string (nullable = true)
 |    |    |    |-- State: string (nullable = true)
 |    |-- Contact: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Phone: string (nullable = true)
 |    |    |    |-- email: string (nullable = true)

I have extracted 4 different DataFrames from the above schema:
Project
Location
Contact
Employee
All of them should be inserted into 4 different tables in Synapse.
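For reference, a minimal sketch of how these four DataFrames might be derived, assuming df is the DataFrame parsed from the Event Hubs JSON payload; the key columns carried into each table (EmpID, ProjectID) are assumptions, not the asker's actual code:

import org.apache.spark.sql.functions.{col, explode}

// Hypothetical extraction based on the schema above.
// df is assumed to be the DataFrame parsed from the incoming JSON message.
val EmployeeDf = df.select(col("EmpID"), col("Name"), col("Salary"))
val ProjectDf = df.select(
  col("EmpID"),
  col("Projects.ProjectID"),
  col("Projects.ProjectName"),
  col("Projects.Duration"))
// explode turns each element of the nested array into its own row
val LocationDf = df
  .select(col("Projects.ProjectID"), explode(col("Projects.Location")).as("loc"))
  .select(col("ProjectID"), col("loc.City"), col("loc.State"))
val ContactDf = df
  .select(col("EmpID"), explode(col("Projects.Contact")).as("c"))
  .select(col("EmpID"), col("c.Phone"), col("c.email"))

Each DataFrame is then written to its own Synapse table: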

ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)
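For completeness, a hedged sketch of what one full write could look like with the Azure Synapse connector on Databricks (format name "com.databricks.spark.sqldw"); the JDBC URL, storage account, and tempDir values are placeholders:

// Hypothetical full write for one table; url and tempDir are placeholders.
ProjectDf.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
  .option("tempDir", "abfss://<container>@<account>.dfs.core.windows.net/tempdir")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "dbo.Project")
  .mode("append")
  .save()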

Please suggest how to apply a foreachBatch sink on top of this to insert into the tables.

ccgok5k5 1#

If you plan to write four different DataFrames based on a single input streaming DataFrame, you can use foreachBatch in the following way:

import org.apache.spark.sql.DataFrame

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // as you plan to use the batchDF to create multiple outputs it might be worth persisting the batchDF
  batchDF.persist()
  // create the four different DataFrames based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...)
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)
  // then you can save those four DataFrames to the desired locations
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)
  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}

This is described in the documentation on using foreach and foreachBatch.
If you run into the exception "overloaded method foreachBatch with alternatives", have a look at the Databricks Runtime 7.0 release notes, which state:
To fix the compilation error, change foreachBatch { (df, id) => myFunc(df, id) } to foreachBatch(myFunc _) or use the Java API explicitly: foreachBatch(new VoidFunction2…).
That means your code would look like this:

def myFunc(batchDF: DataFrame, batchId: Long): Unit = {
  // as you plan to use the batchDF to create multiple outputs it might be worth persisting the batchDF
  batchDF.persist()
  // create the four different DataFrames based on the input
  val ProjectDf = batchDF.select(...)
  val LocationDf = batchDF.select(...)
  val ContactDf = batchDF.select(...)
  val EmployeeDf = batchDF.select(...)
  // then you can save those four DataFrames to the desired locations
  ProjectDf.write.format("spark.sqldw").option("dbTable", "dbo.Project").save(...)
  LocationDf.write.format("spark.sqldw").option("dbTable", "dbo.Loc").save(...)
  ContactDf.write.format("spark.sqldw").option("dbTable", "dbo.Contact").save(...)
  EmployeeDf.write.format("spark.sqldw").option("dbTable", "dbo.Emp").save(...)
  // do not forget to unpersist your batchDF
  batchDF.unpersist()
}

streamingDF.writeStream.foreachBatch(myFunc _).[...].start()
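As a usage note, the [...] placeholder above is where query-level options would go before starting the stream; a minimal sketch, assuming a hypothetical checkpoint path:

// checkpointLocation is required for fault-tolerant streaming queries;
// the path below is a hypothetical example.
val query = streamingDF.writeStream
  .foreachBatch(myFunc _)
  .option("checkpointLocation", "/mnt/checkpoints/synapse-sink")
  .start()

query.awaitTermination()  // block until the streaming query terminates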