我想将数据表从AWS数据库迁移到BigQuery。我有一个名为sampletable的特定表,其中包括id、user_id和log。Log是一个JSON字段,其中包含一个字典,字典由键及其相应的值组成。
'reason': {
'id': 5,
'name': 'Sample name'
'contact': {
number = 123
address = None
}
},
'subreason': {
'id': 80,
'name': 'Sample name',
'is_active': True,
'created_at': '2022-07-18T18:33:28.911Z',
'deleted_at': None,
'complaint_id': 5,
},
此函数用于将数据从表加载到BigQuery:
def load_data(table_id, data):
print("load_data::Writing records to table", table_id)
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_APPEND",
schema=[
bigquery.SchemaField("id", "INT64"),
bigquery.SchemaField("user_id", "INT64"),
bigquery.SchemaField("log", "JSON"),
],
)
try:
start = time.time()
job = client.load_table_from_dataframe(
data, table_id, job_config=job_config
)
job.result()
end = time.time()
print("load_data::Time taken for writing " + str(data.shape[0]) + " records: ", end - start, "s")
except Exception as e:
print("load_data::exception", e)
print("load_data::Could not establish connection with Google BigQuery. Terminating program")
conn.close()
sys.exit()
但是,出现了一个例外,例外是“exception不能混合列表和非列表、非空值”。
我试着以这种方式更改模式:
schema=[
bigquery.SchemaField("id", "INT64"),
bigquery.SchemaField("user_id", "INT64"),
bigquery.SchemaField("log", "RECORD"), fields=
[
bigquery.SchemaField("reason", "RECORD", fields=
[
bigquery.SchemaField("id", "INT64"),
bigquery.SchemaField("name", "STRING")
bigquery.SchemaField("contact", "RECORD", fields=
[
bigquery.SchemaField("number", "STRING")
bigquery.SchemaField("address," "STRING"))
]
]),
bigquery.SchemaField("subreason", "RECORD", fields=
[
bigquery.SchemaField("id", "INT64"),
bigquery.SchemaField("name", "STRING")
bigquery.SchemaField("is_active", "BOOLEAN")
bigquery.SchemaField("created_at", "TIMESTAMP")
bigquery.SchemaField("deleted_at", "TIMESTAMP")
bigquery.SchemaField("complaint_id", "INT64")
]),
])
但是,我得到了异常“与类型dict:were expecting tuple of(key,value)pair“由于我是表中JSON列数据迁移的新手,有人能指导我解决这个问题吗?修改模式以接受JSON列进行迁移的正确方法是什么?
1条答案
按热度按时间pb3s4cty1#
您可以尝试并考虑以下方法。
在此方法中,您将在BigQuery中以
JSON
数据类型加载数据。但是,由于BigQuery接受换行符分隔的JSON进行数据获取,因此需要手动调整JSON文件。请参阅以下更新文件json file示例。注意,我将JSON压缩成一个名为
"log"
的键,并将其压缩成一行以满足换行符分隔JSON的要求。下面是我用来接收数据的python代码:
输出: