Unable to write to Redshift via PySpark

kcugc4gi · posted on 2023-04-05 · in Spark

I'm trying to write to Redshift via PySpark. My Spark version is 3.2.0, built with Scala 2.12.15.
I tried to follow the guide here for the write. I also tried writing via aws_iam_role, as explained in the link, but it leads to the same error. All of my dependencies match Scala 2.12, which is what my Spark build uses.

Environment: Spark 3.2, Scala 2.12.15, PySpark 3.2.3, Java 11, Ubuntu 22.04 LTS, Python 3.8

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc')\
    .config("spark.jars.packages","com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.3.2,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .getOrCreate()

df=spark.read.option("header",True) \
.csv("demo.csv")

df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
  .option("dbtable", "demo") \
  .option("forward_spark_s3_credentials","True") \
  .option("tempdir", "s3a://mubucket/folder") \
  .mode("append") \
  .save()

It throws this error:

23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'

Credentials were removed for posting. With the same credentials I can create databases and tables, and the same credentials can also create files on S3 with full access.
I'm trying to write to Redshift through Spark. I followed the guide to the letter but was unable to write. I tried several times with the different approaches given in the manual, but they all lead to the same error. This is the manual.
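
For reference, the aws_iam_role variant mentioned above would look roughly like the sketch below. This is only a sketch: the role ARN, host, bucket, and keys are placeholders, and spark-avro is pinned to 3.2.0 to match the Spark 3.2 runtime, since a java.lang.NoSuchMethodError like the one above usually means that libraries on the classpath were built against different Spark versions (here spark-avro 3.3.2 comes from the Spark 3.3 line).

from pyspark.sql import SparkSession

# Sketch only: role ARN, host, bucket, and keys are placeholders.
spark = SparkSession.builder.appName('abc')\
    .config("spark.jars.packages","com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.2.0,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
    .getOrCreate()

df = spark.read.option("header", True).csv("demo.csv")

df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
  .option("dbtable", "demo") \
  .option("aws_iam_role", "arn:aws:iam::123456789012:role/my-redshift-role") \
  .option("tempdir", "s3a://mybucket/folder") \
  .mode("append") \
  .save()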

hfyxw5xn 1#

It looks like this simply doesn't work. For now, I've built a custom solution that writes the dataframe to S3 as Parquet through Spark and then runs a COPY command against the database. I've also opened an issue about this on GitHub; you can check it here.

from pyspark.sql import SparkSession
import psycopg2
import boto3

def query_redshift(current_query,fetch,url):
  # Run a single statement against Redshift over a psycopg2 connection.
  conn = psycopg2.connect(url)
  conn.autocommit = True
  cursor = conn.cursor()
  try:
    cursor.execute(current_query)
    if fetch == 1:
      return cursor.fetchall()
    print("S3 to Redshift Transfer Successful")
  finally:
    cursor.close()
    conn.close()
           
      
def write_to_redshift(df,folder,arn,tablename,jdbc_url,bucket,aws_access_key_id,aws_secret_access_key):
  # Stage the dataframe in S3 as Parquet, then COPY it into Redshift.
  # arn (an IAM role ARN) is accepted but not used in this key-based variant.
  staging = "s3://"+bucket+"/"+folder
  s3a = staging.replace("s3://","s3a://")  # Spark writes through the s3a:// scheme
  df.write.parquet(s3a)
  
  query=f"""
COPY {tablename}
FROM '{staging}'
CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
FORMAT AS PARQUET;
  """
  
  try:
    print(query)
    resp = query_redshift(query,0,jdbc_url)
  except Exception as e:
    print(str(e))  
  finally:
    s3 = boto3.resource('s3',aws_access_key_id=aws_access_key_id,
          aws_secret_access_key= aws_secret_access_key)
    bucket = s3.Bucket(bucket)
    delete = bucket.objects.filter(Prefix=folder+"/").delete()
    print(delete)


def main():
  aws_access_key_id = 'etc'
  aws_secret_access_key = 'etc'
  spark = SparkSession.builder.appName('abc')\
      .config("spark.jars.packages","com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,org.apache.hadoop:hadoop-aws:3.2.2")\
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
      .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
      .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
      .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
      .getOrCreate()
      
  df=spark.read.option("header",True) \
  .csv("demo.csv") # replace with whatever dataframe you have

  df.show()
  tablename = 'public.demo'
  iam_role=""
  bucket_name = 'bucket'

  # S3 Credentials Option 1
  # Note: despite the name, `jdbc` is a psycopg2/libpq connection string, not a JDBC URL.
  jdbc = "host = 'host' port ='5439' dbname = 'dev' user = 'user' password = 'pass' connect_timeout = 30000"
  folder = "cache8"
  
  write_to_redshift(df,folder,iam_role,tablename,jdbc,bucket_name,aws_access_key_id,aws_secret_access_key)
  
if __name__ == "__main__":
  main()

This writes your dataframe to S3 as Parquet, runs a COPY command against your database from that staged data, and then deletes it from the bucket.
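
If you would rather not embed access keys in the COPY statement itself, Redshift's COPY also accepts an IAM role attached to the cluster. A minimal sketch of that variant, reusing query_redshift above and the otherwise unused arn parameter (copy_parquet_with_iam_role and the role ARN are placeholders you would supply):

def copy_parquet_with_iam_role(tablename, staging, arn, jdbc_url):
  # Same COPY as above, but authorized via an IAM role instead of access keys.
  # The role must be associated with the Redshift cluster and allowed to read the staging bucket.
  query = f"""
COPY {tablename}
FROM '{staging}'
IAM_ROLE '{arn}'
FORMAT AS PARQUET;
"""
  return query_redshift(query, 0, jdbc_url)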
