我正在尝试创建从csv和api数据到bigquery表的数据管道,但一直遇到KeyError
# Step one: Import necessary libraries
from urllib.request import urlopen
import json
import pandas as pd
from datetime import datetime
# Step 2: Connect to API and extract data
def covidExtractApi():
api = "https://api.covidtracking.com/v1/states/ca/daily.json"
api_opener = urlopen(api)
data = json.load(api_opener)
df1 = pd.DataFrame(data)
df1 = df1[df1['date'] >= 20210101]
return df1
def covidExtractCsv():
# Step 3: Extract data from csv and combine
df2 = pd.read_csv(r"C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\New_york_covid_data.csv")
df2 = df2[df2['date'] >= 20210101]
return df2
# Step 3: Transform data
def covidTransform(df1,df2):
print(df1.head())
df1['dateLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[0]
print(df1['dateLastUpdated'])
df1['timeLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[1]
df2["dateLastUpdated"] = df2["lastUpdateEt"].str.split(' ',expand=True)[0]
df2["timeLastUpdated"] = df2["lastUpdateEt"].str.split(' ',expand=True)[1]
df1['checkInDate'] = df1["checkTimeEt"].str.split(' ',expand=True)[0]
df1['checkInTime'] = df1["checkTimeEt"].str.split(' ',expand=True)[1]
df2['checkInDate'] = df2["checkTimeEt"].str.split(' ',expand=True)[0]
df2['checkInTime'] = df2["checkTimeEt"].str.split(' ',expand=True)[1]
df1.drop('dateModified',axis=1,inplace=True)
df1.drop('date', axis = 1, inplace = True)
df1.drop('checkTimeEt', axis=1, inplace=True)
df1.drop('lastUpdateEt', axis = 1, inplace = True)
df1.drop('dateChecked', axis = 1, inplace = True)
df2.drop('dateModified',axis=1,inplace=True)
df2.drop('date', axis = 1, inplace = True)
df2.drop('checkTimeEt', axis=1, inplace=True)
df2.drop('lastUpdateEt', axis = 1, inplace = True)
df2.drop('dateChecked', axis = 1, inplace = True)
dataframe = df1.append(df2)
return dataframe
# Step 4: Load data to bigquery table and csv file
def covidLoad(dataframe):
dataframe = dataframe.astype(str)
dataframe.to_gbq(destination_table='covid.states_data',
project_id='projects-compartment',
if_exists='replace')
# dataframe.to_csv('ny_ca_covid_data.csv')
# Step 5: Log each function call time and date
def covidLog(message):
timestamp_format = '%H:%M:%S on %h/%d/%Y'
now = datetime.now()
timestamp = now.strftime(timestamp_format)
with open("covid.txt", "a") as file:
file.write(message + " at " + timestamp + '\n')
covidLog("EXTRACTING DATA!")
covidExtractApi()
covidExtractCsv()
covidExtractedApi = covidExtractApi()
covidExtractedCsv = covidExtractCsv()
covidLog("FINISHED EXTRACTING")
covidLog("TRANSFORMING DATA!")
covidTransform(covidExtractedApi,covidExtractedCsv)
covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)
covidLog("FINISHED TRANSFORMING!")
covidLog("LOADING DATA!")
covidLoad(covidTransformedData)
covidLog("FINISHED LOADING!")
我期望它会加载到biquery表'states_data'中,但我一直收到只有列名的KeyError错误消息:
Traceback (most recent call last):
File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\indexes\base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas\_libs\index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'lastUpdateEt'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\ny_cali_covid.py", line 71, in <module>
covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)
File "C:\Users\Chris\PycharmProjects\Projects\Data_Engineering_Projects\Covid_folder\ny_cali_covid.py", line 26, in covidTransform
df1['dateLastUpdated'] = df1["lastUpdateEt"].str.split(' ',expand=True)[0]
File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\frame.py", line 3458, in __getitem__
indexer = self.columns.get_loc(key)
File "C:\Users\Chris\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas\core\indexes\base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'lastUpdateEt'
有人知道问题出在哪吗?
1条答案
按热度按时间6ojccjat1#
看起来错误发生在
covidTransform(df1,df2)
函数的这一行。我尽我所能地运行了您的代码(我不得不拼凑一些东西,因为我没有本地存储的CSV),直到生成错误的行:
我没有得到你的KeyError。看起来当你在你那端运行代码时,正确的数据可能没有存储在
covidExtractedApi
变量中。如果重新启动Python环境并从头开始运行整个脚本,则可能会修复该问题。重要的是确保在运行
covidTransformedData = covidTransform(covidExtractedApi,covidExtractedCsv)
之前运行covidExtractedApi = covidExtractApi()