Unable to connect to Snowflake from an EMR cluster using PySpark and the Airflow EMR operator

qc6wkl3g · posted 2021-07-13 in Spark
Follow (0) | Answers (1) | Views (616)

I am trying to connect to Snowflake from an EMR cluster launched by the Airflow EMR operator, but I get the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o147.load.: java.lang.ClassNotFoundException: Failed to find data source: net.snowflake.spark.snowflake. Please find packages at http://spark.apache.org/third-party-projects.html

Below is the step I add with the EmrAddStepsOperator to run the load_updates.py script; I pass my Snowflake packages in "Args" (a sketch of how these definitions feed into the DAG follows the two blocks below).

    STEPS = [
        {
            "Name" : "convo_facts",
            "ActionOnFailure" : "TERMINATE_CLUSTER",
            "HadoopJarStep" : {
                "Jar" : "command-runner.jar",
                "Args" : ["spark-submit", "s3://dev-data-lake/spark_files/cf/load_updates.py",
                          "--packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
                          "INPUT=s3://dev-data-lake/table_exports/public/",
                          "OUTPUT=s3://dev-data-lake/emr_output/cf/"]
            }
        }
    ]

    JOB_FLOW_OVERRIDES = {
        'Name' : 'cftest',
        'LogUri' : 's3://dev-data-lake/emr_logs/cf/log.txt',
        'ReleaseLabel' : 'emr-5.32.0',
        'Instances' : {
            'InstanceGroups' : [
                {
                    'Name' : 'Master nodes',
                    'Market' : 'ON_DEMAND',
                    'InstanceRole' : 'MASTER',
                    'InstanceType' : 'r6g.4xlarge',
                    'InstanceCount' : 1,
                },
                {
                    'Name' : 'Slave nodes',
                    'Market' : 'ON_DEMAND',
                    'InstanceRole' : 'CORE',
                    'InstanceType' : 'r6g.4xlarge',
                    'InstanceCount' : 3,
                }
            ],
            'KeepJobFlowAliveWhenNoSteps' : True,
            'TerminationProtected' : False
        },
        'Applications' : [{
            'Name' : 'Spark'
        }],
        'JobFlowRole' : 'EMR_EC2_DefaultRole',
        'ServiceRole' : 'EMR_DefaultRole'
    }

This is how I set the Snowflake credentials in my load_updates.py script to pull the data into a PySpark DataFrame.

    # Spark session for the EMR step (not shown in the original snippet);
    # assumes the Snowflake connector was made available via --packages.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("load_updates").getOrCreate()

    # Set Snowflake connection options below
    sfOptions = {
        "sfURL" : "xxxx.us-east-1.snowflakecomputing.com",
        "sfUser" : "user",
        "sfPassword" : "xxxx",
        "sfDatabase" : "",
        "sfSchema" : "PUBLIC",
        "sfWarehouse" : ""
    }

    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    query_sql = "select * from cf"

    messages_new = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", query_sql) \
        .load()
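
The post does not show how load_updates.py consumes the INPUT=/OUTPUT= step arguments or where the query result ends up, so the continuation below is only a sketch under that assumption: a hypothetical parse_kv_args helper reads the key=value arguments from sys.argv, and the DataFrame is written to the OUTPUT path.

    # Continuation of load_updates.py (illustrative; not in the original post).
    import sys

    def parse_kv_args(argv):
        """Turn ["INPUT=s3://...", "OUTPUT=s3://..."] into a dict (hypothetical helper)."""
        pairs = (arg.split("=", 1) for arg in argv if "=" in arg)
        return {key: value for key, value in pairs}

    step_args = parse_kv_args(sys.argv[1:])
    input_path = step_args.get("INPUT")
    output_path = step_args.get("OUTPUT")

    # Persist the frame read from Snowflake to the step's OUTPUT location.
    messages_new.write.mode("overwrite").parquet(output_path)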

I'm not sure whether I'm missing something or doing something wrong here.

gz5pxeao 1#

The --packages option should come before s3://.../load_updates.py in the spark-submit command; otherwise it is treated as an application argument.
Try this:

    STEPS = [
        {
            "Name": "convo_facts",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--packages",
                    "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
                    "s3://dev-data-lake/spark_files/cf/load_updates.py",
                    "INPUT=s3://dev-data-lake/table_exports/public/",
                    "OUTPUT=s3://dev-data-lake/emr_output/cf/"
                ]
            }
        }
    ]
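
To make the failure mode concrete: spark-submit treats everything before the application file as its own options and everything after it as arguments for the application. With the original ordering, the whole "--packages ..." string was handed to load_updates.py as a plain argument, so the Snowflake connector never reached the classpath. A small illustration of that split (the index logic is only a demonstration, not part of the fix):

    # Everything before the .py file is consumed by spark-submit; everything
    # after it ends up in the script's sys.argv.
    args = [
        "spark-submit",
        "--packages",
        "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
        "s3://dev-data-lake/spark_files/cf/load_updates.py",
        "INPUT=s3://dev-data-lake/table_exports/public/",
        "OUTPUT=s3://dev-data-lake/emr_output/cf/",
    ]

    app_index = next(i for i, arg in enumerate(args) if arg.endswith(".py"))
    submit_options = args[1:app_index]      # parsed by spark-submit
    app_arguments = args[app_index + 1:]    # passed to load_updates.py

    print(submit_options)  # ['--packages', 'net.snowflake:snowflake-jdbc:...']
    print(app_arguments)   # ['INPUT=s3://...', 'OUTPUT=s3://...']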
