mongodb 将大型XML文件直接从GridFS流式传输到xmltodict解析

bq3bfh9z  于 2023-04-05  发布在  Go
关注(0)|答案(1)|浏览(142)

bounty已结束。回答此问题可获得+50声望奖励。奖励宽限期将在7小时后结束。Shiladitya Bose正在寻找来自信誉良好的来源的答案

我正在使用Motor进行异步MongoDB操作。我有一个gridfs存储,我将大型XML文件(通常大小为30+ MB)存储在8 MB的块中。我想使用xmltodict增量解析XML文件。下面是我的代码。

async def read_file(file_id):
    gfs_out: AsyncIOMotorGridOut = await gfs_bucket.open_download_stream(file_id)

    tmpfile = tempfile.SpooledTemporaryFile(mode="w+b")
    while data := await gfs_out.readchunk():
        tmpfile.write(data)

    xmltodict.parse(tmpfile)

我把所有的块一个接一个地拉出来,并把它们存储在内存中的一个临时文件中,然后通过xmltodict解析整个文件。理想情况下,我希望增量地解析它,因为我不需要从一开始就需要整个xml对象。
xmltodict的文档建议我们可以添加自定义处理程序来解析流,如以下示例:

>>> def handle_artist(_, artist):
...     print(artist['name'])
...     return True
>>> 
>>> xmltodict.parse(GzipFile('discogs_artists.xml.gz'),
...     item_depth=2, item_callback=handle_artist)
A Perfect Circle
Fantômas
King Crimson
Chris Potter
...

但问题是,它需要一个类似文件的对象,带有一个同步的read()方法,而不是一个协程。有什么方法可以实现吗?如果有任何帮助,将不胜感激。

t2a7ltrp

t2a7ltrp1#

不,不是直接的。在xmltodict中没有异步解析器,但是你可以使用库的其余部分来创建自己的异步解析器。如果你觉得它对你自己有用,甚至可以为社区做出贡献,并且可以添加所有的尽职调查参数,使其与原始的parse兼容。
一个非常简单的代码应该是这样的:

from xmltodict import _DictSAXHandler

# from https://github.com/martinblech/xmltodict/blob/8c8cdd2f17520e0422dbe8cda6156e98375e02d3/xmltodict.py:
try:
    from defusedexpat import pyexpat as expat
except ImportError:
    from xml.parsers import expat

async def async_parser(async_generator):
    parser = expat.ParserCreate('utf-8')
    handler = _DictSAXHandler(namespace_separator=":")
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters

    # the change to the original parser to make it async
    async for chunk in async_generator:
        parser.Parse(chunk, False)
    parser.Parse(b'', True)
    return handler.item

# then you create the async generator from GFS
async def read_file(file_id):
    gfs_out: AsyncIOMotorGridOut = await gfs_bucket.open_download_stream(file_id)

    while data := await gfs_out.readchunk():
        yield data
    
# and pass it as a parameter to the parser
async_parser(read_file("the id"))

请记住,异步文件读取器也使解析器异步,因此您需要等待结果。

相关问题