cassandra长运行查询分页

muk1a3rh  于 2021-06-10  发布在  Cassandra
关注(0)|答案(0)|浏览(314)

我将一个20亿行的大表从cassandra集群v2迁移到cassandra集群v3,我使用分页模式读取1000行的数据块。复印件需要11天。
有人试过这么长的副本吗?
6小时后,我在读取时出错:

  1. com.datastax.driver.core.RequestHandler - Not retrying statement because it is not idempotent

在下面的代码中,它肯定与 ma_session.execute .
代码是scala格式的。

  1. def paginatedRead(boundStatement:BoundStatement, test:(Row) =>Boolean, f:(Row)=>ResultSetFuture):(Long,Long) ={
  2. /*
  3. Cette méthode lit une table en mode paginé d'après le boundstatement
  4. Une fonction test permet de filtrer les record
  5. une Fontion f va réaliser une écriture assynchrone
  6. En sortie, les 2 compteurs nb lectures global et nb insertions
  7. */
  8. var page:PagingState =null
  9. var rsParcel:ResultSet=null
  10. var newRow:Row=null
  11. var cptGlobal=0L
  12. cptExec=0L; cptInsert=0L
  13. do {
  14. if (page != null) boundStatement.setPagingState(page)
  15. rsParcel = ma_session.execute(boundStatement.setFetchSize(fetchSize))
  16. if (rsParcel !=null) {
  17. page = rsParcel.getExecutionInfo.getPagingState
  18. } else page = null
  19. val nbRows = rsParcel.getAvailableWithoutFetching
  20. val itParcel = rsParcel.iterator() //. take(nbRows)
  21. for (i <- 0 until nbRows) {
  22. newRow=itParcel.next
  23. if (test(newRow)) {
  24. addFuture (f(newRow))
  25. }
  26. cptGlobal += 1
  27. if (cptGlobal % 100000==0) Init2.logger.warn("lecture paginated en cours lu: "+cptGlobal+" ecrit: "+cptInsert)
  28. }
  29. } while (page != null)
  30. waitFuture
  31. (cptGlobal,cptInsert)
  32. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题