json 数据工程项目

3qpi33ja  于 2022-12-20  发布在  其他
关注(0)|答案(1)|浏览(70)

我正在尝试创建从csv和api数据到bigquery表的数据管道,但一直遇到KeyError

# Step one: Import necessary libraries
from urllib.request import urlopen
import json
import pandas as pd

from datetime import datetime

# Step 2: Connect to API and extract data
def covidExtractApi():
    api = "https://api.covidtracking.com/v1/states/ca/daily.json"
    api_opener = urlopen(api)
    data = json.load(api_opener)
    df1 = pd.DataFrame(data)
    df1 = df1[df1['date'] >= 20210101]
    return df1

def covidExtractCsv():
# Step 3: Extract data from csv and combine
    df2 = pd.read_csv(r"C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\New_york_covid_data.csv")
    df2 = df2[df2['date'] >= 20210101]
    return df2

# Step 3: Transform data
def covidTransform(df1,df2):
    print(df1.head())
    df1['dateLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[0]
    print(df1['dateLastUpdated'])
    df1['timeLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[1]
    df2["dateLastUpdated"] = df2["lastUpdateEt"].str.split(' ',expand=True)[0]
    df2["timeLastUpdated"] = df2["lastUpdateEt"].str.split(' ',expand=True)[1]
    df1['checkInDate'] = df1["checkTimeEt"].str.split(' ',expand=True)[0]
    df1['checkInTime'] = df1["checkTimeEt"].str.split(' ',expand=True)[1]
    df2['checkInDate'] = df2["checkTimeEt"].str.split(' ',expand=True)[0]
    df2['checkInTime'] = df2["checkTimeEt"].str.split(' ',expand=True)[1]
    df1.drop('dateModified',axis=1,inplace=True)
    df1.drop('date', axis = 1, inplace = True)
    df1.drop('checkTimeEt', axis=1, inplace=True)
    df1.drop('lastUpdateEt', axis = 1, inplace = True)
    df1.drop('dateChecked', axis = 1, inplace = True)
    df2.drop('dateModified',axis=1,inplace=True)
    df2.drop('date', axis = 1, inplace = True)
    df2.drop('checkTimeEt', axis=1, inplace=True)
    df2.drop('lastUpdateEt', axis = 1, inplace = True)
    df2.drop('dateChecked', axis = 1, inplace = True)
    dataframe = df1.append(df2)
    return dataframe

# Step 4: Load data to bigquery table and csv file
def covidLoad(dataframe):
    dataframe = dataframe.astype(str)
    dataframe.to_gbq(destination_table='covid.states_data',
                    project_id='projects-compartment',
                    if_exists='replace')
    # dataframe.to_csv('ny_ca_covid_data.csv')

# Step 5: Log each function call time and date 
def covidLog(message):
    timestamp_format = '%H:%M:%S on %h/%d/%Y'
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)
    with open("covid.txt", "a") as file:
        file.write(message + " at " + timestamp + '\n')

covidLog("EXTRACTING DATA!")
covidExtractApi()
covidExtractCsv()
covidExtractedApi = covidExtractApi()
covidExtractedCsv = covidExtractCsv()
covidLog("FINISHED EXTRACTING")
covidLog("TRANSFORMING DATA!")
covidTransform(covidExtractedApi,covidExtractedCsv)
covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)
covidLog("FINISHED TRANSFORMING!")
covidLog("LOADING DATA!")
covidLoad(covidTransformedData)
covidLog("FINISHED LOADING!")

我期望它会加载到biquery表'states_data'中,但我一直收到只有列名的KeyError错误消息:

Traceback (most recent call last):
  File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\indexes\base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas\_libs\index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'lastUpdateEt'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\ny_cali_covid.py", line 71, in <module>
    covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)
  File "C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\ny_cali_covid.py", line 26, in covidTransform
    df1['dateLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[0]
  File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\frame.py", line 3458, in __getitem__
    indexer = self.columns.get_loc(key)
  File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\indexes\base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'lastUpdateEt'

有人知道问题出在哪吗?

6ojccjat

6ojccjat1#

看起来错误发生在covidTransform(df1,df2)函数的这一行。
我尽我所能地运行了您的代码(我不得不拼凑一些东西,因为我没有本地存储的CSV),直到生成错误的行:

df1['dateLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[0]

我没有得到你的KeyError。看起来当你在你那端运行代码时,正确的数据可能没有存储在covidExtractedApi变量中。
如果重新启动Python环境并从头开始运行整个脚本,则可能会修复该问题。重要的是确保在运行covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)之前运行covidExtractedApi = covidExtractApi()

相关问题