cassandra长运行查询分页

muk1a3rh  于 2021-06-10  发布在  Cassandra
关注(0)|答案(0)|浏览(278)

我将一个20亿行的大表从cassandra集群v2迁移到cassandra集群v3,我使用分页模式读取1000行的数据块。复印件需要11天。
有人试过这么长的副本吗?
6小时后,我在读取时出错:

com.datastax.driver.core.RequestHandler - Not retrying statement because it is not idempotent

在下面的代码中,它肯定与 ma_session.execute .
代码是scala格式的。

def paginatedRead(boundStatement:BoundStatement, test:(Row) =>Boolean, f:(Row)=>ResultSetFuture):(Long,Long) ={
    /*
     Cette méthode lit une table en mode paginé d'après le boundstatement
     Une fonction test permet de filtrer les record
     une Fontion f va réaliser une écriture assynchrone
     En sortie, les 2 compteurs nb lectures global et nb insertions
     */
    var page:PagingState =null
    var rsParcel:ResultSet=null

    var newRow:Row=null
    var cptGlobal=0L

    cptExec=0L; cptInsert=0L
    do {
      if (page != null) boundStatement.setPagingState(page)
      rsParcel = ma_session.execute(boundStatement.setFetchSize(fetchSize))
      if (rsParcel !=null) {
        page = rsParcel.getExecutionInfo.getPagingState
      } else page = null
      val nbRows = rsParcel.getAvailableWithoutFetching

      val itParcel = rsParcel.iterator() //. take(nbRows)

      for (i <- 0 until nbRows) {
        newRow=itParcel.next
        if (test(newRow)) {
          addFuture (f(newRow))
        }
        cptGlobal += 1
        if (cptGlobal % 100000==0) Init2.logger.warn("lecture paginated en cours lu: "+cptGlobal+"  ecrit: "+cptInsert)
      }

    }  while (page != null)
    waitFuture
    (cptGlobal,cptInsert)

  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题