How do I load CSV file data from S3 into MySQL RDS using Lambda?

b0zn9rqh  posted on 2022-11-21  in  Mysql

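Assume the database is named EmployeeDB. It contains five tables (table1, table2, table3, table4, and table5), and we also have five CSV files: sketching, profile, reading, health, and error.

Scenario: whenever a CSV file is uploaded to the S3 bucket, the upload should trigger the function and load that CSV's data into a specific table. (For example, when sketching is uploaded, its data should go into table1.)

To achieve this I tried a Lambda function. This is the code I used.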

import json
import boto3
import csv
import mysql.connector
from mysql.connector import Error
from mysql.connector import errorcode
s3_client = boto3.client('s3')

# Read CSV file content from S3 bucket
def lambda_handler(event, context):
    # TODO implement
    # print(event)
    bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file = event['Records'][0]['s3']['object']['key']
    csv_file_obj = s3_client.get_object(Bucket=bucket, Key=csv_file)
    lines = csv_file_obj['Body'].read().decode('utf-8').split()
    
    results = []
    for row in csv.DictReader(lines):
        results.append(row.values())
    print(results)
    
    connection = mysql.connector.connect(host='xxxxxxxxxxxxxxx.ap-south-1.rds.amazonaws.com',database='employeedb',user='xxxxxx',password='xxxxxx')
    
    tables_dict = {
        'sketching': 'INSERT INTO table1 (empid, empname, empaddress) VALUES (%s, %s, %s)',
        'profile': 'INSERT INTO table2 (empid, empname, empaddress) VALUES (%s, %s, %s)',
        'reading': 'INSERT INTO table3 (empid, empname, empaddress) VALUES (%s, %s, %s)',
        'health': 'INSERT INTO table4 (empid, empname, empaddress) VALUES (%s, %s, %s)',
        'error': 'INSERT INTO table5 (empid, empname, empaddress) VALUES (%s, %s, %s)'
    }
    if csv_file in tables_dict:
        mysql_empsql_insert_query = tables_dict[csv_file]
        cursor = connection.cursor()
        cursor.executemany(mysql_empsql_insert_query,results)
        connection.commit()
        print(cursor.rowcount, f"Record inserted successfully from {csv_file} file")
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

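This code does not work for me. The function is triggered (I can see the invocation in CloudWatch), but when I upload sketching, no data is loaded into table1 or any other table.
Can anyone help me change the code for my scenario?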

lkaoscv7 1#

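You can build a dictionary that maps each file to its query. Each file/table needs its own INSERT query. Note that the keys include the .csv extension, because the object key in the S3 event is the full file name.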

tables_dict = {
  'sketching.csv': 'INSERT INTO table1.sketching (empid, empname, empaddress) VALUES (%s, %s, %s)',
  'profile.csv': '',
  'reading.csv': '',
  'health.csv': '',
  'error.csv': ''
}

if csv_file in tables_dict:
  mysql_empsql_insert_query = tables_dict[csv_file]
  cursor = connection.cursor()
  cursor.executemany(mysql_empsql_insert_query,results)
  connection.commit()
  print(cursor.rowcount, f"Record inserted successfully from {csv_file} file")
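
Beyond the dictionary lookup, two other details in the question's handler may be worth checking: `.split()` breaks the file on every whitespace character (so a value such as an address containing spaces is cut apart), and the object key in an S3 event is URL-encoded and may carry a folder prefix. The following is a minimal sketch of helpers for those steps, not a definitive implementation. The names `TABLE_FOR_FILE`, `table_key_from_s3_key`, `rows_from_csv_text`, and `build_insert` are hypothetical, and the sketch assumes each CSV has a header row with the columns empid, empname, and empaddress.

```python
import csv
import io
import posixpath
from urllib.parse import unquote_plus

# Hypothetical mapping (an assumption for illustration): CSV base name
# without extension -> target table from the question.
TABLE_FOR_FILE = {
    "sketching": "table1",
    "profile": "table2",
    "reading": "table3",
    "health": "table4",
    "error": "table5",
}

# Assumed column names, taken from the INSERT statements in the question.
COLUMNS = ("empid", "empname", "empaddress")


def table_key_from_s3_key(s3_key):
    """Turn an object key such as 'uploads/sketching.csv' into 'sketching'."""
    # Object keys in S3 event notifications are URL-encoded
    # (for example, a space arrives as '+').
    decoded = unquote_plus(s3_key)
    base_name = posixpath.basename(decoded)
    stem, _extension = posixpath.splitext(base_name)
    return stem.lower()


def rows_from_csv_text(text, columns=COLUMNS):
    """Parse CSV text with a header row into a list of tuples."""
    # csv.DictReader handles quoted fields and embedded spaces,
    # which a plain str.split() on whitespace does not.
    reader = csv.DictReader(io.StringIO(text))
    return [tuple(row[column] for column in columns) for row in reader]


def build_insert(table, columns=COLUMNS):
    """Build a parameterized INSERT; pass only table names from the mapping."""
    placeholders = ", ".join(["%s"] * len(columns))
    column_list = ", ".join(columns)
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
```

Inside the handler, the decoded body of the S3 object would go to `rows_from_csv_text`, the event's object key to `table_key_from_s3_key`, and, if the resulting name is in `TABLE_FOR_FILE`, the rows can be passed to `cursor.executemany(build_insert(table), rows)` followed by `connection.commit()`.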

cuxqih21 2#

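Here is a statement you can use. Note that LOAD DATA FROM S3 is an Amazon Aurora MySQL statement, so it applies only if the database is an Aurora MySQL cluster that has been granted access to the bucket.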

LOAD DATA FROM S3 's3://db-bucket-name/folder-name/test.csv' INTO TABLE table-name FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\r\n';
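
If this statement is issued from the Lambda function, the bucket and key from the S3 event have to be placed into the SQL text, since the S3 URI and table name cannot be bound as query parameters. The following is a minimal sketch of one way to build the statement, assuming an Aurora MySQL target. `ALLOWED_TABLES` and `build_load_statement` are hypothetical names, and the `skip_header` option is an assumption for CSV files that start with a header row.

```python
# Hypothetical allowlist (an assumption): only these table names may be
# interpolated into the statement.
ALLOWED_TABLES = {"table1", "table2", "table3", "table4", "table5"}


def build_load_statement(bucket, key, table, skip_header=False):
    """Build a LOAD DATA FROM S3 statement for a comma-separated file."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table name: {table!r}")

    uri = f"s3://{bucket}/{key}"
    # The URI is embedded in a single-quoted SQL string, so reject
    # characters that could terminate or escape that string.
    if "'" in uri or "\\" in uri:
        raise ValueError(f"unsupported character in S3 URI: {uri!r}")

    parts = [
        f"LOAD DATA FROM S3 '{uri}'",
        f"INTO TABLE {table}",
        "FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
        # The doubled backslashes emit the literal text \r\n for MySQL.
        "LINES TERMINATED BY '\\r\\n'",
    ]
    if skip_header:
        parts.append("IGNORE 1 LINES")
    return " ".join(parts)
```

The returned string can then be passed to `cursor.execute(...)` on an open connection. Building the table name from an allowlist rather than from the file name directly keeps arbitrary object keys from reaching the SQL text.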
