使用多个(python)客户端并行加载cassandra中的所有行

rqenqsqc  于 2021-06-14  发布在  Cassandra
关注(0)|答案(2)|浏览(456)

当使用cassandra推荐的randompartitioner(或Murrin3Partitioner)时,不可能对键执行有意义的范围查询,因为行是使用键的md5哈希分布在集群中的。这些散列称为“令牌”
尽管如此,通过为每个计算工作者分配一系列令牌,将一个大表拆分为多个计算工作者是非常有用的。使用cql3,似乎可以直接针对令牌发出查询,但是下面的python不起作用。。。编辑:在切换到对cassandra数据库(doh!)的最新版本进行测试之后工作,并根据以下注解更新语法:


## use python cql module

import cql

## If running against an old version of Cassandra, this raises:

## TApplicationException: Invalid method name: 'set_cql_version'

conn = cql.connect('localhost', cql_version='3.0.2')

cursor = conn.cursor()

try:
    ## remove the previous attempt to make this work
    cursor.execute('DROP KEYSPACE test;')
except Exception, exc:
    print exc

## make a keyspace and a simple table

cursor.execute("CREATE KEYSPACE test WITH strategy_class = 'SimpleStrategy' AND strategy_options:replication_factor = 1;")
cursor.execute("USE test;")
cursor.execute('CREATE TABLE data (k int PRIMARY KEY, v varchar);')

## put some data in the table -- must use single quotes around literals, not double quotes

cursor.execute("INSERT INTO data (k, v) VALUES (0, 'a');")
cursor.execute("INSERT INTO data (k, v) VALUES (1, 'b');")
cursor.execute("INSERT INTO data (k, v) VALUES (2, 'c');")
cursor.execute("INSERT INTO data (k, v) VALUES (3, 'd');")

## split up the full range of tokens.

## Suppose there are 2**k workers:

k = 3 # --> eight workers
token_sub_range = 2**(127 - k)
worker_num = 2 # for example
start_token =    worker_num  * token_sub_range
end_token = (1 + worker_num) * token_sub_range

## put single quotes around the token strings

cql3_command = "SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';" % (start_token, end_token)
print cql3_command

## this fails with "ProgrammingError: Bad Request: line 1:28 no viable alternative at input 'token'"

cursor.execute(cql3_command)

for row in cursor:
    print row

cursor.close()
conn.close()

我很想用pycassa实现这一点,因为我更喜欢它的pythonic接口。
有没有更好的办法?

hmae6n7t

hmae6n7t1#

它不是cql3,但这里有一个简单的程序,可以直接使用thrift接口读取localhost拥有的所有(pickle)数据。这可以用来构建一个以cassandra为后端的简单map/reduce引擎。每个节点都会在属于自己的数据上运行类似于这样的to map(),这样就不会产生数据检索的网络开销。然后,结果将返回到单独节点上的reduce()阶段。
显然,对于cassandra1.2+中的vnodes,这并不是很好。我现在使用一种索引方法,允许在较小的局部数据子集上使用map(),并支持vnode。


# !/usr/bin/env python2.7

import sys
import socket
import cPickle as pickle
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from pycassa.cassandra import Cassandra
from pycassa.cassandra.ttypes import *
import time
import pprint

def main():
    jobname = sys.argv[1]
    pp = pprint.PrettyPrinter(indent=2)

    (client, transport) = connect("localhost")

    # Determine local IP address
    ip = socket.gethostbyname(socket.gethostname())

    # Set up query
    keyspace = "data"
    column_parent = ColumnParent(column_family=foo)

    try:
        # Find range of tokens for which this node is first replica
        for tokenrange in client.describe_ring(keyspace):
            if tokenrange.endpoints[0] == ip:
                start_token=tokenrange.start_token
                end_token=tokenrange.end_token
                break

        # Set kesypace
        client.set_keyspace(keyspace)

        # Query for all data owned by this node
        slice_range = SliceRange(start="", finish="")
        predicate = SlicePredicate(slice_range=slice_range)
        keyrange = KeyRange(start_token=start_token, end_token=end_token, count=10000)
        t0 = time.time()
        ptime = 0
        keycount = 0
        start=""
        for keyslice in client.get_range_slices(column_parent, predicate, keyrange, ConsistencyLevel.ONE):
            keycount += 1
            for col in keyslice.columns:
                pt0 = time.time()
                data = pickle.loads(col.column.value)
                ptime += time.time() - pt0
    except Thrift.TException, tx:
        print 'Thrift: %s' % tx.message
    finally:
        disconnect(transport)

    t1 = time.time() - t0
    print "Read data for %d tasks in: %.2gs" %(keycount, t1)
    print "Job unpickling time: %.2gs" %ptime
    print "Unpickling percentage: %.2f%%" %(ptime/t1*100)

def connect(host):
    """ 
    Connect to cassandra instance on given host.
    Returns: Cassandra.Client object
    """
    socket = TSocket.TSocket(host, 9160)
    transport = TTransport.TFramedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    transport.open()
    client = Cassandra.Client(protocol) 
    return (client, transport)

def disconnect(transport):
    """ 
    Disconnect from cassandra instance
    """
    transport.close()

if __name__ == '__main__':
    main()
hsgswve4

hsgswve42#

我已经更新了问题以包含答案。

相关问题