使用jdbc从spark中的rdbms增量和并行读取

eoigrqb6  于 2021-06-03  发布在  Sqoop
关注(0)|答案(1)|浏览(597)

我正在从事一个项目,涉及使用jdbc从rdbms读取数据,我成功地读取了数据。这是我每周都会经常做的事情。因此,我一直在尝试想出一种方法来确保在初始读取之后,后续的读取应该只提取更新的记录,而不是从表中提取整个数据。我可以通过指定三个参数来使用sqoop增量导入来实现这一点 (--check-column, --incremental last-modified/append 以及 --last-value) . 但是,我不想使用sqoop来实现这个。有没有办法用scala在spark中复制相同的功能?
其次,有些表没有唯一的列可以用作 partitionColumn ,因此我考虑使用一个row number函数向这些表中添加一个唯一的列,然后获得 MIN 以及 MAX 唯一列的 lowerBound 以及 upperBound 分别。我现在面临的挑战是如何将这些值动态解析到read语句中,如下所示:

val queryNum = "select a1.*, row_number() over (order by sales) as row_nums from (select * from schema.table) a1"

val df = spark.read.format("jdbc").
option("driver", driver).
option("url",url ).
option("partitionColumn",row_nums).
option("lowerBound", min(row_nums)).
option("upperBound", max(row_nums)).
option("numPartitions", some value).
option("fetchsize",some value).
option("dbtable", queryNum).
option("user", user).
option("password",password).
load()

我知道上面的代码是不正确的,可能遗漏了很多过程,但我想它将给出我在这里尝试实现的总体概述。

jm81lzqq

jm81lzqq1#

在spark中处理增量jdbc读取非常复杂。imho,它严重限制了构建许多应用程序的易用性,如果sqoop正在做这项工作,那么可能不值得您费心。
然而,这是可行的。有关使用dbtable选项的示例,请参见此线程:
apachespark选择所有行
要保持此作业的幂等性,您需要直接从加载所有数据文件或通过每次写入的日志文件读取先前输出的max行。如果您的数据文件是海量的,您可能需要使用日志文件,如果较小,您可能需要加载。

相关问题