python-3.x: BlobClient.stage_block() intermittently raises "HTTP Error 400. The request verb is invalid."

bvpmtnay  posted on 2023-10-21 in Python

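I am using the Python SDK to upload blocks to an Azure Blob Storage container. I need to do a chunked upload, so I am using the BlobClient.stage_block(...) call. For performance reasons, I am using the async version of BlobClient (azure.storage.blob.aio).
Staging the first block works without problems, and the second block usually works too, but on the third or fourth block I get an exception telling me that the server does not support the HTTP verb: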

.\src\venv\Scripts\python.exe .\tests\minimal_example.py
Starting.
bytes_transfered=0, file_size=601070298
bytes_transfered=10485760, file_size=601070298
bytes_transfered=20971520, file_size=601070298
Unexpected return type <class 'str'> from ContentDecodePolicy.deserialize_from_http_generics.
Error: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN""http://www.w3.org/TR/html4/strict.dtd">
<HTML><HEAD><TITLE>Bad Request</TITLE>
<META HTTP-EQUIV="Content-Type" Content="text/html; charset=us-ascii"></HEAD>
<BODY><h2>Bad Request - Invalid Verb</h2>
<hr><p>HTTP Error 400. The request verb is invalid.</p>
</BODY></HTML>

ErrorCode:None
Content: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN""http://www.w3.org/TR/html4/strict.dtd">
<HTML><HEAD><TITLE>Bad Request</TITLE>
<META HTTP-EQUIV="Content-Type" Content="text/html; charset=us-ascii"></HEAD>
<BODY><h2>Bad Request - Invalid Verb</h2>
<hr><p>HTTP Error 400. The request verb is invalid.</p>
</BODY></HTML>

bytes_transfered=20971520, file_size=601070298
bytes_transfered=31457280, file_size=601070298
bytes_transfered=41943040, file_size=601070298
bytes_transfered=52428800, file_size=601070298
bytes_transfered=62914560, file_size=601070298
bytes_transfered=73400320, file_size=601070298
Unexpected return type <class 'str'> from ContentDecodePolicy.deserialize_from_http_generics.
Error: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN""http://www.w3.org/TR/html4/strict.dtd">
<HTML><HEAD><TITLE>Bad Request</TITLE>
<META HTTP-EQUIV="Content-Type" Content="text/html; charset=us-ascii"></HEAD>
<BODY><h2>Bad Request - Invalid Verb</h2>
<hr><p>HTTP Error 400. The request verb is invalid.</p>
</BODY></HTML>

ErrorCode:None
Content: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN""http://www.w3.org/TR/html4/strict.dtd">
<HTML><HEAD><TITLE>Bad Request</TITLE>
<META HTTP-EQUIV="Content-Type" Content="text/html; charset=us-ascii"></HEAD>
<BODY><h2>Bad Request - Invalid Verb</h2>
<hr><p>HTTP Error 400. The request verb is invalid.</p>
</BODY></HTML>

bytes_transfered=73400320, file_size=601070298

My target code is a bit more complex on the generator side, but a minimal example that reproduces the behavior looks like this:

from azure.storage.blob.aio import ContainerClient, BlobClient
from azure.identity import DefaultAzureCredential
from pathlib import Path
import datetime
from uuid import uuid4
import asyncio

CREDENTIALS = DefaultAzureCredential()
CHUNK_SIZE = 4096
BLOCK_SIZE = 10 * 1024 * 1024
FILE = Path("large_file")
BLOB_NAME = f"test_blob{datetime.datetime.now().ctime()}"

async def generator(current_pos: int):
    with open(FILE, "br") as fd:
        fd.seek(current_pos)
        data = fd.read(4096)
        while data:
            yield data
            data = fd.read(4096)

async def upload():
    print(f"Starting.")
    async with ContainerClient(ACCOUNT_URL, CONTAINER, CREDENTIALS) as cc:
        async with cc.get_blob_client(BLOB_NAME) as blob:
            bytes_transfered = 0
            file_size = FILE.stat().st_size
            blocks = []
            excp_counter = 0
            while bytes_transfered < file_size:
                gen = generator(bytes_transfered)
                print(f"bytes_transfered={bytes_transfered}, file_size={file_size}")
                id = str(uuid4())
                length = min(BLOCK_SIZE, file_size - bytes_transfered)
                try:
                    await blob.stage_block(block_id=id, data=gen, length=length)
                    blocks.append(id)
                    bytes_transfered += length
                    excp_counter = 0
                except Exception as e:
                    print(f"Error: {e}")
                    excp_counter +=1  
                
                    if excp_counter > 2:
                        raise RuntimeError("Cannot upload!")

            await blob.commit_block_list(blocks)

if __name__ == "__main__":
    f = upload()
    asyncio.run(f)

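When I handle the exception by basically ignoring it and retrying, the second or third attempt to stage the block works, but the problem happens again and again for subsequent blocks, with an error rate between 30% and 70%.
Is this a bug in the BlobClient code, or am I using it wrong?
Versions used: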

.\src\venv\Scripts\pip.exe freeze
aiohttp==3.8.4
aiosignal==1.3.1
async-timeout==4.0.2
attrs==23.1.0
autopep8==2.0.2
azure-common==1.1.28
azure-core==1.28.0
azure-functions==1.15.0
azure-functions-durable==1.2.5
azure-identity==1.14.0
azure-keyvault==4.2.0
azure-keyvault-certificates==4.7.0
azure-keyvault-keys==4.8.0
azure-keyvault-secrets==4.7.0
azure-storage-blob==12.17.0

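Edit 2023-10-06

I modified the generator function to better represent the use case. It is still heavily simplified, because in reality I am downloading the file from another source with aiohttp at the same time. The output above has been updated to match tests with the new code.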

kxeu7u2r  #1

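Is this a bug in the BlobClient code, or am I using it wrong?
The problem is that the generator function is consumed before the block has been fully uploaded. This happens because the generator function is used multiple times, and on every run it starts reading from the beginning of the file.
You can therefore change the generator function so that it keeps track of the current position in the file and resumes reading from that position each time it is called.
You have to modify the code in the upload function so that the generator function is created only once, and use the __anext__() method to get the next chunk of data from the generator.
Below is the updated code, which successfully uploaded the file block by block.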

Code:

from azure.storage.blob.aio import ContainerClient
from azure.identity import DefaultAzureCredential
from pathlib import Path
import datetime
import asyncio
import uuid

ACCOUNT_URL = "https://<storage account name>.blob.core.windows.net"
CONTAINER = "test"
CREDENTIALS = DefaultAzureCredential()
BLOCK_SIZE = 4 * 1024 * 1024
FILE = Path("C:\\Users\\xxxx\\xxxx\\sample326.pdf")
BLOB_NAME = f"test_blob_{datetime.datetime.now().ctime()}.pdf" 

async def generator():
    with open(FILE, "rb") as fd:
        data = fd.read(BLOCK_SIZE)
        while data:
            yield data
            data = fd.read(BLOCK_SIZE)

async def upload():
    gen = generator()
    print(f"Starting. Type: {type(gen)}")
    async with ContainerClient(ACCOUNT_URL, CONTAINER, CREDENTIALS) as cc:
        async with cc.get_blob_client(BLOB_NAME) as blob:
            bytes_transferred = 0
            file_size = FILE.stat().st_size
            blocks = []
            while bytes_transferred < file_size:
                print(f"bytes_transferred={bytes_transferred}, file_size={file_size}")
                block_id = str(uuid.uuid4())
                try:
                    # Pull the next chunk from the single shared generator.
                    data_chunk = await gen.__anext__()
                except StopAsyncIteration:
                    break
                length = len(data_chunk)
                # Retry the same chunk so a failed attempt never skips data.
                for attempt in range(3):
                    try:
                        await blob.stage_block(block_id, data_chunk, length)
                        break
                    except Exception as e:
                        print(f"Error: {str(e)}")
                        if attempt == 2:
                            raise RuntimeError("Cannot upload!")
                blocks.append(block_id)
                bytes_transferred += length

            await blob.commit_block_list(blocks)

if __name__ == "__main__":
    asyncio.run(upload())

Output:

Starting. Type: <class 'async_generator'>
bytes_transferred=0, file_size=21099096
bytes_transferred=4194304, file_size=21099096
bytes_transferred=8388608, file_size=21099096
bytes_transferred=12582912, file_size=21099096
bytes_transferred=16777216, file_size=21099096
bytes_transferred=20971520, file_size=21099096
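
The single-generator idea can be tried without Azure at all. The following is only a minimal sketch: the names read_chunks, collect_sizes and demo are hypothetical, and a 10-byte temporary file stands in for the real upload source. It shows that one async generator object keeps its file position between __anext__() calls, so every chunk is produced exactly once.

```python
import asyncio
import tempfile
from pathlib import Path


async def read_chunks(path: Path, block_size: int):
    """Yield successive chunks of at most block_size bytes from path."""
    with open(path, "rb") as fd:
        # The open file object remembers its position between yields.
        while data := fd.read(block_size):
            yield data


async def collect_sizes(path: Path, block_size: int) -> list:
    """Drain one generator with __anext__() and record each chunk size."""
    gen = read_chunks(path, block_size)  # created once, reused for every chunk
    sizes = []
    while True:
        try:
            chunk = await gen.__anext__()
        except StopAsyncIteration:
            break  # generator exhausted: the whole file has been read
        sizes.append(len(chunk))
    return sizes


def demo() -> list:
    """Write a 10-byte sample file (an assumption for the demo) and chunk it."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "sample.bin"
        path.write_bytes(b"x" * 10)
        return asyncio.run(collect_sizes(path, 4))


print(demo())  # [4, 4, 2]
```

The last chunk is shorter than the block size, which is why passing len(chunk) as the length is a safe choice when staging it.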

Update:

You can also use an async for loop.

Code:

from azure.storage.blob.aio import ContainerClient
from azure.identity import DefaultAzureCredential
from pathlib import Path
import datetime
import asyncio
import uuid

ACCOUNT_URL = "https://xxx.blob.core.windows.net"
CONTAINER = "test"
CREDENTIALS = DefaultAzureCredential()
BLOCK_SIZE = 4 * 1024 * 1024
FILE = Path("C:\\Users\\xxx\\xxx\\sample326.pdf")
BLOB_NAME = f"test_blob_{datetime.datetime.now().ctime()}.pdf" 

async def generator():
    with open(FILE, "rb") as fd:
        data = fd.read(BLOCK_SIZE)
        while data:
            yield data
            data = fd.read(BLOCK_SIZE)

async def upload():
    async with ContainerClient(ACCOUNT_URL, CONTAINER, CREDENTIALS) as cc:
        async with cc.get_blob_client(BLOB_NAME) as blob:
            bytes_transferred = 0
            file_size = FILE.stat().st_size
            blocks = []
            # async for drives the generator, so each chunk is read exactly once.
            async for chunk in generator():
                print(f"bytes_transferred={bytes_transferred}, file_size={file_size}")
                block_id = str(uuid.uuid4())
                length = len(chunk)
                # Retry the same chunk up to three times before giving up.
                for attempt in range(3):
                    try:
                        await blob.stage_block(block_id, chunk, length)
                        break
                    except Exception as e:
                        print(f"Error: {str(e)}")
                        if attempt == 2:
                            raise RuntimeError("Cannot upload!")
                blocks.append(block_id)
                bytes_transferred += length

            # Commit while the blob client is still open.
            await blob.commit_block_list(blocks)

if __name__ == "__main__":
    asyncio.run(upload())
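
One thing to watch when retrying around stage_block: a chunk that has already been pulled from the generator has to be sent again, not replaced by the next one, otherwise the committed blob is missing data. A rough sketch of that pattern, with no Azure dependency: stage_with_retry, FlakyStore and upload_all are hypothetical names, and FlakyStore only simulates a client whose first attempt per block fails.

```python
import asyncio


async def stage_with_retry(stage, block_id, chunk, attempts=3):
    """Await stage(block_id, chunk) until it succeeds or attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            await stage(block_id, chunk)
            return attempt  # number of tries that were needed
        except Exception:
            if attempt == attempts:
                raise  # give up and surface the last error


class FlakyStore:
    """Stand-in for a blob client: the first attempt for every block fails."""

    def __init__(self):
        self.seen = set()
        self.staged = {}

    async def stage(self, block_id, chunk):
        if block_id not in self.seen:
            self.seen.add(block_id)
            raise ConnectionError("simulated transient failure")
        self.staged[block_id] = chunk


async def upload_all(chunks):
    """Stage every chunk with retries, then join them in block-ID order."""
    store = FlakyStore()
    for i, chunk in enumerate(chunks):
        # The same chunk object is reused for every retry of this block.
        await stage_with_retry(store.stage, f"block-{i:04d}", chunk)
    return b"".join(store.staged[key] for key in sorted(store.staged))


result = asyncio.run(upload_all([b"ab", b"cd", b"e"]))
print(result)  # b'abcde'
```

With a real client, stage would be a small wrapper around blob.stage_block, and the list of block IDs would go to commit_block_list once every chunk has been staged.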
