Spark Redshift Python

knsnq2tg · posted 2023-04-07 in Apache

I am trying to connect Spark with Amazon Redshift, but I get the following error:

My code is as follows:

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext(appName="Connect Spark with Redshift")
sql_context = SQLContext(sc)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)

df = sql_context.read \
    .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
    .option("dbtable", "table_name") \
    .option("tempdir", "bucket") \
    .load()

s71maibg1#

Here is a step-by-step process for connecting to Redshift.

  • Download the Redshift JDBC connector jar. Try the command below:
wget "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.2.1.1001.jar"
  • Save the code below in a Python file (the .py you want to run) and replace the credentials accordingly.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext

# initialize the spark session
spark = SparkSession.builder.master("yarn").appName("Connect to redshift").enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sqlContext = HiveContext(sc)

sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "<ACCESSKEYID>")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "<ACCESSKEYSECRET>")

taxonomyDf = sqlContext.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:postgresql://url.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") \
    .option("dbtable", "table_name") \
    .option("tempdir", "s3://mybucket/") \
    .load()
  • Run spark-submit like the following:
spark-submit --packages com.databricks:spark-redshift_2.10:0.5.0 --jars RedshiftJDBC4-1.2.1.1001.jar test.py

wr98u20j2#

If you are using Spark 2.0.4 and running the code on an AWS EMR cluster, follow the steps below:
1) Download the Redshift JDBC jar with the following command:

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar

Reference: AWS Document
2) Copy the code below into a Python file, then replace the required values with your AWS resources:

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "access key")
spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "secret access key")

sqlCon = SQLContext(spark)
df = sqlCon.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["ID", "TYPE", "CODE"])

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://HOST_URL:5439/DATABASE_NAME?user=USERID&password=PASSWORD") \
  .option("dbtable", "TABLE_NAME") \
  .option("aws_region", "us-west-1") \
  .option("tempdir", "s3://BUCKET_NAME/PATH/") \
  .mode("error") \
  .save()

3) Run the spark-submit command below:

spark-submit --name "App Name" --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 --py-files python_script.py python_script.py

Notes:

1) The public IP addresses of the EMR nodes (where the spark-submit job runs) must be allowed in the inbound rules of the Redshift cluster's security group.
2) The Redshift cluster and the S3 location used for "tempdir" should be in the same region. In the example above, both resources are in us-west-1.
3) If the data is sensitive, make sure all channels are secured. To secure the connection, follow the steps described under configuration here.
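
As a rough illustration of point 3, SSL can be required on the JDBC side by appending the ssl parameters to the connection URL used above. This is only a sketch with placeholder values; the NonValidatingFactory shown here skips certificate validation, so prefer the validating default where possible.

# Hypothetical sketch: placeholder host, database and credentials
jdbc_url = (
    "jdbc:redshift://HOST_URL:5439/DATABASE_NAME"
    "?user=USERID&password=PASSWORD"
    "&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory"
)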


rkkpypqq3#

The error is due to missing dependencies.
Verify that you have these jar files in the Spark home directory:

  1. spark-redshift_2.10-3.0.0-preview1.jar
  2. RedshiftJDBC41-1.1.10.1010.jar
  3. hadoop-aws-2.7.1.jar
  4. aws-java-sdk-1.7.4.jar (or aws-java-sdk-s3-1.11.60.jar, a newer version, but not everything works with it)

Put these jar files in $SPARK_HOME/jars/ and then start Spark:
pyspark --jars $SPARK_HOME/jars/spark-redshift_2.10-3.0.0-preview1.jar,$SPARK_HOME/jars/RedshiftJDBC41-1.1.10.1010.jar,$SPARK_HOME/jars/hadoop-aws-2.7.1.jar,$SPARK_HOME/jars/aws-java-sdk-s3-1.11.60.jar,$SPARK_HOME/jars/aws-java-sdk-1.7.4.jar

(SPARK_HOME should be "/usr/local/Cellar/apache-spark/$SPARK_VERSION/libexec")
This will run Spark with all the necessary dependencies. Note that if you are using awsAccessKeys, you also need to specify the authentication type with 'forward_spark_s3_credentials' = True.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext(appName="Connect Spark with Redshift")
sql_context = SQLContext(sc)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)

df = sql_context.read \
     .format("com.databricks.spark.redshift") \
     .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
     .option("dbtable", "table_name") \
     .option('forward_spark_s3_credentials',True) \
     .option("tempdir", "s3n://bucket") \
     .load()

Common errors after that are:

  • Redshift connection error: "SSL off"
  • Solution: .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory")
  • S3 error: when unloading the data, e.g. after a df.show(), you get the message "The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint."
  • Solution: the bucket and the cluster must be in the same region
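
If the two resources cannot be moved into the same region right away, one workaround (an assumption on my part, not from this answer, and it applies to the s3a connector from hadoop-aws rather than s3n) is to point the S3 filesystem at the bucket's regional endpoint explicitly:

# Hypothetical sketch: pin the S3A filesystem to the bucket's regional endpoint
# (eu-central-1 is only an example region)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")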

csbfibhn4#

If you are using Databricks, I don't think you have to create a new SQLContext, because that is already done for you; just use the provided sqlContext. Try the code below:

from pyspark.sql import SQLContext
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

df = sqlContext.read \ .......

It may also be that the bucket is not mounted:

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
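
The mount call above assumes ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME and MOUNT_NAME are already defined; here is a minimal sketch of what they might look like (all values are placeholders), with the secret key URL-encoded because characters such as "/" would otherwise break the s3a URI:

import urllib.parse

ACCESS_KEY = "YOUR_KEY_ID"             # placeholder
SECRET_KEY = "YOUR_SECRET_ACCESS_KEY"  # placeholder
# URL-encode the secret so special characters survive inside the s3a URI
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, safe="")
AWS_BUCKET_NAME = "my-bucket"          # placeholder
MOUNT_NAME = "my-mount"                # placeholder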

l5tcr1uw5#

I'm a bit late answering this, but I spent a lot of time trying to connect a local instance of pyspark to Amazon Redshift. I'm on a Mac, so your configuration may differ slightly. The setup I arrived at assumes the following:

  • Scala 2.12
  • Pyspark 3.3.0
  • openjdk 19.0.2
  • Python 3.10
from pyspark import SparkConf
from pyspark.sql import SparkSession
import urllib.parse

conf = (
    SparkConf()
    .set(
        'spark.jars.packages', 
        # All the dependencies for connecting to redshift
        # and S3
        (
            'org.apache.hadoop:hadoop-aws:3.3.2,'
            'org.apache.spark:spark-avro_2.12:3.3.2,'
            'com.amazonaws:aws-java-sdk-core:1.12.441,'
            'com.amazonaws:aws-java-sdk-redshift:1.12.441,'
            'com.amazonaws:aws-java-sdk-sts:1.12.441,'
            'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.14.2,'
            'com.amazon.redshift:redshift-jdbc42:2.1.0.13,'
            'com.eclipsesource.minimal-json:minimal-json:0.9.4'
        )
    )
    # this leverages the credentials in my
    # ~/.aws/credentials file configured by the CLI
    .set(
        'spark.hadoop.fs.s3a.aws.credentials.provider', 
        'com.amazonaws.auth.DefaultAWSCredentialsProviderChain'
    )
)

spark = (
    SparkSession
    .builder
    .master('local[*]')
    .config(conf=conf)
    .appName('jupyter')
    .getOrCreate()
)

To connect, I needed the credentials in URL form (I'm still working on getting an IAM role to work; this uses basic authentication):

# My password had special characters
# that needed to be formatted into the url
url = "jdbc:redshift://redshift-cluster.endpoint.region.redshift.amazonaws.com:5439/db?"

params = {'user': user, 'password': pw}
url = url + urllib.parse.urlencode(params)

Finally:

df = (
    spark
    .read 
    .format('jdbc')
    .option("driver", "com.amazon.redshift.jdbc42.Driver")
    # This uses the s3a credentials mapped in locally
    .option("forward_spark_s3_credentials", True) 
    .option("url", url) 
    .option("dbtable", "schema.table") 
    .option("tempdir", "s3a://my-bucket/tmp/") 
    .load()
)

df.head(1)

qnakjoqk6#

I think the s3n:// URL style has been deprecated and/or removed.
Try defining your keys as "fs.s3.awsAccessKeyId".
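
For what it's worth, on recent Hadoop builds the maintained S3 connector is s3a rather than s3n; a minimal sketch of the equivalent s3a settings (these property names come from hadoop-aws and are my addition, not part of this answer):

# Hypothetical equivalent using the s3a connector from hadoop-aws
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<ACCESSID>")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<ACCESSKEY>")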


ltskdhd17#

I think you need to add .format("com.databricks.spark.redshift") to your sql_context.read call; my hunch is that Spark cannot infer the format of this data source, so you need to explicitly specify that the spark-redshift connector should be used.
For more details on this error, see https://github.com/databricks/spark-redshift/issues/230
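
For illustration, here is the read from the question with only the format added (credentials are still placeholders, and the tempdir is written as an S3 URI as the other answers show):

# Minimal sketch: same options as in the question, plus the explicit format
df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
    .option("dbtable", "table_name") \
    .option("tempdir", "s3n://bucket") \
    .load()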
