我读了一个大文件要插入Cassandra。我试图创建一个cassandra会话,并将其传递给多个线程。我希望所有的线程使用同一个会话,以避免数千次的重新连接。
我发现池从未调用insert\u chunk函数,因为看不到日志记录。知道为什么会话不能在线程中传递给吗?我还尝试将session设置为全局而不是参数,可以调用insert\u块,但仍停留在session.prepare
def insert_chunk(chunk, session):
logging.info("insert chunck started")
prepared = session.prepare(...)
for line in chunck:
session.execute(prepared, values)
main():
cluster = Cluster()
session = cluster.connect()
with open("BIG_JSON", mode="r") as _file:
objects = ijson.items(_file, "item")
with Pool(20) as pool:
while True:
chunk = list(itertools.islice(objects, 10000))
if len(chunk) <= 0:
break
pool.apply_async(
insert_chunk,[chunk, session],
)
pool.close()
pool.join()
1条答案
按热度按时间ht4b089n1#
驱动程序文档说明:
上面讨论的所有模式都可以使用多处理模块在多个进程上使用。多个进程比多个线程具有更好的可扩展性,因此如果您的目标是高吞吐量,请考虑此选项。请确保不要在多个进程之间共享任何群集、会话或响应未来对象。这些对象都应该在分叉进程之后而不是之前创建。
以及描述如何正确操作的博客文章的链接。
但是,如果您需要从一个巨大的json文件加载数据,那么我有另一个建议-使用datastax的dsbulk工具(与dse和cassandra一起使用)。这个工具可以从json中读取数据,对数据的加载和卸载进行了大量优化,并且非常灵活地将数据从jsonMap到表中。
有一系列关于其用法的博客文章:
https://www.datastax.com/blog/2019/03/datastax-bulk-loader-introduction-and-loading
https://www.datastax.com/blog/2019/04/datastax-bulk-loader-more-loading
https://www.datastax.com/blog/2019/04/datastax-bulk-loader-common-settings
https://www.datastax.com/blog/2019/06/datastax-bulk-loader-unloading
https://www.datastax.com/blog/2019/07/datastax-bulk-loader-counting
https://www.datastax.com/blog/2019/12/datastax-bulk-loader-examples-loading-other-locations