How to pass a CSV file between Airflow tasks

zaqlnxep · asked on 2023-11-14 in Other

I have this code (it works) and I want to break it into two functions (Airflow tasks):
1. Pull the JSON file and parse it into a CSV.
2. Upload the CSV to an S3 bucket.
I can't manage to pass the CSV file between the Airflow tasks; I tried XCom push and pull, but I'm missing something.
I know this isn't the best approach, but the file is small and I'll learn the proper way in time.

import os
import csv
import firebase_admin
from firebase_admin import credentials, auth
import boto3
from datetime import datetime

import json
from airflow.models import Variable

def fetch_all_users():
    #AWS Cred
    aws_access_key_id = Variable.get('RIVERY_AWS_ACCESS_KEY_ID')
    aws_secret_access_key = Variable.get('RIVERY_AWS_SECRET_ACCESS_KEY')
    bucket = Variable.get('RIVERY_BUCKET')
    firebase_key = 'firebase/key/firebase.json'

    #Starting S3 session
    session = boto3.Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    s3 = session.resource('s3')

    # Get the Firebase service-account key from S3 and parse it
    obj = s3.Object(bucket, firebase_key)
    file_content = obj.get()['Body'].read().decode('utf-8')
    json_data = json.loads(file_content)

    # Initialize the Firebase app
    cred = credentials.Certificate(json_data)
    firebase_admin.initialize_app(cred)

    # List all users
    all_users = auth.list_users()

    # Extract user data with provider data
    users_data = []
    for user in all_users.iterate_all():
        user_data = {
            'uid': user.uid,
            'email': user.email,
            'display_name': user.display_name,
            'creation_timestamp': user.user_metadata.creation_timestamp,  # raw epoch timestamp (ms), not converted here
            'provider_data': [{
                'provider_id': provider.provider_id,
                'email': provider.email,
                'phone_number': provider.phone_number,
                'photo_url': provider.photo_url,
                'uid': provider.uid,
                'last_sign_in_timestamp': user.user_metadata.last_sign_in_timestamp
            } for provider in user.provider_data],
            # Add other user properties as needed
        }
        users_data.append(user_data)

    # Parse the user data to CSV
    today = datetime.now().strftime("%Y-%m-%d")
    csv_filename = f"firebase_users_s3_{today}.csv"
    # CSV header
    csv_fields = ['uid', 'email', 'display_name', 'creation_timestamp', 'provider_id', 'provider_email', 'provider_phone_number',
                  'provider_photo_url', 'provider_uid', 'last_sign_in_timestamp']

    # Writing to csv file
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=csv_fields)

        # Write header
        writer.writeheader()

        # Write user data
        for user in users_data:
            for provider_data in user['provider_data']:
                user_row = {
                    'uid': user['uid'],
                    'email': user['email'],
                    'display_name': user['display_name'],
                    'creation_timestamp': user['creation_timestamp'],
                    'provider_id': provider_data['provider_id'],
                    'provider_email': provider_data['email'],
                    'provider_phone_number': provider_data['phone_number'],
                    'provider_photo_url': provider_data['photo_url'],
                    'provider_uid': provider_data['uid'],
                    'last_sign_in_timestamp': provider_data['last_sign_in_timestamp'],
                }
                writer.writerow(user_row)

    #Upload to S3 bucket
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    s3.upload_file(csv_filename, bucket, 'firebase/auth/firebase_auth_from_airflow.csv')
    print(f"File uploaded successfully to s3://{bucket}/firebase/auth/firebase_auth_from_airflow.csv")


niknxzdl · answer 1

I suggest you use Airflow Hooks and Operators. I have a git repo that does much of what you want: it downloads some data from an API (Firebase in your case; in my repo I had to write a custom hook for it) and uploads it to GCS (S3 in your case), here is the link to it. Airflow provides a wide range of Operators and Hooks, and even if you don't find the one you need, you can create custom ones by extending the BaseHook and BaseOperator classes.
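
A rough sketch of what a custom Firebase hook built on BaseHook could look like (the connection id "firebase_default" and keeping the service-account JSON in the connection's Extra field are assumptions, not part of the original answer):

import firebase_admin
from firebase_admin import auth, credentials
from airflow.hooks.base import BaseHook

class FirebaseHook(BaseHook):
    """Hypothetical hook wrapping the firebase_admin SDK."""

    def __init__(self, firebase_conn_id: str = "firebase_default"):
        super().__init__()
        self.firebase_conn_id = firebase_conn_id

    def get_conn(self):
        # Initialize the default Firebase app once per worker process
        try:
            firebase_admin.get_app()
        except ValueError:
            conn = self.get_connection(self.firebase_conn_id)
            # service-account JSON assumed to be stored in the connection's Extra field
            cred = credentials.Certificate(conn.extra_dejson)
            firebase_admin.initialize_app(cred)
        return auth

    def list_users(self):
        return self.get_conn().list_users().iterate_all()
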
I assume you will use the S3 hook and a firebase hook.
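
With the S3 hook the upload step shrinks to something like this (a sketch; "aws_default" stands for whichever Airflow connection holds the AWS credentials, the bucket name is a placeholder, and the apache-airflow-providers-amazon package must be installed):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_csv_to_s3(csv_filename: str) -> None:
    # Credentials come from the Airflow connection instead of Variables
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename=csv_filename,
        key="firebase/auth/firebase_auth_from_airflow.csv",
        bucket_name="my-bucket",  # placeholder bucket name
        replace=True,
    )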
