DataX 本地测试的时候,稍微大一点的表同步会一直打印读写速度为0,如何解决数据量大时速度为0的问题

xsuvu9jc  于 2022-11-19  发布在  其他
关注(0)|答案(8)|浏览(873)

尝试过运行时指定jvm内存参数,一时有用一时没用,也不清楚怎样调优,已经是没有设定byte和record的限制,就是这两个限制点是完全放开的
2021-06-07 10:44:03.816 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated. 2021-06-07 10:44:03.816 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.

2021-06-07 10:49:13.855 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:49:33.855 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:49:53.856 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:50:03.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:50:23.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:50:33.859 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:50:43.860 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:50:53.874 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:51:03.876 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:51:23.876 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:51:33.877 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00% 2021-06-07 10:51:53.877 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%

4uqofj5v

4uqofj5v1#

看一下数据库的链接和后台processlist情况,以及数据库的资源使用情况.

uqxowvwt

uqxowvwt2#

我也遇到过这个情况,偶现,processlist中的sql一直在Sending data状态,暂时还没定位到问题原因。

SQL调优和索引处理后,相对好点了。

gmol1639

gmol16393#

try this:
add jdbc parameter to reader cnnfig, like:
jdbc:mysql://localhost:3306/db_name?sessionVariables=MAX_EXECUTION_TIME=68400000

tv6aics1

tv6aics14#

同样遇到这个问题,比较常现

91zkwejq

91zkwejq5#

脚本有设置model的updateKey或replacekey的话,可以设置成false或者为replaceKey设置索引再试试

bxfogqkk

bxfogqkk6#

出现原因:由于Starrocks设定了查询超时时间,DataX数据同步使用流式数据读取,导致数据读取超过了数据库指定的查询超时时间,数据读取被中断,DataX没有报错,出现了Speed一直为0的情况。

处理方法

  1. 可以暂时将数据库的query_timout参数调大,保证数据同步时间不会超过该值。
set global query_timeout=3000;
  1. 在当前SQL语句中设置query_timeout的值,详见: https://docs.starrocks.com/zh-cn/latest/reference/System_variable
SELECT /*+ SET_VAR(query_timeout = 1) */ name FROM people ORDER BY name;

具体说明:

  1. DataX的数据同步,采用的是使用java.sql.Statement从数据库拉取数据,并且将fetchSize设置成了Integer.MIN_VALUE, 该方式使用流数据接受方式,每次只从服务器接受部分数据,直到数据处理完毕。

源码如下:

/**
* 任务初始化
*/

public void init() {
    this.originalConfig = super.getPluginJobConf();

    Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
    if (userConfigedFetchSize != null) {
        LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
    }
	// 默认被设置为Integer.MIN_VALUE
    this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}

/**
* 任务调用
**/
 public void startRead(RecordSender recordSender) {
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

    this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
            super.getTaskPluginCollector(), fetchSize);
}

/**
* a wrapped method to execute select-like sql statement .
*
* @param conn         Database connection .
* @param sql          sql statement to be executed
* @param fetchSize
* @param queryTimeout unit:second
* @return
* @throws SQLException
*/
public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)
    throws SQLException {
    // make sure autocommit is off
    conn.setAutoCommit(false);
    // ResultSet.RTYPE_FORWORD_ONLY,只可向前滚动;
    // ResultSet.CONCUR_READ_ONLY,指定不可以更新 ResultSet 
    Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                          ResultSet.CONCUR_READ_ONLY);

    // 指定了fetchSize为Integer.MIN_VALUE
    stmt.setFetchSize(fetchSize);
    stmt.setQueryTimeout(queryTimeout);
    return query(stmt, sql);
}
  1. 数据库中配置了数据的查询超时时间,Starrocks中该配置名称为query_timeout。默认值为300s。如果一个查询持续时间超过了该参数的值,数据库就会返回查询超时错误。
show variables like '%timeout%';

# 结果如下:
interactive_timeout	3600
net_read_timeout	60
net_write_timeout	60
new_planner_optimize_timeout	3000
query_delivery_timeout	300
query_timeout	300
tx_visible_wait_timeout	10
wait_timeout	28800
  1. DataX未将该异常抛出,导致程序没有中止,实际数据库的查询已经结束,所有出现了Speed为0的现象。
2022-08-26 13:58:27.724 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778208 records, 497121061 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.046s |  All Task WaitReaderTime 291.284s | Percentage 0.00%
2022-08-26 13:58:37.731 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778208 records, 497121061 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.046s |  All Task WaitReaderTime 291.284s | Percentage 0.00%
  1. 代码调试,当超过query_timeout时,抛出如下错误。
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-07], Description:[读取数据库数据失败. 请检查您的配置的 column/table/where/querySql或者向 DBA 寻求帮助.].  - 执行的SQL为: select id,coordinate,latitude,domain_name,uri,url,user_name,city_name,city,district,province,postcode,country,first_name,first_romanized_name,last_name,name_female,ssn,phone_number,email,date,year_data,month_data,day_of_week,pystr,random_element,random_letter,company,company_suffix,company_prefix,company_email,sentence,text,word from test_v7  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Query exceeded time limit of 30 seconds
	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
	at com.alibaba.datax.plugin.rdbms.util.RdbmsException.asQueryException(RdbmsException.java:81)
	at com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader$Task.startRead(CommonRdbmsReader.java:220)
	at com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader$Task.startRead(MysqlReader.java:81)
	at com.alibaba.datax.core.taskgroup.runner.ReaderRunner.run(ReaderRunner.java:57)
	at java.lang.Thread.run(Thread.java:748)
  1. 测试用例如下:

测试表信息:

  • 字段数:34
  • 表数据量:12965900条
  • 表大小:9.074 GB

不同query_timeout测试结果如下:

query_timeout值成功导出数据量
30s77792
300s778208
3000s7821522

job_json如下:

{
    "core":{
        "transport": {
            "channel":{
                "speed":{
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel":1,
                "batchSize": 2048,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column" : [
                           "id","coordinate","latitude","domain_name","uri","url","user_name","city_name","city",
                           "district","province","postcode","country","first_name","first_romanized_name","last_name",
                           "name_female","ssn","phone_number","email","date","year_data","month_data","day_of_week","pystr",
                           "random_element","random_letter","company","company_suffix","company_prefix","company_email","sentence",
                           "text","word"
                        ],
                        "splitPk":"id",
                        "connection": [
                            {
                                "table": ["test_v7"],
                                "jdbcUrl": ["jdbc:mysql://192.168.20.213:9030/TEST?connectTimeout=60000000&socketTimeout=60000000"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "E:\\opt",
                        "fileName": "text_datax_export",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}

使用SQL设置变量的job如下

{
    "core":{
        "transport": {
            "channel":{
                "speed":{
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel":1,
                "batchSize": 2048,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select /*+ SET_VAR(query_timeout = 100) */ id,coordinate,latitude,domain_name,uri,url,user_name,city_name,city,district,province,postcode,country,first_name,first_romanized_name,last_name,name_female,ssn,phone_number,email,date,year_data,month_data,day_of_week,pystr,random_element,random_letter,company,company_suffix,company_prefix,company_email,sentence,text,word from test_v7"
                                ],
                                "jdbcUrl": ["jdbc:mysql://192.168.20.213:9030/TEST"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "E:\\opt",
                        "fileName": "text_datax_export",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}
23c0lvtd

23c0lvtd7#

出现原因:由于Starrocks设定了查询超时时间,DataX数据同步使用流式数据读取,导致数据读取超过了数据库指定的查询超时时间,数据读取被中断,DataX没有报错,出现了Speed一直为0的情况。

处理方法

  1. 可以暂时将数据库的query_timout参数调大,保证数据同步时间不会超过该值。
set global query_timeout=3000;
  1. 在当前SQL语句中设置query_timeout的值,详见: https://docs.starrocks.com/zh-cn/latest/reference/System_variable
SELECT /*+ SET_VAR(query_timeout = 1) */ name FROM people ORDER BY name;

具体说明:

  1. DataX的数据同步,采用的是使用java.sql.Statement从数据库拉取数据,并且将fetchSize设置成了Integer.MIN_VALUE, 该方式使用流数据接受方式,每次只从服务器接受部分数据,直到数据处理完毕。

源码如下:

/**
* 任务初始化
*/

public void init() {
    this.originalConfig = super.getPluginJobConf();

    Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
    if (userConfigedFetchSize != null) {
        LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
    }
	// 默认被设置为Integer.MIN_VALUE
    this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}

/**
* 任务调用
**/
 public void startRead(RecordSender recordSender) {
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

    this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
            super.getTaskPluginCollector(), fetchSize);
}

/**
* a wrapped method to execute select-like sql statement .
*
* @param conn         Database connection .
* @param sql          sql statement to be executed
* @param fetchSize
* @param queryTimeout unit:second
* @return
* @throws SQLException
*/
public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)
    throws SQLException {
    // make sure autocommit is off
    conn.setAutoCommit(false);
    // ResultSet.RTYPE_FORWORD_ONLY,只可向前滚动;
    // ResultSet.CONCUR_READ_ONLY,指定不可以更新 ResultSet 
    Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                          ResultSet.CONCUR_READ_ONLY);

    // 指定了fetchSize为Integer.MIN_VALUE
    stmt.setFetchSize(fetchSize);
    stmt.setQueryTimeout(queryTimeout);
    return query(stmt, sql);
}
  1. 数据库中配置了数据的查询超时时间,Starrocks中该配置名称为query_timeout。默认值为300s。如果一个查询持续时间超过了该参数的值,数据库就会返回查询超时错误。
show variables like '%timeout%';

# 结果如下:
interactive_timeout	3600
net_read_timeout	60
net_write_timeout	60
new_planner_optimize_timeout	3000
query_delivery_timeout	300
query_timeout	300
tx_visible_wait_timeout	10
wait_timeout	28800
  1. DataX未将该异常抛出,导致程序没有中止,实际数据库的查询已经结束,所有出现了Speed为0的现象。
2022-08-26 13:58:27.724 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778208 records, 497121061 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.046s |  All Task WaitReaderTime 291.284s | Percentage 0.00%
2022-08-26 13:58:37.731 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778208 records, 497121061 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.046s |  All Task WaitReaderTime 291.284s | Percentage 0.00%
  1. 代码调试,当超过query_timeout时,抛出如下错误。
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-07], Description:[读取数据库数据失败. 请检查您的配置的 column/table/where/querySql或者向 DBA 寻求帮助.].  - 执行的SQL为: select id,coordinate,latitude,domain_name,uri,url,user_name,city_name,city,district,province,postcode,country,first_name,first_romanized_name,last_name,name_female,ssn,phone_number,email,date,year_data,month_data,day_of_week,pystr,random_element,random_letter,company,company_suffix,company_prefix,company_email,sentence,text,word from test_v7  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Query exceeded time limit of 30 seconds
	at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
	at com.alibaba.datax.plugin.rdbms.util.RdbmsException.asQueryException(RdbmsException.java:81)
	at com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader$Task.startRead(CommonRdbmsReader.java:220)
	at com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader$Task.startRead(MysqlReader.java:81)
	at com.alibaba.datax.core.taskgroup.runner.ReaderRunner.run(ReaderRunner.java:57)
	at java.lang.Thread.run(Thread.java:748)
  1. 测试用例如下:

测试表信息:

  • 字段数:34
  • 表数据量:12965900条
  • 表大小:9.074 GB

不同query_timeout测试结果如下:

query_timeout值 成功导出数据量
30s 77792
300s 778208
3000s 7821522
job_json如下:

{
    "core":{
        "transport": {
            "channel":{
                "speed":{
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel":1,
                "batchSize": 2048,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column" : [
                           "id","coordinate","latitude","domain_name","uri","url","user_name","city_name","city",
                           "district","province","postcode","country","first_name","first_romanized_name","last_name",
                           "name_female","ssn","phone_number","email","date","year_data","month_data","day_of_week","pystr",
                           "random_element","random_letter","company","company_suffix","company_prefix","company_email","sentence",
                           "text","word"
                        ],
                        "splitPk":"id",
                        "connection": [
                            {
                                "table": ["test_v7"],
                                "jdbcUrl": ["jdbc:mysql://192.168.20.213:9030/TEST?connectTimeout=60000000&socketTimeout=60000000"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "E:\\opt",
                        "fileName": "text_datax_export",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}

使用SQL设置变量的job如下

{
    "core":{
        "transport": {
            "channel":{
                "speed":{
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel":1,
                "batchSize": 2048,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select /*+ SET_VAR(query_timeout = 100) */ id,coordinate,latitude,domain_name,uri,url,user_name,city_name,city,district,province,postcode,country,first_name,first_romanized_name,last_name,name_female,ssn,phone_number,email,date,year_data,month_data,day_of_week,pystr,random_element,random_letter,company,company_suffix,company_prefix,company_email,sentence,text,word from test_v7"
                                ],
                                "jdbcUrl": ["jdbc:mysql://192.168.20.213:9030/TEST"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "E:\\opt",
                        "fileName": "text_datax_export",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}

我当时遇到的现象是,查询记录停留在sending data,不会断开。

相关问题