如何在多线程中插入cassandra?

p8h8hvxi  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(311)

我读了一个大文件要插入Cassandra。我试图创建一个cassandra会话,并将其传递给多个线程。我希望所有的线程使用同一个会话,以避免数千次的重新连接。
我发现池从未调用insert\u chunk函数,因为看不到日志记录。知道为什么会话不能在线程中传递给吗?我还尝试将session设置为全局而不是参数,可以调用insert\u块,但仍停留在session.prepare

def insert_chunk(chunk, session):
    logging.info("insert chunck started")
    prepared = session.prepare(...)
    for line in chunck:
       session.execute(prepared, values)

main():
        cluster = Cluster()
        session = cluster.connect()
        with open("BIG_JSON", mode="r") as _file:
            objects = ijson.items(_file, "item")
            with Pool(20) as pool:
                while True:
                    chunk = list(itertools.islice(objects, 10000))
                    if len(chunk) <= 0:
                        break
                    pool.apply_async(
                        insert_chunk,[chunk, session],
                    )
                pool.close()
                pool.join()
ht4b089n

ht4b089n1#

驱动程序文档说明:
上面讨论的所有模式都可以使用多处理模块在多个进程上使用。多个进程比多个线程具有更好的可扩展性,因此如果您的目标是高吞吐量,请考虑此选项。请确保不要在多个进程之间共享任何群集、会话或响应未来对象。这些对象都应该在分叉进程之后而不是之前创建。
以及描述如何正确操作的博客文章的链接。
但是,如果您需要从一个巨大的json文件加载数据,那么我有另一个建议-使用datastax的dsbulk工具(与dse和cassandra一起使用)。这个工具可以从json中读取数据,对数据的加载和卸载进行了大量优化,并且非常灵活地将数据从jsonMap到表中。
有一系列关于其用法的博客文章:
https://www.datastax.com/blog/2019/03/datastax-bulk-loader-introduction-and-loading
https://www.datastax.com/blog/2019/04/datastax-bulk-loader-more-loading
https://www.datastax.com/blog/2019/04/datastax-bulk-loader-common-settings
https://www.datastax.com/blog/2019/06/datastax-bulk-loader-unloading
https://www.datastax.com/blog/2019/07/datastax-bulk-loader-counting
https://www.datastax.com/blog/2019/12/datastax-bulk-loader-examples-loading-other-locations

相关问题