我将一个20亿行的大表从cassandra集群v2迁移到cassandra集群v3,我使用分页模式读取1000行的数据块。复印件需要11天。
有人试过这么长的副本吗?
6小时后,我在读取时出错:
com.datastax.driver.core.RequestHandler - Not retrying statement because it is not idempotent
在下面的代码中,它肯定与 ma_session.execute
.
代码是scala格式的。
def paginatedRead(boundStatement:BoundStatement, test:(Row) =>Boolean, f:(Row)=>ResultSetFuture):(Long,Long) ={
/*
Cette méthode lit une table en mode paginé d'après le boundstatement
Une fonction test permet de filtrer les record
une Fontion f va réaliser une écriture assynchrone
En sortie, les 2 compteurs nb lectures global et nb insertions
*/
var page:PagingState =null
var rsParcel:ResultSet=null
var newRow:Row=null
var cptGlobal=0L
cptExec=0L; cptInsert=0L
do {
if (page != null) boundStatement.setPagingState(page)
rsParcel = ma_session.execute(boundStatement.setFetchSize(fetchSize))
if (rsParcel !=null) {
page = rsParcel.getExecutionInfo.getPagingState
} else page = null
val nbRows = rsParcel.getAvailableWithoutFetching
val itParcel = rsParcel.iterator() //. take(nbRows)
for (i <- 0 until nbRows) {
newRow=itParcel.next
if (test(newRow)) {
addFuture (f(newRow))
}
cptGlobal += 1
if (cptGlobal % 100000==0) Init2.logger.warn("lecture paginated en cours lu: "+cptGlobal+" ecrit: "+cptInsert)
}
} while (page != null)
waitFuture
(cptGlobal,cptInsert)
}
暂无答案!
目前还没有任何答案,快来回答吧!