I have this code (it works) and I want to break it up into two functions (Airflow tasks):
1. Pull the JSON file and parse it into a CSV.
2. Upload the CSV to an S3 bucket.
I can't manage to pass the CSV file between the Airflow tasks (DAGs); I tried XCom push and pull, but I'm missing something.
I know this isn't the best approach, but the file is small and I'll learn the proper way in time.
import os
import csv
import firebase_admin
from firebase_admin import credentials, auth
import boto3
from datetime import datetime
import json
from airflow.models import Variable
def fetch_all_users():
    # AWS credentials
    aws_access_key_id = Variable.get('RIVERY_AWS_ACCESS_KEY_ID')
    aws_secret_access_key = Variable.get('RIVERY_AWS_SECRET_ACCESS_KEY')
    bucket = Variable.get('RIVERY_BUCKET')
    firebase_key = 'firebase/key/firebase.json'
    # Start an S3 session
    session = boto3.Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    s3 = session.resource('s3')
    # Get the Firebase key file and parse it
    obj = s3.Object(bucket, firebase_key)
    file_content = obj.get()['Body'].read().decode('utf-8')
    json_data = json.loads(file_content)
    # Initialize the Firebase connection
    cred = credentials.Certificate(json_data)
    firebase_admin.initialize_app(cred)
    # List all users
    all_users = auth.list_users()
    # Extract user data with provider data
    users_data = []
    for user in all_users.iterate_all():
        user_data = {
            'uid': user.uid,
            'email': user.email,
            'display_name': user.display_name,
            'creation_timestamp': user.user_metadata.creation_timestamp,
            'provider_data': [{
                'provider_id': provider.provider_id,
                'email': provider.email,
                'phone_number': provider.phone_number,
                'photo_url': provider.photo_url,
                'uid': provider.uid,
                'last_sign_in_timestamp': user.user_metadata.last_sign_in_timestamp
            } for provider in user.provider_data],
            # Add other user properties as needed
        }
        users_data.append(user_data)
    # Parse the JSON into a CSV
    today = datetime.now().strftime("%Y-%m-%d")
    csv_filename = f"firebase_users_s3_{today}.csv"
    # CSV header
    csv_fields = ['uid', 'email', 'display_name', 'creation_timestamp', 'provider_id', 'provider_email', 'provider_phone_number',
                  'provider_photo_url', 'provider_uid', 'last_sign_in_timestamp']
    # Write the CSV file
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=csv_fields)
        # Write header
        writer.writeheader()
        # Write one row per (user, provider) pair
        for user in users_data:
            for provider_data in user['provider_data']:
                user_row = {
                    'uid': user['uid'],
                    'email': user['email'],
                    'display_name': user['display_name'],
                    'creation_timestamp': user['creation_timestamp'],
                    'provider_id': provider_data['provider_id'],
                    'provider_email': provider_data['email'],
                    'provider_phone_number': provider_data['phone_number'],
                    'provider_photo_url': provider_data['photo_url'],
                    'provider_uid': provider_data['uid'],
                    'last_sign_in_timestamp': provider_data['last_sign_in_timestamp'],
                }
                writer.writerow(user_row)
    # Upload to the S3 bucket
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    s3.upload_file(csv_filename, bucket, 'firebase/auth/firebase_auth_from_airflow.csv')
    print("File uploaded successfully to qa firebase/auth/firebase_auth_from_airflow.csv")
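As one way to do the split the question asks for, here is a minimal sketch (assuming Airflow 2.x, classic PythonOperator tasks, and that both tasks run on workers sharing a filesystem, e.g. a single-machine deployment): push only the CSV path through XCom by returning it from the first task and pulling it in the second. The dag_id, task_ids, and /tmp path are placeholder names, and the function bodies stand in for the two halves of fetch_all_users() above.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def fetch_and_parse(**context):
    # ... Firebase listing + CSV writing (first half of fetch_all_users) goes here,
    # writing to a path both workers can reach instead of the working directory ...
    csv_path = f"/tmp/firebase_users_s3_{datetime.now():%Y-%m-%d}.csv"
    return csv_path  # the return value is pushed to XCom automatically


def upload_to_s3(**context):
    # Pull the path the first task pushed to XCom
    csv_path = context["ti"].xcom_pull(task_ids="fetch_and_parse")
    # ... the boto3/S3 upload (second half of fetch_all_users) goes here, using csv_path ...


with DAG(
    dag_id="firebase_users_to_s3",   # placeholder name
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    fetch = PythonOperator(task_id="fetch_and_parse", python_callable=fetch_and_parse)
    upload = PythonOperator(task_id="upload_to_s3", python_callable=upload_to_s3)
    fetch >> upload

Since the file is small, an alternative is to return the CSV text itself from the first task and have the second task write and upload it, which removes the shared-filesystem assumption; XCom is only meant for small payloads either way.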
1 Answer

niknxzdl:
I suggest you use Airflow Hooks and Operators. I have a git repo that does much of what you want: it downloads some data from an API (in your case Firebase; in my repo I had to create a custom hook for that) and uploads it to GCS (S3 in your case); here is the link to it. Airflow provides a wide variety of Operators and Hooks, and even if you don't find the one you need, you can create custom ones by extending the BaseHook and BaseOperator classes.
I assume you would use the S3 hook and a Firebase hook.
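For reference, a minimal sketch of what the upload step could look like with Airflow's S3Hook, as the answer suggests (assuming the apache-airflow-providers-amazon package is installed and the AWS credentials are stored in an Airflow connection, here named aws_default; the connection id, function name, and arguments are placeholders):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_csv_to_s3(csv_path: str, bucket: str) -> None:
    # The hook reads credentials from the "aws_default" Airflow connection,
    # so no access keys need to be handled in the task code itself.
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename=csv_path,
        key="firebase/auth/firebase_auth_from_airflow.csv",
        bucket_name=bucket,
        replace=True,  # overwrite the object if it already exists
    )

Using a hook like this keeps the access keys out of the task code and out of Airflow Variables, letting Airflow manage the credentials centrally as connections.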