apache flink jdbc inputformat抛出java.net.socketexception:套接字关闭

gijlo24d  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(475)

我正在使用flink数据集api查询oracle数据库。为此,我定制了flinkjdbcinputformat以返回java.sql.resultset。因为我需要使用flink操作符对resultset执行进一步的操作。

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    @SuppressWarnings("unchecked")
    DataSource<ResultSet> source
            = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
                    .setUsername("username")
                    .setPassword("password")
                    .setDrivername("driver_name")
                    .setDBUrl("jdbcUrl")
                    .setQuery("query")
                    .finish(),      
                    new GenericTypeInfo<ResultSet>(ResultSet.class)
            );
    source.print();

    environment.execute();

}

以下是定制的jdbcinputformat:

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable {

@Override
public void open(InputSplit inputSplit) throws IOException {
                Class.forName(drivername);
                    dbConn = DriverManager.getConnection(dbURL, username, password);
                statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
                resultSet = statement.executeQuery();
}

@Override
public void close() throws IOException {
            if(statement != null) {
                    statement.close();
                }
                if(resultSet != null) 
                    resultSet.close();
                if(dbConn != null) {
                    dbConn.close();
                }
}

@Override
public boolean reachedEnd() throws IOException {
        isLastRecord = resultSet.isLast();
    return isLastRecord;
}

@Override
public ResultSet nextRecord(ResultSet row) throws IOException{  
        if(!isLastRecord){              
            resultSet.next();
        }
        return resultSet;
}

}
这适用于以下对获取的行有限制的查询:从xyz中选择a、b、c,其中rownum<=10;但是,当我尝试获取包含大约100万数据的所有行时,在获取随机数目的行之后,我得到以下异常:

java.sql.SQLRecoverableException: Io exception: Socket closed
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101)
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521)
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024)
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314)
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228)
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839)
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823)
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349)
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

原因:java.net.socketexception:套接字在java.net.socketoutputstream.socketwrite0处关闭(本机方法)
就我而言,我该如何解决这个问题?

ffscu2ro

ffscu2ro1#

我认为不可能运送 ResultSet 就像一张普通唱片。这是一个有状态对象,它在内部维护到数据库服务器的连接。使用 ResultSet 因为在flink操作符之间传输的记录意味着它可以被序列化,通过网络传送到另一台机器,反序列化,并在不同的jvm进程中传递到不同的线程。那是行不通的。
取决于连接a ResultSet 也可以留在同一台机器上的同一个线程中,这可能是对您有效的情况。如果要从操作符中查询数据库,可以将函数实现为 RichMapPartitionFunction . 否则,我会读 ResultSet 并转发生成的行。

相关问题