Azure Python script freezes after a while

daolsyd0 · asked 12 months ago · in Python
Follow (0) | Answers (1) | Views (122)

* azure: 5.0.0
* Databricks cluster: 12.2 LTS
* Python 3
**Describe the problem:** I have an issue in my code: I need to extract the metadata of every file inside an ADLS Gen2 container, but the program keeps running and has returned no data after 6 hours. This is the code I am using:

def list_container_metadata(self, skiped_containers, batch_size=1000000):
    containers = self.blob_service_client.list_containers(include_metadata=True)

    for container in containers:
        # Skip containers that were already processed
        if container['name'] in skiped_containers:
            print(f"Container {container['name']} Already Processed!")
            continue

        print("\nCONTAINER:", container['name'])
        self.container_client = self.blob_service_client.get_container_client(container['name'])
        blob_list = self.list_blobs()

        container_data = []
        for blob in blob_list:
            container_data.append((
                self.storage_name,
                container['name'],
                blob.name,
                blob.last_modified,
                blob.size
            ))
            # Progress print every 100,000 accumulated rows
            if len(container_data) % 100000 == 0:
                print("BATCH SIZE:", len(container_data))

            # Hand off a full batch and start a new one
            if len(container_data) % batch_size == 0:
                yield container_data
                container_data = []

        # Yield any remaining data
        if container_data:
            yield container_data
        else:
            # Sentinel row for an empty container
            yield [(self.storage_name, container['name'], "", "", -1)]

The program is still running but has returned no data after 6 hours; the cluster's memory usage sits at 50% and CPU at 10%. The cluster/code keeps going, but I no longer see any of the extracted metadata being printed.
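One way to tell whether the listing is genuinely stuck or just enumerating silently is to log progress per service page: in azure-storage-blob, list_blobs() returns a lazy ItemPaged iterator, and its by_page() method exposes the underlying result pages. A minimal diagnostic sketch (the helper name and page size here are illustrative, not from the original code):

import time

def log_blob_pages(container_client, page_size=5000):
    # Enumerate blobs page by page and print running totals, so a long
    # listing shows visible progress instead of looking frozen
    total = 0
    start = time.time()
    # results_per_page controls how many blobs the service returns per call
    pages = container_client.list_blobs(results_per_page=page_size).by_page()
    for page in pages:
        total += sum(1 for _ in page)
        print(f"{total} blobs listed so far ({time.time() - start:.1f}s elapsed)")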

h7appiyu 1#

A likely cause is that the blobs are listed and then the loop walks through all 1,000,000 of them, only yielding data once the record count reaches the batch size.
So if the blobs themselves are being fetched successfully, the problem is in the loop.

import time  # needed for the timing below

def list_container_metadata(self, skiped_containers, batch_size=1000000):
    containers = self.blob_service_client.list_containers(include_metadata=True)

    for container in containers:
        # Skip containers that were already processed
        if container['name'] in skiped_containers:
            print(f"Container {container['name']} Already Processed!")
            continue

        print("\nCONTAINER:", container['name'])
        self.container_client = self.blob_service_client.get_container_client(container['name'])

        # Measure how long the listing itself takes, to tell a slow
        # listing apart from a slow loop
        start_time = time.time()
        blob_list = list(self.list_blobs())
        listing_time = time.time() - start_time
        print(f"Time taken to list blobs in container {container['name']}: {listing_time:.2f} seconds")

        if len(blob_list) > batch_size:
            # Slice the listed blobs into batches; each batch is built with
            # a list comprehension instead of appending in a for loop
            for i in range(0, len(blob_list), batch_size):
                tmp = [(self.storage_name, container['name'], b.name, b.last_modified, b.size)
                       for b in blob_list[i:i + batch_size]]
                yield tmp
        else:
            tmp = [(self.storage_name, container['name'], b.name, b.last_modified, b.size)
                   for b in blob_list]
            yield tmp

Here, the loop advances in steps of the batch size, and each batch is built with a list comprehension, which is faster than appending inside a for loop, to gather the metadata.
One more thing: try a smaller batch size.
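If you want to verify the speed difference on your own machine, a quick timeit comparison of the two styles (illustrative data size and repeat count, not from the original post):

import timeit

setup = "data = list(range(1_000_000))"
loop_stmt = (
    "out = []\n"
    "for x in data:\n"
    "    out.append((x, x))"
)
comp_stmt = "out = [(x, x) for x in data]"

# Build the same list of tuples both ways and compare wall time
print("for loop:      ", timeit.timeit(loop_stmt, setup=setup, number=10))
print("comprehension: ", timeit.timeit(comp_stmt, setup=setup, number=10))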
Output: (screenshot of the run omitted)
I had a few hundred blobs and used a batch size of 40. In your case, use something in the range of 30k–40k.
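As a usage sketch (hypothetical: MetadataExtractor stands for whatever class defines list_container_metadata), consume the generator batch by batch and persist each batch before pulling the next:

extractor = MetadataExtractor()   # hypothetical instance of the class above
already_done = set()              # names of containers processed earlier
for batch in extractor.list_container_metadata(already_done, batch_size=40000):
    print(f"Received a batch of {len(batch)} rows")
    # Persist the batch here (e.g. write it out to a table) before the
    # generator resumes, so progress stays visible and memory bounded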
