How to read Azure Blob Storage with the Python delta-rs bindings

goqiplq2  posted on 2022-11-12  in  Python

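I want to use the Python bindings for delta-rs to read from my blob storage. https://github.com/delta-io/delta-rs/tree/main/python
Right now I am a bit lost, because I don't know how to configure the filesystem on my local machine. Where do I have to put my credentials?
Can I use adlfs for this?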

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(
        account_name="...", 
        account_key='...'
    )

And then use the fs object?
It would be great if someone could give me some help with this.
Best,

pod7payv  #1

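Unfortunately, we don't have good documentation for this at the moment. You should be able to set the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_SAS environment variables, as this integration test does.
That ensures the Python bindings can access the table metadata. The data itself, however, is usually fetched for querying through Pandas, and I'm not sure whether Pandas also picks up these variables (I'm not an ADLSv2 user myself).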
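As a minimal sketch of the suggestion above, the two variables can be exported from Python before the table is opened. The helper names (`configure_azure_env`, `table_uri`, `open_table`) are hypothetical, and the `az://` scheme is an assumption: the URI form and the exact variable names that delta-rs accepts depend on the installed version, so check them against your release.

```python
import os


def configure_azure_env(account: str, sas_token: str) -> None:
    """Export the two variables named in the answer above."""
    os.environ["AZURE_STORAGE_ACCOUNT"] = account
    os.environ["AZURE_STORAGE_SAS"] = sas_token


def table_uri(container: str, path: str, scheme: str = "az") -> str:
    """Build a table URI. The default scheme is an assumption, not a confirmed value."""
    return f"{scheme}://{container}/{path.strip('/')}"


def open_table(container: str, path: str):
    """Open the Delta table; deltalake is imported lazily so the helpers above work without it."""
    from deltalake import DeltaTable

    return DeltaTable(table_uri(container, path))
```

Calling `configure_azure_env("...", "...")` first and then `open_table("your-container-name", "path/to/table")` would mirror what the answer describes, provided the installed version reads those variables.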

wkyowqbh  #2

One possible workaround is to download the Delta Lake files to a temporary directory and then read them with python-delta-rs, like this:

import os
import tempfile

from azure.storage.blob import BlobServiceClient
from deltalake import DeltaTable

def get_blobs_for_folder(container_client, blob_storage_folder_path):
    blob_iter = container_client.list_blobs(name_starts_with=blob_storage_folder_path)
    blob_names = []
    for blob in blob_iter:
        if "." in blob.name:
            # To just get files and not directories, there might be a better way to do this
            blob_names.append(blob.name)

    return blob_names

def download_blob_files(container_client, blob_names, local_folder):
    for blob_name in blob_names:
        local_filename = os.path.join(local_folder, blob_name)
        local_file_dir = os.path.dirname(local_filename)
        # Create the parent directories; no error if they already exist
        os.makedirs(local_file_dir, exist_ok=True)

        with open(local_filename, 'wb') as f:
            f.write(container_client.download_blob(blob_name).readall())

def read_delta_lake_file_to_df(blob_storage_path, access_key):
    blob_storage_url = "https://your-blob-storage"
    blob_service_client = BlobServiceClient(blob_storage_url, credential=access_key)
    container_client = blob_service_client.get_container_client("your-container-name")

    blob_names = get_blobs_for_folder(container_client, blob_storage_path)
    with tempfile.TemporaryDirectory() as tmp_dirpath:
        download_blob_files(container_client, blob_names, tmp_dirpath)
        local_filename = os.path.join(tmp_dirpath, blob_storage_path)
        dt = DeltaTable(local_filename)
        df = dt.to_pyarrow_table().to_pandas()
    return df
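The directory-mirroring step in `download_blob_files` can also be written with `pathlib`. The following is only a sketch of that one step, with a hypothetical helper name (`local_path_for`); it does not touch Azure, and it assumes blob names use `/` as the separator.

```python
import tempfile
from pathlib import Path


def local_path_for(blob_name: str, local_folder: str) -> Path:
    """Map a blob name like 'tables/t/_delta_log/0.json' to a path under local_folder."""
    target = Path(local_folder).joinpath(*blob_name.split("/"))
    # Make sure the parent directories exist before the file is written
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


with tempfile.TemporaryDirectory() as tmp_dirpath:
    path = local_path_for("tables/t/_delta_log/0.json", tmp_dirpath)
    path.write_bytes(b"{}")  # stands in for the downloaded blob content
    file_was_written = path.is_file()
```

Splitting on `/` and rejoining through `Path` keeps the local layout identical to the blob layout on any operating system, which matters because `DeltaTable` expects the `_delta_log` folder inside the table directory.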
