I am trying to write to Redshift via PySpark. My Spark version is 3.2.0, built with Scala 2.12.15.
I tried to follow the guide here. I also tried writing via aws_iam_role, as explained in the link, but it led to the same error (a sketch of that variant follows the code below). All of my dependencies match Scala version 2.12, which is what my Spark uses.
Environment:
- Spark 3.2
- Scala 2.12.15
- PySpark 3.2.3
- Java 11
- Ubuntu 22.04 LTS
- Python 3.8
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc') \
    .config("spark.jars.packages",
            "com.eclipsesource.minimal-json:minimal-json:0.9.5,"
            "com.amazon.redshift:redshift-jdbc42:2.1.0.12,"
            "com.google.guava:guava:31.1-jre,"
            "com.amazonaws:aws-java-sdk-s3:1.12.437,"
            "org.apache.spark:spark-avro_2.12:3.3.2,"
            "io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,"
            "org.apache.hadoop:hadoop-aws:3.2.2,"
            "com.google.guava:failureaccess:1.0") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

df = spark.read.option("header", True) \
    .csv("demo.csv")

df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
    .option("dbtable", "demo") \
    .option("forward_spark_s3_credentials", "True") \
    .option("tempdir", "s3a://mubucket/folder") \
    .mode("append") \
    .save()
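For reference, the aws_iam_role variant mentioned above looked roughly like this. This is only a sketch: the role ARN is a placeholder, and per the connector's docs the aws_iam_role option replaces forward_spark_s3_credentials. Both variants throw the same error.

# Sketch of the aws_iam_role variant; the role ARN below is a placeholder.
df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
    .option("dbtable", "demo") \
    .option("aws_iam_role", "arn:aws:iam::123456789012:role/my-redshift-role") \
    .option("tempdir", "s3a://mubucket/folder") \
    .mode("append") \
    .save()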
It throws this error:
23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
The credentials were removed for posting. With the same credentials I can create databases and tables, and the same credentials can also create files on S3 with full access.
I am trying to write to Redshift via Spark. I followed the guide up to this point but still cannot write. I tried several times with the different methods the manual provides, but they all result in the same error. Here is the manual.
1 Answer
It looks like this doesn't work. For now, I have built a custom solution that writes to S3 as Parquet via Spark and then runs a COPY command on the database. I have also opened a GitHub issue about this; you can check it here.
This writes your DataFrame to S3 as Parquet, then runs a COPY command against your DB from that data, and finally deletes it from the bucket.
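A minimal sketch of that workaround, assuming the redshift_connector and boto3 packages are available; the bucket, prefix, table, IAM role ARN, and connection parameters are all placeholders, not the code from the linked issue:

import boto3
import redshift_connector

def copy_df_to_redshift(df, bucket, prefix, table, iam_role, conn_params):
    # 1. Write the DataFrame to S3 as Parquet via Spark.
    df.write.mode("overwrite").parquet(f"s3a://{bucket}/{prefix}")

    # 2. Have Redshift load the Parquet files itself via COPY.
    conn = redshift_connector.connect(**conn_params)
    try:
        cursor = conn.cursor()
        cursor.execute(
            f"COPY {table} FROM 's3://{bucket}/{prefix}' "
            f"IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
        )
        conn.commit()
    finally:
        conn.close()

    # 3. Delete the temporary Parquet files from the bucket.
    boto3.resource("s3").Bucket(bucket).objects.filter(Prefix=prefix).delete()

Called, for example, as copy_df_to_redshift(df, "mubucket", "folder", "demo", "arn:aws:iam::123456789012:role/redshift-copy-role", {"host": "host", "database": "dev", "user": "user", "password": "pass"}). Because COPY runs inside Redshift, this path avoids the Spark-side Avro unload that triggers the NoSuchMethodError above.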