如何用golang查询将数据流传输到cassandra

smdncfj3  于 2021-06-13  发布在  Cassandra
关注(0)|答案(2)|浏览(498)

我有以下代码:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()
iter := cass.Query(`SELECT * FROM cmuser.users LIMIT 9999999999;`).Iter()
c :=iter.Columns()
scanArgs := make([]interface{}, len(c))

for i:=0; i < len(scanArgs); i++ {
    scanArgs[i] = makeType(c[i])
}

for iter.Scan(scanArgs...) { ... }

问题是表中的行太多了。但是我需要读取所有的数据,将数据迁移到另一个数据库。有没有办法把Cassandra的数据流化?不幸的是,我们没有表主键的序列,我们使用的是用于pk的uuid。所以这意味着我们不能用2 for循环的简单技术,一个递增一个计数器,然后以这种方式遍历所有行。

jk9hmnmh

jk9hmnmh1#

gocql有一些分页选项(假设您的cassandra版本至少是版本2)。
戈科尔的 Session 有一个方法setpagesize
还有gocql的 Query 有一个类似的方法,pagesize
这可能有助于分解查询。代码如下所示:

cluster := gocql.NewCluster("our-cass")
cass, err := cluster.CreateSession()
defer cass.Close()

iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(5000).Iter()

// use the iter as usual to iterate over all results 
// this will send additional CQL queries when it needs to get new pages
kcugc4gi

kcugc4gi2#

对于一种纯粹的cassandra方法,您可以在每个节点分解令牌范围时对其运行范围查询。
首先,找到标记范围:

$ nodetool ring

Datacenter: dc1
==========
Address   Rack       Status State   Load         Owns                Token
                                                                      8961648479018332584
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -9023369133424793632
10.1.4.1  rack2      Up     Normal  1.56 GB      100.00%             -7946127339777435347
10.1.4.3  rack3      Up     Normal  1.34 GB      100.00%             -7847456805881540087
...

等等…(这可能很大,取决于每个节点上节点和令牌的数量)
然后调整查询以使用 token() 分区键上的函数。因为我不知道你的主键定义是什么,我将猜测并使用 users_id 作为分区键:

SELECT * FROM cmuser.users
WHERE token(users_id) > 8961648479018332584
  AND token(users_id) <= -9023369133424793632;

完成后,移动到下一个令牌范围:

SELECT * FROM cmuser.users
WHERE token(users_id) > -9023369133424793632
  AND token(users_id) <= -7946127339777435347;

像这样分解查询将有助于确保它一次只能从单个节点读取。这应该允许查询从集群(和磁盘)顺序读取数据,而不必担心超时。

相关问题