Scala: PySpark equivalent of synapsesql in ASA (Azure Synapse Analytics)

r1zhe5dt · asked on 2023-08-05 · 2 answers

When working in a Spark notebook in Azure Synapse Analytics (ASA), I can save a CSV file as a table in a dedicated SQL pool using Scala with just two simple statements:

    %%spark
    // [1] - https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
    // [2] - https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri
    val testDF = spark.read.format("csv").option("header", "true").load(
      "abfss://earytestfs@earytestsa.dfs.core.windows.net/TIOBE-azure-backup.csv"
    );
    // [3] - https://www.aizoo.info/post/dropping-a-sql-table-in-your-synapse-spark-notebooks
    // [4] - https://stackoverflow.com/questions/67907984/write-data-to-sql-dw-from-apache-spark-in-azure-synapse
    testDF.write.mode("overwrite").synapsesql("eary_dedicated_test_sql_pool.dbo.TIOBE_test");

Unfortunately, [3] above seems to imply that the synapsesql function does not exist in PySpark. Has that changed since Darren last updated his post?

  • Note: I do not want to configure the Azure Synapse Dedicated SQL Pool Connector for Apache Spark. My dedicated SQL pool is in the same workspace as my Spark pool, so I don't feel I should have to bother configuring a bunch of authentication options:

https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export?tabs=scala%2Cscala1%2Cscala2%2Cscala3%2Cscala4%2Cscala5

EDIT: The PySpark code below fails on its last line (the synapsesql call) with
"AttributeError: 'DataFrameWriter' object has no attribute 'synapsesql'":

    %%pyspark
    df = spark.read.load('abfss://earytestfs@earytestsa.dfs.core.windows.net/TIOBE-azure-backup.csv', format='csv'
        ## If header exists uncomment line below
        , header=True
    )
    # [5] - https://stackoverflow.com/questions/69720753/write-dataframe-to-sql-dedicated-database-using-synapse-analytics
    df.write.mode("overwrite").synapsesql("eary_dedicated_test_sql_pool.dbo.TIOBE_test")

Answer 1 (luaexgnf)

Python support for synapsesql has been available for a year now. Just add the required imports, as shown in the docs:

    # Add required imports
    import com.microsoft.spark.sqlanalytics
    from com.microsoft.spark.sqlanalytics.Constants import Constants
    from pyspark.sql.functions import col
    # Get the table with the synapsesql method and expose it as a temp view
    df = spark.read.synapsesql("sandpit_ded.dbo.nation")
    df.createOrReplaceTempView("vw_nation")

(Screenshot: Python notebook using the synapsesql method.)
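With those imports in place the writer side works as well, so the Scala call from the question translates almost one-for-one. A minimal sketch, reusing the question's storage path, pool, and table names, and assuming it runs in a Synapse notebook where the built-in spark session and the connector package are available:

    %%pyspark
    # The connector's imports add synapsesql to DataFrameReader/DataFrameWriter
    import com.microsoft.spark.sqlanalytics
    from com.microsoft.spark.sqlanalytics.Constants import Constants

    df = spark.read.load(
        'abfss://earytestfs@earytestsa.dfs.core.windows.net/TIOBE-azure-backup.csv',
        format='csv', header=True
    )
    # Write to an internal table in the dedicated SQL pool; a pool in the
    # same workspace needs no explicit authentication options here
    df.write.mode("overwrite").synapsesql("eary_dedicated_test_sql_pool.dbo.TIOBE_test")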

Answer 2 (7y4bm7vi)

The code below will help you save a CSV file as a table in a dedicated SQL pool:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Write CSV to Dedicated SQL Pool") \
        .getOrCreate()

    # Dedicated SQL pool connection details (JDBC with SQL authentication)
    server = "azuresynapseaug02.sql.azuresynapse.net"
    database = "dedpoolaug02"
    user = "sqladminuser"
    password = "Welcome@1"
    jdbcUrl = f"jdbc:sqlserver://{server};database={database};user={user};password={password}"
    tableName = "dbo.Employees_nex_gen"

    # Read the CSV from ADLS Gen2 and map the columns to write
    csv_file_path = "abfss://rawcontainerf1@synapsestorageaug02.dfs.core.windows.net/empcsv.csv"
    df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
    mapped_df = df.selectExpr("emp_id AS emp_id", "emp_name AS emp_name", "emp_salary AS emp_salary", "emp_department AS emp_department")

    # Write the DataFrame to the dedicated SQL pool over JDBC
    mapped_df.write \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", tableName) \
        .option("createTableColumnTypes", "emp_id INT, emp_name VARCHAR(50), emp_salary INT, emp_department VARCHAR(100)") \
        .option("truncate", "true") \
        .option("numPartitions", "4") \
        .mode("overwrite") \
        .save()
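One caveat before the walkthrough: the password above is hardcoded for illustration. In a Synapse notebook it could instead be pulled from an Azure Key Vault via mssparkutils; a sketch, assuming a vault named kv-demo holding a secret named sqlpool-password (both names hypothetical):

    # Fetch the SQL password from Key Vault instead of hardcoding it
    # (vault and secret names below are hypothetical placeholders)
    from notebookutils import mssparkutils

    password = mssparkutils.credentials.getSecret("kv-demo", "sqlpool-password")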


Define the SQL pool connection details:

    server = "xxxxxxxxxxxxxxxxxxxxxx"
    database = "xxxxxxxxxxxxxx"
    user = "xxxxxxxxxxxxxxxxxxx"
    password = "xxxxxxxxxxx"

  • This sets up the connection details for the Azure Synapse Analytics dedicated SQL pool.
  • The server variable holds the server name, database holds the database name, and user and password hold the credentials for authenticating and connecting to the SQL pool.

Write the DataFrame to the SQL pool: the mapped_df DataFrame is written to the Azure Synapse Analytics dedicated SQL pool using the write method. The format is set to "jdbc" to indicate writing to a JDBC data source.
    mapped_df.write \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", tableName) \
        .option("createTableColumnTypes", "emp_id INT, emp_name VARCHAR(50), emp_salary INT, emp_department VARCHAR(100)") \
        .option("truncate", "true") \
        .option("numPartitions", "4") \
        .mode("overwrite") \
        .save()
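To confirm the write landed, the same jdbcUrl and tableName can be reused to read the table back. A minimal sketch, not part of the original answer:

    # Read the freshly written table back over the same JDBC connection
    check_df = spark.read \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", tableName) \
        .load()
    check_df.show(5)
    print(f"Rows written: {check_df.count()}")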

Output: (screenshot of the resulting table in the dedicated SQL pool.)

