重点。
并行处理2000个查询时,datastax会话挂起。
并行查询
我使用的是alpakka,它 Package 了datasax cassandra驱动程序。我正在使用scala play框架。
要对大数据进行行计数,必须按分区进行。我使用以下代码计算每个分区的行数:
val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
targets.isDefined match {
case true =>
targets.get.foreach {
e =>
val cq: CassandraQueries = new CassandraQueries()
Logger.info("targets collected so far: "+acc.size)
Logger.info("Calling count for "+e._1)
futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
}
val results = Future.sequence(futureList.toList)
在我的一个键空间中,我有2000个分区,因此有2000个并行查询。
查询结果
查询由alpakka/datastax处理并返回 Future[Seq[Row]].
```
Logger.info("furtureQuery: session closed -> "+ session.isClosed)
val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
val sb: StringBuilder = new StringBuilder()
val source = CassandraSource(stmt)
source.runWith(Sink.seq).onComplete {
case Success(f) => out(Some(f), None)
case Failure(e) =>
Logger.error("simpleQuery failed with " + e.getMessage)
out(None, Some(e.getMessage))
}
异常和挂起在大约1000个查询之后,我得到以下错误。在此之后,会话没有返回任何内容。都不是 `Success` 也不是 `Failure` 发生。
akka.configurationexception:由于[akka.event.logging$loggerinitializationexception:logger log1 logging$defaultlogger没有响应loggerinitialized,而是发送了[timeout],因此无法加载配置中指定的记录器[akka.event.logging$defaultlogger]
问题
我确信我可以延长日志记录的超时时间。但这只是症状,不是真正的问题。
我该如何:
配置会话连接以允许2000个并行请求?
或
将future.sequence限制为已知数量的可能请求?
也
如何以编程方式从这种会话挂起中恢复?
2条答案
按热度按时间i86rm4rw1#
而是触发2000个查询来执行范围查询。利用集群对象metatdata,获取令牌范围,计算密钥的令牌。然后,在一个范围查询中批处理属于同一范围的查询。
kmbjn2e32#
通过在创建群集示例时指定池选项,可以增加每个连接的运行中请求数,如下所示:
但你还是需要处理
BusyPoolException
在代码中,因为在使用异步请求时,仍然很容易重载一个特定的连接。更多信息请参见驾驶员文档。