unicodedecodeerror with mysql to cloud storage bucket dag

h43kikqp  于 2021-06-25  发布在  Mysql
关注(0)|答案(1)|浏览(437)

我创建了一个dag,它从数据库中提取mysql数据并将其加载到云存储,然后将bigquery作为json文件。
dag适用于某些表,但不是所有表,因为它不能解码表中的某些字符。这是相当多的数据,所以我不能准确指出错误或无效字符的位置。
我尝试过将数据库、表和列字符集从utf8更改为utf8mb4。这没用。
我也试过调用encoding='utf-8'和'iso-8859-1',但是我认为我没有正确地调用它们,因为我一直在用我的连接来做这件事,我仍然得到相同的错误。
我正在运行Python2.7.12和airflow v1.8.0
更新:阅读以下内容后:https://cwiki.apache.org/confluence/display/airflow/common+pitfalls 建议使用定义字符集的连接字符串,例如:sql\u alchemy\u conn=mysql://airflow@localhost:3306/气流?字符集=utf8
如何使用云sql示例实现这一点?

podio_connections = [
    'mysql_connection'
]

podio_tables = [
     'finance_banking_details',
     'finance_goods_invoices',
]

default_args = {
    'owner': 'xxxxxx',
    'start_date': datetime(2018,1,11),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')

slack_notify = SlackAPIPostOperator(
    task_id='slack_notify',
    token='xxxxxx',
    channel='data-status',
    username='airflow',
    text='Successfully performed Podio ETL operation',
    dag=dag)

for connection in podio_connections:
    for table in podio_tables:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id="extract_mysql_%s_%s"%(connection,table),
            mysql_conn_id=connection,
            google_cloud_storage_conn_id='gcp_connection',
            sql="SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
            bucket='podio-reader-storage',
            filename="%s/%s/%s{}.json"%(connection,table,table),            
            schema_filename="%s/schemas/%s.json"%(connection,table),
        dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bg_%s_%s"%(connection,table),
            bigquery_conn_id='gcp_connection',
            google_cloud_storage_conn_id='gcp_connection',
            bucket='podio-reader-storage',
            #destination_project_dataset_table="podio-data.%s.%s"%(connection,table),
            destination_project_dataset_table = "podio-data.podio_data1.%s"%(table),
            source_objects=["%s/%s/%s*.json"%(connection,table,table)],
            schema_object="%s/schemas/%s.json"%(connection,table),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)

[2018-01-12 15:36:10221]{models.py:1417}错误-'utf8'编解码器无法解码第36位的字节0x96:无效的起始字节
回溯(最近一次呼叫):
文件“/usr/local/lib/python2.7/dist packages/airflow/models.py”,第1374行,in run result=task\u copy.execute(context=context)
file“/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql\u to \u gcs.py”,第91行,在execute files\u to \u upload=self.写入\u本地\u数据\u文件(光标)
file“/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql\u to\u gcs.py”,第136行,in\u write\u local\u data\u files json.dump(row\u dict,tmp\u file\u handle)
文件“/usr/lib/python2.7/json/init.py”,第189行,位于iterable中chunk的dump中:
文件“/usr/lib/python2.7/json/encoder.py”,第434行,位于\u iterencode中,用于\u iterencode\u dict中的块(o,\u current\u indent\u level):
文件“/usr/lib/python2.7/json/encoder.py”,\u iterencode\u dict yield\u encoder(value)中的第390行
unicodedecodeerror:“utf8”编解码器无法解码位置36中的字节0x96:起始字节无效

tyg4sfes

tyg4sfes1#

96 拉丁文十六进制表示“en破折号”。要么将数据更改为utf8,要么将到mysql的连接更改为使用charset latin1。

相关问题