这个问题的目的是记录:在pyspark中使用jdbc连接读写数据所需的步骤jdbc源代码和已知解决方案可能存在的问题只要稍作改动,这些方法就可以与其他受支持的语言(包括scala和r)一起使用。
nszi6y051#
请参阅此链接下载postgres的jdbc,并按照步骤下载jar文件https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar文件将在如下路径中下载。“/home/anand/.ivy2/jars/org.postgresql\u postgresql-42.1.1.jar“如果你的spark版本是2
from pyspark.sql import SparkSession spark = SparkSession.builder .appName("sparkanalysis") .config("spark.driver.extraClassPath", "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar") .getOrCreate() //for localhost database// pgDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:postgres") \ .option("dbtable", "public.user_emp_tab") \ .option("user", "postgres") \ .option("password", "Jonsnow@100") \ .load() print(pgDF) pgDF.filter(pgDF["user_id"]>5).show()
将文件另存为python并运行“python respectivefilename.py”
jbose2ul2#
下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate() sc = spark.sparkContext from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load() mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****" df.write.jdbc(mysql_url,table="actor1",mode="append")
ubbxdtey3#
提交应用程序或启动shell时,请包含适用的jdbc驱动程序。例如,您可以使用 --packages :
--packages
bin/pyspark --packages group:name:version
或合并 driver-class-path 以及 jars ```bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
driver-class-path
jars
也可以使用设置这些属性 `PYSPARK_SUBMIT_ARGS` jvm示例启动或使用前的环境变量 `conf/spark-defaults.conf` 设置 `spark.jars.packages` 或者 `spark.jars` / `spark.driver.extraClassPath` . 选择所需模式。spark jdbc writer支持以下模式: `append` :追加此:类的内容: `DataFrame` 到现有数据。 `overwrite` :覆盖现有数据。 `ignore` :如果数据已存在,则静默忽略此操作。 `error` (默认情况):如果数据已经存在,则抛出异常。 不支持upserts或其他细粒度修改
mode = ...
准备jdbc uri,例如:
url = "jdbc:postgresql://localhost/foobar"
(可选)创建jdbc参数字典。
properties = {"user": "foo","password": "bar"}`properties` / `options` 也可用于设置支持的jdbc连接属性。 使用 `DataFrame.write.jdbc`df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
`properties` / `options` 也可用于设置支持的jdbc连接属性。 使用 `DataFrame.write.jdbc`
保存数据(请参见 `pyspark.sql.DataFrameWriter` 详细信息)。 已知问题: 当驱动程序已包含在使用中时,找不到合适的驱动程序 `--packages` ( `java.sql.SQLException: No suitable driver found for jdbc: ...` ) 假设没有驱动程序版本不匹配的问题,可以添加 `driver` 类到 `properties` . 例如:
properties = {..."driver": "org.postgresql.Driver"}
使用 `df.write.format("jdbc").options(...).save()` 可能导致: java.lang.runtimeexception:org.apache.spark.sql.execution.datasources.jdbc.defaultsource不允许将create table作为select。 解决方案未知。 在pyspark 1.3中,您可以尝试直接调用java方法:
df._jdf.insertIntoJDBC(url, "baz", True)
### 读取数据 按照步骤1-4写入数据 使用 `sqlContext.read.jdbc` :
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或者 `sqlContext.read.format("jdbc")` :
(sqlContext.read.format("jdbc").options(url=url, dbtable="baz",**properties).load())
已知问题和问题: 找不到合适的驱动程序-请参阅:写入数据 sparksql支持jdbc源的 predicate 下推,尽管不是所有的 predicate 都可以下推。它也不委托限制或聚合。可能的解决方法是替换 `dbtable` / `table` 具有有效子查询的参数。参见示例: spark predicate 下推是否适用于jdbc? 执行pyspark.sql.dataframe.take超过一小时(4) 如何使用sql查询在dbtable中定义表? 默认情况下,jdbc数据源使用单个执行器线程按顺序加载数据。为确保分布式数据加载,您可以: 提供分区 `column` (必须是 `IntegeType` ), `lowerBound` , `upperBound` , `numPartitions` . 提供互斥 predicate 的列表 `predicates` ,每个所需分区一个。 请参见: 通过jdbc读取rdbms时在spark中进行分区, 从jdbc源迁移数据时如何优化分区?, 如何使用dataframe和jdbc连接提高慢速spark作业的性能? 使用jdbc导入postgres时如何划分sparkrdd? 在分布式模式下(使用分区列或 predicate ),每个执行器在自己的事务中操作。如果同时修改源数据库,则不能保证最终视图是一致的。 ### 在哪里可以找到合适的司机: maven存储库(获取 `--packages` 选择所需的版本并从窗体中的渐变选项卡复制数据 `compile-group:name:version` 替换相应字段)或maven中央存储库: postgresql语言 mysql数据库 ### 其他选择 根据数据库的不同,可能存在专门的源,并且在某些情况下首选: greenplum-关键的greenplumSpark连接器 apache phoenix-apache spark插件 microsoft sql server-用于azure sql数据库和sql server的spark连接器 amazonredshift-databricks-redshift连接器(当前版本仅在专有databricks运行时提供)。停止使用开源版本,可在github上获得)。
3条答案
按热度按时间nszi6y051#
请参阅此链接下载postgres的jdbc,并按照步骤下载jar文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar文件将在如下路径中下载。“/home/anand/.ivy2/jars/org.postgresql\u postgresql-42.1.1.jar“
如果你的spark版本是2
将文件另存为python并运行“python respectivefilename.py”
jbose2ul2#
下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构
ubbxdtey3#
写入数据
提交应用程序或启动shell时,请包含适用的jdbc驱动程序。例如,您可以使用
--packages
:或合并
driver-class-path
以及jars
```bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
mode = ...
You can encode credentials in URI or pass
separately using properties argument
of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
properties = {
"user": "foo",
"password": "bar"
}
`properties` / `options` 也可用于设置支持的jdbc连接属性。 使用 `DataFrame.write.jdbc`
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
properties = {
...
"driver": "org.postgresql.Driver"
}
df._jdf.insertIntoJDBC(url, "baz", True)
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz",**properties)
.load())