datastax会话因大量并行查询而挂起

w6mmgewl  于 2021-06-15  发布在  Cassandra
关注(0)|答案(2)|浏览(244)

重点。
并行处理2000个查询时,datastax会话挂起。
并行查询
我使用的是alpakka,它 Package 了datasax cassandra驱动程序。我正在使用scala play框架。
要对大数据进行行计数,必须按分区进行。我使用以下代码计算每个分区的行数:

val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
  val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
  targets.isDefined match {
    case true =>
      targets.get.foreach {
        e =>
          val cq: CassandraQueries = new CassandraQueries()
          Logger.info("targets collected so far: "+acc.size)
          Logger.info("Calling count for "+e._1)

          futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
      }

      val results = Future.sequence(futureList.toList)

在我的一个键空间中,我有2000个分区,因此有2000个并行查询。
查询结果
查询由alpakka/datastax处理并返回 Future[Seq[Row]]. ```
Logger.info("furtureQuery: session closed -> "+ session.isClosed)
val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
val sb: StringBuilder = new StringBuilder()
val source = CassandraSource(stmt)
source.runWith(Sink.seq).onComplete {
case Success(f) => out(Some(f), None)

  case Failure(e) =>
    Logger.error("simpleQuery failed with " + e.getMessage)
    out(None, Some(e.getMessage))

}
异常和挂起在大约1000个查询之后,我得到以下错误。在此之后,会话没有返回任何内容。都不是 `Success` 也不是 `Failure` 发生。
akka.configurationexception:由于[akka.event.logging$loggerinitializationexception:logger log1 logging$defaultlogger没有响应loggerinitialized,而是发送了[timeout],因此无法加载配置中指定的记录器[akka.event.logging$defaultlogger]
问题
我确信我可以延长日志记录的超时时间。但这只是症状,不是真正的问题。
我该如何:
配置会话连接以允许2000个并行请求?
或
将future.sequence限制为已知数量的可能请求?
也
如何以编程方式从这种会话挂起中恢复?
i86rm4rw

i86rm4rw1#

而是触发2000个查询来执行范围查询。利用集群对象metatdata,获取令牌范围,计算密钥的令牌。然后,在一个范围查询中批处理属于同一范围的查询。

kmbjn2e3

kmbjn2e32#

通过在创建群集示例时指定池选项,可以增加每个连接的运行中请求数,如下所示:

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, 10240);

Cluster cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .build();

但你还是需要处理 BusyPoolException 在代码中,因为在使用异步请求时,仍然很容易重载一个特定的连接。
更多信息请参见驾驶员文档。

相关问题