如何使用spark temp table在databricks中使用select查询将数据插入表中

omvjsjqw  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(785)

我想使用azuredatabricks中的sql将spark表的结果插入到一个新的sqlsynapse表中。
我试过下面的解释[https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-create-table-datasource]但我运气不好。
synapse表必须作为select语句的结果创建。源应该是spark/data bricks临时视图或parquet源。
e、 g.温度表

  1. # Load Taxi Location Data from Azure Synapse Analytics
  2. jdbcUrl = "jdbc:sqlserver://synapsesqldbexample.database.windows.net:number;
  3. database=SynapseDW" #Replace "suffix" with your own
  4. connectionProperties = {
  5. "user" : "usernmae1",
  6. "password" : "password2",
  7. "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  8. }
  9. pushdown_query = '(select * from NYC.TaxiLocationLookup) as t'
  10. dfLookupLocation = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
  11. dfLookupLocation.createOrReplaceTempView('NYCTaxiLocation')
  12. display(dfLookupLocation)

e、 g.源突触dw
服务器:synapsesqldbexample.database.windows.net
数据库:[synapsedw]
架构:[nyc]
表:[TaxLocationLookup]
接收器/目标表(尚未存在):
服务器:synapsesqldbexample.database.windows.net
数据库:[synapsedw]
架构:[nyc]
新表:[测试数据]
我尝试的sql语句:

  1. %sql
  2. CREATE TABLE if not exists TEST_NYCTaxiLocation
  3. select *
  4. from NYCTaxiLocation
  5. limit 100
nfg76nw0

nfg76nw01#

如果您使用com.databricks.spark.sqldw驱动程序,那么您将需要一个azure存储帐户和一个已经设置好的容器。一旦这是到位,实际上是很容易做到这一点。
在azuredatabricks中配置blob凭据,我采用在笔记本中的方法
创建jdbc连接字符串和blob
将select语句读入和rdd/dataframe
使用.write函数将Dataframe下推到azure synapse

配置blob凭据

spark.conf.set(“fs.azure.account.key..blob.core.windows.net”,“”)

配置jdbc和blob路径

jdbc=“jdbc:sqlserver://.database.windows。net:1433;数据库=;用户=@;密码=;encrypt=真;trustservercertificate=false;hostnameincertificate=*.database.windows.net;logintimeout=30;”blob=“wasbs://@.blob.core.windows.net/”

从synapse读取数据到dataframe

df=Spark读取
.format(“com.databricks.spark.sqldw”)
.option(“url”,jdbc)
.option(“tempdir”,blob)
.option(“forwardsparkazurestoragecredentials”,“true”)
.option(“query”,“select top 1000*from<>order by newid()”)
.load()

将Dataframe中的数据写入azure synapse

数据框写入
.format(“com.databricks.spark.sqldw”)
.option(“url”,jdbc)
.option(“forwardsparkazurestoragecredentials”,“true”)
.option(“dbtable”,“yourtablename”)
.option(“tempdir”,blob)
.mode(“覆盖”)
.save()

展开查看全部
uinbv5nw

uinbv5nw2#

除了@jpvoogt解决方案之外,另一个选择是在存储帐户中创建Parquet文件之后,在synapse池中使用cta。您可以执行copy命令或外部表。
一些参考资料:
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/develop-tables-cetashttpshttp://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/quickstart-bulk-load-copy-tsql

相关问题