尝试将“org.apache.spark.sql.dataframe”对象转换为dataframe,结果在DataRicks中出现错误“name'dataframe'is not defined”

u3r8eeie  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(920)

我试图通过databricks中的jdbc连接查询sql数据库,并将查询结果存储为一个dataframe。我在网上找到的所有方法都涉及到将其存储为一种spark对象,首先使用scala代码,然后将其转换为pandas。我试过一号牢房:

%scala
val df_table1 = sqlContext.read.format("jdbc").options(Map(
    ("url" -> "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb"),
    ("dbtable" -> "(select top 10 * from myschema.table) as table"),
    ("user" -> "user"),
    ("password" -> "password123"),
    ("driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"))
).load()

结果是:

df_table1: org.apache.spark.sql.DataFrame = [var1: int, var2: string ... 50 more fields]

太好了!但是当我试着把它转换成细胞2中的df,这样我就可以使用它了:

import numpy as np
import pandas as pd 

result_pdf = df_table1.select("*").toPandas()

print(result_pdf)

它生成错误消息:

NameError: name 'df_table1' is not defined

我如何成功地将这个对象转换成一个pandas dataframe,或者是否有任何方法可以使用python代码通过jdbc连接查询sql数据库而不需要使用scala(我并不特别喜欢scala语法,如果可能的话,我宁愿避免使用它)?

vcudknz3

vcudknz31#

我假设您打算使用python查询sql,如果是这样的话,下面的查询就可以了。

%python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
database = "YourDBName"
table = "[dbo].[YourTabelName]"
user = "SqlUser"
password  = "SqlPassword"

DF1 = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://YourAzureSql.database.windows.net:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
DF1.show()

table = "[dbo].[someOthertable]"

DF2 = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://YourAzureSql.database.windows.net:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
DF2.show()

Finaldf = DF1.join(DF2,(DF1.Prop_0 == DF2.prop_0),how="inner").select(DF1.Prop_0,DF1.Prop_1,DF2.Address)
Finaldf.show()

相关问题