mongodb 如何按1000分块通读集合?

rryofs0p  于 2023-03-29  发布在  Go
关注(0)|答案(6)|浏览(111)

我需要用Python代码从MongoDB中读取整个集合(集合名称为“test”)。

self.__connection__ = Connection('localhost',27017)
    dbh = self.__connection__['test_db']            
    collection = dbh['test']

如何以1000为单位读取集合(以避免内存溢出,因为集合可能非常大)?

wvmv3b1j

wvmv3b1j1#

受@Rafael Valero +的启发,修复了他代码中的最后一个块错误并使其更通用,我创建了生成器函数来使用查询和投影迭代mongo集合:

def iterate_by_chunks(collection, chunksize=1, start_from=0, query={}, projection={}):
   chunks = range(start_from, collection.find(query).count(), int(chunksize))
   num_chunks = len(chunks)
   for i in range(1,num_chunks+1):
      if i < num_chunks:
          yield collection.find(query, projection=projection)[chunks[i-1]:chunks[i]]
      else:
          yield collection.find(query, projection=projection)[chunks[i-1]:chunks.stop]

例如,首先创建一个迭代器,如下所示:

mess_chunk_iter = iterate_by_chunks(db_local.conversation_messages, 200, 0, query={}, projection=projection)

然后按块迭代:

chunk_n=0
total_docs=0
for docs in mess_chunk_iter:
   chunk_n=chunk_n+1        
   chunk_len = 0
   for d in docs:
      chunk_len=chunk_len+1
      total_docs=total_docs+1
   print(f'chunk #: {chunk_n}, chunk_len: {chunk_len}')
print("total docs iterated: ", total_docs)

chunk #: 1, chunk_len: 400
chunk #: 2, chunk_len: 400
chunk #: 3, chunk_len: 400
chunk #: 4, chunk_len: 400
chunk #: 5, chunk_len: 400
chunk #: 6, chunk_len: 400
chunk #: 7, chunk_len: 281
total docs iterated:  2681
lqfhib0f

lqfhib0f2#

我同意Remon的观点,但你提到了1000个批次,他的答案并没有真正涵盖。你可以在光标上设置批次大小:

cursor.batch_size(1000);

您还可以跳过记录,例如:

cursor.skip(4000);

这是你要找的吗?这是一个有效的分页模式。但是,如果你只是想避免内存耗尽,那么你真的不需要设置批量大小或跳过。

hsvhsicv

hsvhsicv3#

使用游标。游标有一个“batchSize”变量,用于控制在执行查询后,每批实际发送给客户端的文档数量。您不必触摸此设置,因为默认值很好,并且在大多数驱动程序中,调用“getmore”命令的复杂性是隐藏的。我不熟悉pymongo,但它是这样工作的:

cursor = db.col.find() // Get everything!

while(cursor.hasNext()) {
    /* This will use the documents already fetched and if it runs out of documents in it's local batch it will fetch another X of them from the server (where X is batchSize). */
    document = cursor.next();

    // Do your magic here
}
gev0vcfq

gev0vcfq4#

这里有一个通用的解决方案,可以批量迭代任何迭代器或生成器:

def _as_batch(cursor, batch_size=50):
    # iterate over something (pymongo cursor, generator, ...) by batch. 
    # Note: the last batch may contain less than batch_size elements.
    batch = []
    try:
        while True:
            for _ in range(batch_size):
                batch.append(next(cursor))
            yield batch
            batch = []
    except StopIteration as e:
        if len(batch):
            yield batch

只要cursor定义了一个方法__next__(即我们可以使用next(cursor)),这就可以工作。因此,我们可以在原始游标或转换后的记录上使用它。

示例

简单用法:

for batch in db['coll_name'].find():
    # do stuff

更复杂的用法(例如,用于批量更新):

def update_func(doc):
    # dummy transform function
    doc['y'] = doc['x'] + 1
    return doc

query = (update_func(doc) for doc in db['coll_name'].find())
for batch in _as_batch(query):
    # do stuff

重新实现count()函数:

sum(map(len, _as_batch( db['coll_name'].find() )))
ukqbszuj

ukqbszuj5#

使用Pymongo在Python 2中创建初始连接:

host = 'localhost'
port = 27017
db_name = 'test_db'
collection_name = 'test'

使用MongoClient进行连接

# Connect to MongoDB
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
dbh = client[dbname]
collection = dbh[collection_name]

**所以从这里正确的答案。**我想用块来阅读(在这个例子中大小为1000)。

chunksize = 1000

例如,我们可以决定我们想要多少大小的块(chunksize)。

# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
    skips_variable = [0,len(skips_variable)]

然后我们可以检索每个块。

for i in range(1,len(skips_variable)):

    # Expand the cursor and retrieve data 

    data_from_chunk = dbh[collection_name].find(query)[skips_variable[i-1]:skips_variable[i]]))

这里的query是query = {}
Here我使用类似的想法从MongoDB创建 Dataframe 。Here我使用类似的东西以块的形式写入MongoDB。
希望能有所帮助。

axzmvihb

axzmvihb6#

使用itertool.islice()generator 不会破坏MongoDB的惰性:

from itertools import islice

def batched(l, n):
    ll = iter(l)
    while (chunk := tuple(islice(ll, n)))
        yield chunk

chunk_size = int(5e5)
query = db_collection.find({}, trans_columns)
for chunk in batched(query, chunk_size):
    "Do something with the chunk..."

请注意,在即将到来的Python-3.12中添加了itertools.batched()

相关问题