kotlin 在KTor中与Koin共享Exposed的自定义调度程序的好模式是什么

bhmjp9jg  于 2023-03-03  发布在  Kotlin
关注(0)|答案(1)|浏览(170)

我是Kotlin的忠实粉丝已经很长时间了,但是一般来说,我只是把它作为Java的替代品,比如传统的Java库和框架,比如Spring。我现在正在探索“纯Kotlin”解决方案的兔子洞。其中一部分是用KTor、Koin和Exposed做一个小练习项目。
我喜欢Kotlin协程的非阻塞行为,但显然暴露在表面之下使用纯粹的阻塞API。由于数据库操作最初受到连接池大小的限制,这不是世界末日。
开始,我有一个非常好的、简洁的、有效的解决方案,我使用Dispatchers.IO执行所有的Exposed操作,这允许协程以非阻塞的方式处理请求,同时也使用Exposed。

fun Route.getAllPeople() {
  get("/people") {
    val list = newSuspendedTransaction(Dispatchers.IO) { Person.all().toList() }
    call.respond(list.map { PersonResponse(id = it.id.value, name = it.name, age = it.age) })
  }
}

我最不喜欢的是依赖于默认的IO调度器,它有64个线程,对于我的需求来说已经足够了,但是我认为线程的数量和数据库连接池中的连接数量应该是相同的,因此,我想使用一个定制的调度器专门用于涉及这个数据源的操作。
配置这样的调度器相当简单,如下所示,我还可以将调度器共享为Koin依赖项:

fun Application.databaseKoin() {
  val config =
      HikariConfig().apply {
        jdbcUrl = environment.config.property("postgres.jdbcUrl").getString()
        username = environment.config.property("postgres.username").getString()
        password = environment.config.property("postgres.password").getString()
      }

  // I know this defaults to 10, but the point is I can customize the connection pool and therefore the dispatcher
  val postgresPool = Dispatchers.IO.limitedParallelism(config.maximumPoolSize)

  koin {
    val module = module {
      single<DataSource> { HikariDataSource(config) }
      single<CoroutineDispatcher>(named("postgresPool")) { postgresPool }
    }
    modules(module)
  }
}

我在使用这种方法时遇到的主要限制是需要显式地将调度器注入到任何我想使用它的地方。这是我的代码的额外开销,我个人并不喜欢,因为它导致需要在任何地方编写类似这样的代码:

fun Route.getAllPeople() {
  val dispatcher by inject<CoroutineDispatcher>(named("postgresPool"))
  get("/people") {
    val list = newSuspendedTransaction(dispatcher) { Person.all().toList() }
    call.respond(list.map { PersonResponse(id = it.id.value, name = it.name, age = it.age) })
  }
}

显然这不是世界末日,但我不喜欢。
是的,没有强制性的理由要这样做,而不是使用Dispatchers.IO。这个项目当然很简单,这不是什么大事,但我们的目标是学习更多关于这些工具的知识,并充分理解它们,以便在未来更大的项目中利用它们。
我可以创建自己的suspended transaction函数,问题是我不知道如何访问其中的Koin依赖项:

suspend fun <T> mySuspendedTransaction(
  db: Database? = null,
  transactionIsolation: Int? = null,
  statement: suspend Transaction.() -> T
): T {
  val postgresPoolDispatcher = TODO("???")
  return newSuspendedTransaction(postgresPoolDispatcher, db, transactionIsolation, statement)
}

或者,也许有一些方法可以设置和共享这个调度程序,并将其与我的池大小绑定,而不需要Koin?
我真的很希望能得到一些指导。我知道我正在掉进一个兔子洞,但我正在尝试探索新的做事方法。先谢谢你。

kxxlusnw

kxxlusnw1#

这个问题是基于个人观点的,而且,我对Exposed不是很熟悉,所以如果我提供了一些错误的信息,我道歉,但是:
1.我看到你没有提供dbmySuspendedTransaction(),也没有使用任何其他类型的DB句柄。这是否意味着这个函数访问全局存储的数据库句柄?如果是这样,我看不出有什么理由不对调度器做同样的事情。如果实际上你注入了db句柄,你可以有一个对象来同时保存db和调度器。
1.我不确定你限制并行度的想法是否正确。即使你限制了并行度,也没有限制并发运行的协程的数量。例如:

val dispatcher = Dispatchers.IO.limitedParallelism(8)
repeat(16) {
    launch {
        newSuspendedTransaction(dispatcher) {
            delay(1000)
        }
    }
}

在这种情况下,16个协程都将尝试获取一个事务/连接。这是因为您限制了并行性,并且您更希望限制并发性。您可以使用信号量来实现后者:

val semaphore = Semaphore(8)
repeat(16) {
    launch { 
        semaphore.withPermit {
            newSuspendedTransaction {
                delay(1000)
            }
        }
    }
}

但这并不能解决您的问题,因为您仍然需要在任何地方访问信号量。
1.我不确定你是否需要在第一个地方限制任何东西。通常,我们创建的线程池和连接池大小相同,因为如果我们使用的线程比连接多,那么过多的线程将等待获得连接,所以这将是浪费。在这种情况下,我们使用一个挂起函数启动一个事务,所以可能它已经以一种不阻塞等待连接的方式实现了。但是暂停我肯定会先测试这个。

相关问题