scala—flink重新启动时如何处理数据库连接

pdtvr36n  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(440)

我使用dbcp2.basicdatasource作为数据库连接池。在Map功能中使用数据库查询来获取传感器的附加信息;我发现,当flink作业由于异常而重新启动时,旧的db连接在服务器端仍然处于活动状态。
flink 1.7版
在这里输入源代码

object DbHelper extends Lazing with Logging {
    private lazy val connectionPool: BasicDataSource = createDataSource()

    private def createDataSource(): BasicDataSource = {
        val conn_str = props.getProperty("db.url")
        val conn_user = props.getProperty("db.user")
        val conn_pwd = props.getProperty("db.pwd")
        val initialSize = props.getProperty("db.initial.size", "3").toInt

        val bds = new BasicDataSource
        bds.setDriverClassName("org.postgresql.Driver")
        bds.setUrl(conn_str)
        bds.setUsername(conn_user)
        bds.setPassword(conn_pwd)
        bds.setInitialSize(initialSize)
        bds
    }
}
3bygqnnd

3bygqnnd1#

将Map功能更改为 RichMapFunction . 覆盖 close() 方法 RichMapFunction 把关闭数据库连接的代码放在那里。您可能需要在 open() 方法也是。

相关问题