如何在Python中将包含JSON列的表从AWS RedShift迁移到BigQuery?

j0pj023g  于 2023-02-06  发布在  Python
关注(0)|答案(1)|浏览(113)

我想将数据表从AWS数据库迁移到BigQuery。我有一个名为sampletable的特定表,其中包括id、user_id和log。Log是一个JSON字段,其中包含一个字典,字典由键及其相应的值组成。

'reason': {
    'id': 5,
    'name': 'Sample name'
    'contact':  {
        number = 123
        address = None
     }
},
'subreason': {
    'id': 80,
    'name': 'Sample name',
    'is_active': True,
    'created_at': '2022-07-18T18:33:28.911Z',
    'deleted_at': None,
    'complaint_id': 5,
},

此函数用于将数据从表加载到BigQuery:

def load_data(table_id, data):
    print("load_data::Writing records to table", table_id)
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        schema=[
            bigquery.SchemaField("id", "INT64"),
            bigquery.SchemaField("user_id", "INT64"),
            bigquery.SchemaField("log", "JSON"),
        ],
    )
    try:
        start = time.time()
        job = client.load_table_from_dataframe(
            data, table_id, job_config=job_config
        )
        job.result()
        end = time.time()
        print("load_data::Time taken for writing " + str(data.shape[0]) + " records: ", end - start, "s")
    except Exception as e:
        print("load_data::exception", e)
        print("load_data::Could not establish connection with Google BigQuery. Terminating program")
        conn.close()
        sys.exit()

但是,出现了一个例外,例外是“exception不能混合列表和非列表、非空值”。
我试着以这种方式更改模式:

schema=[
            bigquery.SchemaField("id", "INT64"),
            bigquery.SchemaField("user_id", "INT64"),
            bigquery.SchemaField("log", "RECORD"), fields=
            [
                bigquery.SchemaField("reason", "RECORD", fields=
                [
                    bigquery.SchemaField("id", "INT64"),
                    bigquery.SchemaField("name", "STRING")
                    bigquery.SchemaField("contact", "RECORD", fields=
                    [
                    bigquery.SchemaField("number", "STRING")
                    bigquery.SchemaField("address," "STRING"))
                    ]
                ]),
bigquery.SchemaField("subreason", "RECORD", fields=
                [
                    bigquery.SchemaField("id", "INT64"),
                    bigquery.SchemaField("name", "STRING")
                    bigquery.SchemaField("is_active", "BOOLEAN")
                    bigquery.SchemaField("created_at", "TIMESTAMP")
                    bigquery.SchemaField("deleted_at", "TIMESTAMP")
                    bigquery.SchemaField("complaint_id", "INT64")
                ]),
            ])

但是,我得到了异常“与类型dict:were expecting tuple of(key,value)pair“由于我是表中JSON列数据迁移的新手,有人能指导我解决这个问题吗?修改模式以接受JSON列进行迁移的正确方法是什么?

pb3s4cty

pb3s4cty1#

您可以尝试并考虑以下方法。
在此方法中,您将在BigQuery中以JSON数据类型加载数据。但是,由于BigQuery接受换行符分隔的JSON进行数据获取,因此需要手动调整JSON文件。请参阅以下更新文件json file示例。

{"log":{"reason":{"contact":{"address": null,"number": 123},"id": 5,"name": "Sample name"},"subreason": {"complaint_id": 5,"created_at": "2022-07-18T18:33:28.911Z","deleted_at": "None","id": 80,"is_active": true,"name": "Sample name"}}}

注意,我将JSON压缩成一个名为"log"的键,并将其压缩成一行以满足换行符分隔JSON的要求。

下面是我用来接收数据的python代码:

table_id = "your-project.-your-dataset.your-table"
file_path = "/path/of/your_json_file.json"

def load_table_file(file_path, table_id):

    # [START bigquery_load_from_file]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, autodetect=True,
        #write_disposition="WRITE_APPEND",
        schema=[
            bigquery.SchemaField("log", "JSON"),
        ],
    )

    with open(file_path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    job.result()  # Waits for the job to complete.

    table = client.get_table(table_id)  # Make an API request.
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )
    # [END bigquery_load_from_file]
    return table

load_table_file(file_path, table_id)

输出:

相关问题