如何使用jdbc源代码在(py)spark中写入和读取数据?

kg7wmglp  于 2021-06-26  发布在  Hive
关注(0)|答案(3)|浏览(456)

这个问题的目的是记录:
在pyspark中使用jdbc连接读写数据所需的步骤
jdbc源代码和已知解决方案可能存在的问题
只要稍作改动,这些方法就可以与其他受支持的语言(包括scala和r)一起使用。

nszi6y05

nszi6y051#

请参阅此链接下载postgres的jdbc,并按照步骤下载jar文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar文件将在如下路径中下载。“/home/anand/.ivy2/jars/org.postgresql\u postgresql-42.1.1.jar“
如果你的spark版本是2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()

print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

将文件另存为python并运行“python respectivefilename.py”

jbose2ul

jbose2ul2#

下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构

spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"

    df.write.jdbc(mysql_url,table="actor1",mode="append")
ubbxdtey

ubbxdtey3#

写入数据

提交应用程序或启动shell时,请包含适用的jdbc驱动程序。例如,您可以使用 --packages :

bin/pyspark --packages group:name:version

或合并 driver-class-path 以及 jars ```
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

也可以使用设置这些属性 `PYSPARK_SUBMIT_ARGS` jvm示例启动或使用前的环境变量 `conf/spark-defaults.conf` 设置 `spark.jars.packages` 或者 `spark.jars` /  `spark.driver.extraClassPath` .
选择所需模式。spark jdbc writer支持以下模式: `append` :追加此:类的内容: `DataFrame` 到现有数据。 `overwrite` :覆盖现有数据。 `ignore` :如果数据已存在,则静默忽略此操作。 `error` (默认情况):如果数据已经存在,则抛出异常。
不支持upserts或其他细粒度修改

mode = ...

准备jdbc uri,例如:

You can encode credentials in URI or pass

separately using properties argument

of jdbc method or options

url = "jdbc:postgresql://localhost/foobar"

(可选)创建jdbc参数字典。

properties = {
"user": "foo",
"password": "bar"
}
`properties` / `options` 也可用于设置支持的jdbc连接属性。 使用 `DataFrame.write.jdbc`
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

保存数据(请参见 `pyspark.sql.DataFrameWriter` 详细信息)。
已知问题:
当驱动程序已包含在使用中时,找不到合适的驱动程序 `--packages` ( `java.sql.SQLException: No suitable driver found for jdbc: ...` )
假设没有驱动程序版本不匹配的问题,可以添加 `driver` 类到 `properties` . 例如:

properties = {
...
"driver": "org.postgresql.Driver"
}

使用 `df.write.format("jdbc").options(...).save()` 可能导致:
java.lang.runtimeexception:org.apache.spark.sql.execution.datasources.jdbc.defaultsource不允许将create table作为select。
解决方案未知。
在pyspark 1.3中,您可以尝试直接调用java方法:

df._jdf.insertIntoJDBC(url, "baz", True)


### 读取数据

按照步骤1-4写入数据
使用 `sqlContext.read.jdbc` :

sqlContext.read.jdbc(url=url, table="baz", properties=properties)

或者 `sqlContext.read.format("jdbc")` :

(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz",**properties)
.load())

已知问题和问题:
找不到合适的驱动程序-请参阅:写入数据
sparksql支持jdbc源的 predicate 下推,尽管不是所有的 predicate 都可以下推。它也不委托限制或聚合。可能的解决方法是替换 `dbtable` /  `table` 具有有效子查询的参数。参见示例:
spark predicate 下推是否适用于jdbc?
执行pyspark.sql.dataframe.take超过一小时(4)
如何使用sql查询在dbtable中定义表?
默认情况下,jdbc数据源使用单个执行器线程按顺序加载数据。为确保分布式数据加载,您可以:
提供分区 `column` (必须是 `IntegeType` ),  `lowerBound` ,  `upperBound` ,  `numPartitions` .
提供互斥 predicate 的列表 `predicates` ,每个所需分区一个。
请参见:
通过jdbc读取rdbms时在spark中进行分区,
从jdbc源迁移数据时如何优化分区?,
如何使用dataframe和jdbc连接提高慢速spark作业的性能?
使用jdbc导入postgres时如何划分sparkrdd?
在分布式模式下(使用分区列或 predicate ),每个执行器在自己的事务中操作。如果同时修改源数据库,则不能保证最终视图是一致的。

### 在哪里可以找到合适的司机:

maven存储库(获取 `--packages` 选择所需的版本并从窗体中的渐变选项卡复制数据 `compile-group:name:version` 替换相应字段)或maven中央存储库:
postgresql语言
mysql数据库

### 其他选择

根据数据库的不同,可能存在专门的源,并且在某些情况下首选:
greenplum-关键的greenplumSpark连接器
apache phoenix-apache spark插件
microsoft sql server-用于azure sql数据库和sql server的spark连接器
amazonredshift-databricks-redshift连接器(当前版本仅在专有databricks运行时提供)。停止使用开源版本,可在github上获得)。

相关问题