使用sqoop将表从mysql移动到使用远程安装的配置单元

esyap4oy  于 2021-06-03  发布在  Sqoop
关注(0)|答案(2)|浏览(396)

我有一个场景,其中我有一个aws emr的设置与几个应用程序,如spark,hadoop,hive,hcatalog,齐柏林飞艇,sqoop等,我有另一个服务器,只运行气流。
我正在处理一个需求,我想使用sqoop将mysql表(同样在另一个rds示例上)移动到hive,并且这个触发器必须由airflow提交。
如果气流在远程服务器中,是否可以使用气流中可用的sqoopoperator来实现这一点?我相信没有,那么有没有其他方法可以达到这个目的呢?
提前感谢。

tp5buhyn

tp5buhyn1#

尝试使用script-runner.jar添加step-by。还有更多。

aws emr create-cluster --name "Test cluster" –-release-label emr-5.16.0 --applications Name=Hive Name=Pig --use-default-roles --ec2-attributes KeyName=myKey --instance-type m4.large --instance-count 3 --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://region.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://mybucket/script-path/my_script.sh"]

你就可以这样做。

SPARK_TEST_STEPS = [
    {
        'Name': 'demo',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'cn-northwest-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': [
                "s3://d.s3.d.com/demo.sh",
            ]
        }
    }
]

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

step_adder.set_downstream(step_checker)

像这样。

!/bin/bash    
sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student
piwo6bdm

piwo6bdm2#

是的,这是可能的。我承认关于如何使用操作符的文档是缺乏的,但是如果你理解气流中的钩子和操作符的概念,你可以通过阅读你想要使用的操作符的代码来弄清楚。在这种情况下,您需要通读sqoophook和sqoopoperator代码库。我知道如何处理气流的大部分工作都是通过阅读代码来完成的,虽然我还没有使用这个操作符,但我可以在这里尽我所能帮助您。
假设您要执行以下sqoop命令:

sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student

您有一个运行在远程主机上的sqoop服务器,您可以通过位于http://scoop.example.com:12000/sqoop/。
首先,您需要在airflow管理ui中创建连接,调用该连接 sqoop . 对于连接,请填写 host 作为 scoop.example.com , schema 作为 sqoop ,和 port 作为 12000 . 如果您有密码,则需要将其放入服务器上的一个文件中,然后 extras 填写一个json字符串 {'password_file':'/path/to/password.txt'} (请参阅有关此密码文件的内联代码)。
在ui中设置连接后,现在可以在dag文件中使用sqoop操作符创建任务。这可能是这样的:

sqoop_mysql_export = SqoopOperator(conn_id='sqoop',
                                   table='student',
                                   username='root',
                                   password='password',
                                   driver='jdbc:mysql://mysql.example.com/testDb',
                                   cmd_type='import')

您可以在这里的代码中看到为导入传递的参数的完整列表。
您可以在这里看到sqoop操作符(实际上是操作符用来连接到sqoop的sqoophook)如何将这些参数转换为命令行命令。
实际上,这个sqoop操作符只是通过将传递的kwarg转换为sqoop客户机cli命令来工作。如果您查看sqoophook,您可以看到这是如何完成的,并且可能会发现如何使它适用于您的案例。祝你好运!
要进行故障排除,我建议您将sshing插入正在运行的服务器,并确认您可以从命令行运行spoop客户端并连接到远程spoop服务器。

相关问题