如何使用spark temp table在databricks中使用select查询将数据插入表中

omvjsjqw  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(698)

我想使用azuredatabricks中的sql将spark表的结果插入到一个新的sqlsynapse表中。
我试过下面的解释[https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-create-table-datasource]但我运气不好。
synapse表必须作为select语句的结果创建。源应该是spark/data bricks临时视图或parquet源。
e、 g.温度表


# Load Taxi Location Data from Azure Synapse Analytics

        jdbcUrl = "jdbc:sqlserver://synapsesqldbexample.database.windows.net:number;
database=SynapseDW" #Replace "suffix" with your own  
        connectionProperties = {
          "user" : "usernmae1",
          "password" : "password2",
          "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        }

        pushdown_query = '(select * from NYC.TaxiLocationLookup) as t'
        dfLookupLocation = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

        dfLookupLocation.createOrReplaceTempView('NYCTaxiLocation')

        display(dfLookupLocation)

e、 g.源突触dw
服务器:synapsesqldbexample.database.windows.net
数据库:[synapsedw]
架构:[nyc]
表:[TaxLocationLookup]
接收器/目标表(尚未存在):
服务器:synapsesqldbexample.database.windows.net
数据库:[synapsedw]
架构:[nyc]
新表:[测试数据]
我尝试的sql语句:

%sql
CREATE TABLE if not exists TEST_NYCTaxiLocation 
select *
from NYCTaxiLocation
limit 100
nfg76nw0

nfg76nw01#

如果您使用com.databricks.spark.sqldw驱动程序,那么您将需要一个azure存储帐户和一个已经设置好的容器。一旦这是到位,实际上是很容易做到这一点。
在azuredatabricks中配置blob凭据,我采用在笔记本中的方法
创建jdbc连接字符串和blob
将select语句读入和rdd/dataframe
使用.write函数将Dataframe下推到azure synapse

配置blob凭据

spark.conf.set(“fs.azure.account.key..blob.core.windows.net”,“”)

配置jdbc和blob路径

jdbc=“jdbc:sqlserver://.database.windows。net:1433;数据库=;用户=@;密码=;encrypt=真;trustservercertificate=false;hostnameincertificate=*.database.windows.net;logintimeout=30;”blob=“wasbs://@.blob.core.windows.net/”

从synapse读取数据到dataframe

df=Spark读取
.format(“com.databricks.spark.sqldw”)
.option(“url”,jdbc)
.option(“tempdir”,blob)
.option(“forwardsparkazurestoragecredentials”,“true”)
.option(“query”,“select top 1000*from<>order by newid()”)
.load()

将Dataframe中的数据写入azure synapse

数据框写入
.format(“com.databricks.spark.sqldw”)
.option(“url”,jdbc)
.option(“forwardsparkazurestoragecredentials”,“true”)
.option(“dbtable”,“yourtablename”)
.option(“tempdir”,blob)
.mode(“覆盖”)
.save()

uinbv5nw

uinbv5nw2#

除了@jpvoogt解决方案之外,另一个选择是在存储帐户中创建Parquet文件之后,在synapse池中使用cta。您可以执行copy命令或外部表。
一些参考资料:
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/develop-tables-cetashttpshttp://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/quickstart-bulk-load-copy-tsql

相关问题