我们如何在sqoop中自动化增量导入?

aurhwmvo  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(457)

如何在sqoop中自动化增量导入?
在增量导入中,我们需要给出 --last-value 从最后一个值开始导入,但我的工作是经常从rdbms导入,我不想手动给出最后一个值,有什么方法可以使这个过程自动化吗?

rhfm7lfc

rhfm7lfc1#

一种方法是:
在数据库中创建日志表并开发增量导入,如下所示

Query the log table using sqoop eval command with the last value from last run
Run the sqoop import
Update the log table with the latest valueusing sqoop eval command

你需要自动化 sqoop eval , sqoop import 以及 sqoop eval . 您可以将任何有效的查询提交到使用 sqoop eval . 因此,可以在导入之前运行selectquery to以获取上次运行的最后一个值,并运行updatequery以使用当前运行的最后一个值更新日志表。

bxfogqkk

bxfogqkk2#

另一种方法是@durga viswanath gadiraju答案。
如果要将数据导入配置单元表,可以从配置单元表中查询上次更新的值,并将该值传递给sqoop导入查询。您可以使用shell脚本或oozie操作来实现这一点。
shell脚本:

lastupdatedvalue=`hive -e 'select last_value from table` #tweak the selection query based on the logic.

sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${lastupdatedvalue}

oozie方法:
基于检索上次更新值的逻辑的select查询的配置单元操作。
sqoop操作,用于从上一个配置单元操作的捕获输出获得增量负载。
pfb a sudo工作流:

<workflow-app name="sqoop-to-hive" xmlns="uri:oozie:workflow:0.4">
<start to="hiveact"/>
<action name="hiveact">
    <hive xmlns="uri:oozie:hive-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>script.sql</script>
<capture-output/>
    </hive>    
    <ok to="sqoopact"/>
    <error to="kill"/>

<action name="sqoopact">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <command>import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${wf:actionData('hiveact')}</command>
     </sqoop>
    <ok to="end"/>
    <error to="kill"/>
</action>
<kill name="kill">
    <message>Action failed</message>
</kill>
<end name="end"/>

希望这有帮助。

brgchamk

brgchamk3#

您可以利用内置的sqoop元存储
可以使用以下命令创建简单的增量导入作业:
sqoop作业--创建<>--导入--连接<>--用户名<>--密码<>--表<>--增量追加--检查-<>--最后一个值0
从--exec参数开始:

sqoop job --exec <<Job Name>>

每次成功的增量作业之后,sqoop都会自动将最后一个导入的值序列化回metastore

p8ekf7hl

p8ekf7hl4#

这可以通过sqoop作业轻松实现
1创建一个sqoop作业(“import”前有一个空格)

sqoop job     --create JobName6 \
           -- import  \
                --connect jdbc:mysql://localhost:3306/retail_db \
                --username=username \
                --password-file /user/sqoop/password \
                --table departments \
                --target-dir /user/hive/warehouse/test.db/departments \
                --table departments \
                --split-by department_id \
                --check-column department_id \
                --incremental append \
               --last-value 0;

2运行sqoop job sqoop job--exec jobname6;检查hdfs中位置的值
三。在源表(mysql)中插入一些数据insert into departments value(9,'new data1'),(10,'new data2');
2再次运行sqoop作业。sqoop job—执行jobname6;再次检查hdfs中位置中的值。
与配置单元导入类似

sqoop job     --create JobName1 \
           -- import  \
                --connect jdbc:mysql://localhost:3306/retail_db \
                --username=username\
                --password-file /user/sqoop/password \
                --table departments \
                --hive-import \
                --hive-table department \
                --split-by department_id \
                --check-column department_id \
                --incremental append \
               --last-value 0;

相关问题