pyspark 将粘附作业连接到Amazon密钥空间

hts6caw3  于 2022-12-03  发布在  Spark
关注(0)|答案(1)|浏览(128)

我正在尝试连接AWS胶水作业到亚马逊keyspaces。有没有任何连接和工作的那些表使用pyspark。PS:由于组织限制,我无法使用AWS cli。

yquaqz18

yquaqz181#

您可以利用开源的spark cassandra连接器将AWS Glue与Amazon Keyspaces连接起来。
首先,您需要为您的帐户启用murmur 3分区程序或随机分区程序。

UPDATE system.local set partitioner='org.apache.cassandra.dht.Murmur3Partitioner' where key='local';

其次,确保您了解所需的容量。默认情况下,Keyspaces表是使用OnDemand模式创建的,该模式通过根据您以前的流量峰值将资源增加一倍来了解所需的容量。新创建的表能够执行4000 WCU/秒和12 RCU/秒的性能。如果您需要更高的容量,请使用所需的吞吐量在预配模式下创建表,然后切换到OnDemand模式。
第三,在我们的samples repositories中找到我们的预构建示例。我们有导出、导入、计数和top-N的模式。这些示例显示了如何将spark cassandra连接器加载到s3,设置数据加载的最佳实践。以下代码片段显示了如何导出到s3。

val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val backupS3 = args("S3_URI")
    val backupFormat = args("FORMAT")

    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> tableName, "keyspace" -> keyspaceName))
      .load()

    tableDf.write.format(backupFormat).mode(SaveMode.ErrorIfExists).save(backupS3)

    Job.commit()
  }
}

最好的做法是对每个DPU/Worker的胶水使用速率限制。了解您希望每个DPU达到的吞吐量,并在cassandra驱动程序设置中设置节流器。

advanced.throttler = {
      class = RateLimitingRequestThrottler
      max-requests-per-second = 1000
      max-queue-size = 50000
      drain-interval = 1 millisecond
    }

您需要确保您具有访问Amazon密钥空间的适当IAM权限。如果您使用的是VPC端点,您还需要在此处包含权限。

相关问题