How do I write a PySpark DataFrame to a DynamoDB table? I haven't found much information on this. Per my requirements, I have to write a PySpark DataFrame to a DynamoDB table. More generally, I need to read from and write to DynamoDB from my PySpark code. Thanks in advance.
bq9c1y661#
Ram, there is no way to do this directly from pyspark. If you have pipeline software, it can be done through a series of steps. Here is how:
1. Create a temporary Hive table, e.g.
CREATE TABLE TEMP( column1 type, column2 type...) STORED AS ORC;
2. Run the pySpark job and write the data into it (a fuller PySpark sketch follows below):
dataframe.createOrReplaceTempView("df")
spark.sql("INSERT OVERWRITE TABLE temp SELECT * FROM df")
3. Create the DynamoDB connector table (the storage handler requires an EXTERNAL table):
CREATE EXTERNAL TABLE TEMPTODYNAMO( column1 type, column2 type...) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "temp-to-dynamo", "dynamodb.column.mapping" = "column1:column1,column2:column2...");
4. Overwrite that table from the temp table:
INSERT OVERWRITE TABLE TEMPTODYNAMO SELECT * FROM TEMP;
More information: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMR_Hive_Commands.html
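For step 2 to work, the Spark session must be created with Hive support so that INSERT OVERWRITE can see the table from step 1. A minimal sketch, with illustrative table and column names (steps 1, 3, and 4 still run in Hive):

from pyspark.sql import SparkSession

# Hive support is required for INSERT OVERWRITE into the Hive-managed table.
spark = SparkSession.builder \
    .appName("write-to-temp-hive-table") \
    .enableHiveSupport() \
    .getOrCreate()

# Stand-in for whatever DataFrame you need to land in DynamoDB.
dataframe = spark.createDataFrame([(1, "a"), (2, "b")], ["column1", "column2"])

dataframe.createOrReplaceTempView("df")
spark.sql("INSERT OVERWRITE TABLE temp SELECT * FROM df")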
um6iljoc2#
You can use spark-dynamodb. From their repo:
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
.format("dynamodb") \
.load() # <-- DataFrame of Row objects with inferred schema.
# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)
# write to some other table overwriting existing item with same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
.format("dynamodb") \
.save()
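The connector is not bundled with Spark, so it has to be pulled onto the classpath first. A minimal sketch of starting a session with it, assuming the com.audienceproject build of spark-dynamodb (the Maven coordinates and version shown are illustrative and must match your Spark/Scala build):

from pyspark.sql import SparkSession

# Fetch the connector from Maven when the session starts.
spark = SparkSession.builder \
    .appName("dynamodb-io") \
    .config("spark.jars.packages", "com.audienceproject:spark-dynamodb_2.12:1.1.2") \
    .getOrCreate()

df = spark.createDataFrame([("k1", 1), ("k2", 2)], ["key", "value"])
df.write.option("tableName", "SomeOtherTable") \
    .format("dynamodb") \
    .save()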
uyto3xhc3#
This AWS blog explains how to use AWS Glue to create a unique key, partition the data, and write S3 data (CSV) to a DynamoDB table: How realtor.com® maximized data upload from Amazon S3 into Amazon DynamoDB
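The pattern in the blog comes down to writing a Glue DynamicFrame through Glue's DynamoDB connection type. A minimal sketch, assuming it runs inside a Glue job and that the sample data and table name are illustrative:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Wrap a Spark DataFrame as a DynamicFrame, then write it to DynamoDB.
dataframe = spark.createDataFrame([("k1", 1), ("k2", 2)], ["key", "value"])
dyf = DynamicFrame.fromDF(dataframe, glue_context, "dyf")
glue_context.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "some-dynamodb-table",  # illustrative
        "dynamodb.throughput.write.percent": "1.0",
    },
)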
pbgvytdp4#
We save the PySpark output as Parquet on S3, then use the awswrangler layer in a Lambda to read the Parquet data into a pandas DataFrame and write the whole DataFrame to the DynamoDB table with wrangler.dynamodb.put_df.
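A minimal sketch of that Lambda, assuming the awswrangler layer is attached and that the S3 path and DynamoDB table name are illustrative (the target table must already exist):

import awswrangler as wr

def lambda_handler(event, context):
    # Read the Parquet files the PySpark job left on S3 into a pandas frame.
    df = wr.s3.read_parquet(path="s3://some-bucket/some-prefix/")
    # put_df batch-writes every row of the frame as a DynamoDB item.
    wr.dynamodb.put_df(df=df, table_name="some-dynamodb-table")
    return {"rows_written": len(df)}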