我使用dbcp2.basicdatasource作为数据库连接池。在Map功能中使用数据库查询来获取传感器的附加信息;我发现,当flink作业由于异常而重新启动时,旧的db连接在服务器端仍然处于活动状态。
flink 1.7版
在这里输入源代码
object DbHelper extends Lazing with Logging {
private lazy val connectionPool: BasicDataSource = createDataSource()
private def createDataSource(): BasicDataSource = {
val conn_str = props.getProperty("db.url")
val conn_user = props.getProperty("db.user")
val conn_pwd = props.getProperty("db.pwd")
val initialSize = props.getProperty("db.initial.size", "3").toInt
val bds = new BasicDataSource
bds.setDriverClassName("org.postgresql.Driver")
bds.setUrl(conn_str)
bds.setUsername(conn_user)
bds.setPassword(conn_pwd)
bds.setInitialSize(initialSize)
bds
}
}
1条答案
按热度按时间3bygqnnd1#
将Map功能更改为
RichMapFunction
. 覆盖close()
方法RichMapFunction
把关闭数据库连接的代码放在那里。您可能需要在open()
方法也是。