scala 列出位于数据湖中的文件夹中的所有文件

6yt4nkrj  于 2022-11-29  发布在  Scala
关注(0)|答案(3)|浏览(153)

我试图获得一个文件夹中所有文件的清单,它有几个子文件夹,所有这些文件都位于一个数据湖中。

import sys, os
import pandas as pd

mylist = []
root = "/mnt/rawdata/parent/"
path = os.path.join(root, "targetdirectory") 

for path, subdirs, files in os.walk(path):
    for name in files:
        mylist.append(os.path.join(path, name))

df = pd.DataFrame(mylist)
print(df)

我还尝试了以下链接中的示例代码:
Python list directory, subdirectory, and files
我在Azure Databricks工作。我愿意使用Scala来完成这项工作。到目前为止,没有任何东西对我起作用。每次,我都得到一个空的 Dataframe 。我相信这已经很接近了,但我肯定遗漏了一些小东西。有什么想法吗?

jk9hmnmh

jk9hmnmh1#

数据块文件系统(DBFS)是装载到Azure Databricks工作区并在Azure Databricks群集上可用的分布式文件系统。如果使用本地文件API,则必须引用Databricks文件系统。Azure Databricks使用FUSE装载**/dbfs配置每个群集节点,该装载允许在群集节点上运行的进程使用本地文件API读写基础分布式存储层(另请参见文档)。
因此,在路径
/dbfs**中:必须包括:

root = "/dbfs/mnt/rawdata/parent/"

这与使用Databricks文件系统实用程序(DBUtils)不同。文件系统实用程序访问Databricks文件系统,从而更容易将Azure Databricks用作文件系统:

dbutils.fs.ls("/mnt/rawdata/parent/")

对于更大的数据湖,我可以推荐一个Knowledge Base中的Scala示例。优点是它运行所有分布式子叶的列表,因此也适用于更大的目录。

lzfw57am

lzfw57am2#

我会搞定的。

from azure.storage.blob import BlockBlobService 

blob_service = BlockBlobService(account_name='your_account_name', account_key='your_account_key')

blobs = []
marker = None
while True:
    batch = blob_service.list_blobs('rawdata', marker=marker)
    blobs.extend(batch)
    if not batch.next_marker:
        break
    marker = batch.next_marker
for blob in blobs:
    print(blob.name)

唯一的先决条件是您需要导入azure.storage。因此,在集群窗口中,单击'Install-New'-〉PyPI〉package ='azure.storage'。最后,单击' Install '。

axr492tv

axr492tv3#

这对我很有效-从DBFS路径开始查找所有 parquet :

#------
# find parquet files in subdirectories recursively
def find_parquets(dbfs_ls_list):
    parquet_list = []
    if isinstance(dbfs_ls_list, str):
        # allows for user to start recursion with just a path
        dbfs_ls_list = dbutils.fs.ls(root_dir)
        parquet_list += find_parquets(dbfs_ls_list)
    else:
        for file_data in dbfs_ls_list:
            if file_data.size == 0 and file_data.name[-1] == '/':
                # found subdir
                new_dbdf_ls_list = dbutils.fs.ls(file_data.path)
                parquet_list += find_parquets(new_dbdf_ls_list)
            elif '.parquet' in file_data.name:
                parquet_list.append(file_data.path)
    return parquet_list

#------
root_dir = 'dbfs:/FileStore/my/parent/folder/'
file_list = find_parquets(root_dir)

相关问题