Accelerating Azure Storage Downloads with Parallel Chunked Reads

IShirai_KurokoI

The requirement: an API has to download a file from Azure Blob Storage and return it to the front end. Testing showed that files of a gigabyte or more hit a download timeout, and the timeout belongs to the API gateway and cannot be raised, so the download itself has to be made faster.

import os
import io
import zipfile
import asyncio
from concurrent.futures import ThreadPoolExecutor

from azure.storage.filedatalake import DataLakeServiceClient


def download_chunk(file_client, start_range, end_range):
    # Blocking range read: fetch bytes [start_range, end_range] of the file.
    download_response = file_client.download_file(offset=start_range, length=end_range - start_range + 1)
    return download_response.readall()


async def download_file_in_chunks(file_client, file_size, chunk_size=4 * 1024 * 1024):
    # Fan the range reads out over a thread pool so they run in parallel,
    # then gather the chunks back in order.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        tasks = [
            loop.run_in_executor(pool, download_chunk, file_client, start, min(start + chunk_size, file_size) - 1)
            for start in range(0, file_size, chunk_size)
        ]
        chunks = await asyncio.gather(*tasks)
    return chunks


async def download_single_file(client: DataLakeServiceClient, container_name: str, target):
    # Resolve the file path, read its size, then download and reassemble the chunks.
    full_path = os.path.join(target['FilePath'], target['FileName'])
    download_client = client.get_file_client(file_system=container_name, file_path=full_path.lstrip('/'))
    file_properties = download_client.get_file_properties()
    file_size = file_properties['size']
    chunks = await download_file_in_chunks(download_client, file_size)
    file_data = b''.join(chunks)
    return file_data


async def download_multiple_files_to_zip(client: DataLakeServiceClient, container_name: str, file_list):
    # Download each file with the same chunked helper and pack it into an
    # in-memory zip archive.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
        for target in file_list:
            file_name = target['FileName']
            file_path = target['FilePath']
            full_path = os.path.join(file_path, file_name)
            download_client = client.get_file_client(file_system=container_name, file_path=full_path.lstrip('/'))
            file_properties = download_client.get_file_properties()
            file_size = file_properties['size']
            chunks = await download_file_in_chunks(download_client, file_size)
            file_data = b''.join(chunks)
            zip_file.writestr(file_name, file_data)
    zip_buffer.seek(0)
    return zip_buffer
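
For comparison, the Storage SDK also has a built-in parallel download path: download_file accepts a max_concurrency keyword that it forwards to the underlying Blob download, which then fetches ranges on parallel connections internally. A minimal sketch, assuming the installed azure-storage-file-datalake version supports that keyword:

def download_with_sdk_concurrency(file_client, max_concurrency=8):
    # Hypothetical one-call alternative: the SDK splits the file into ranges
    # and downloads them concurrently; readall() reassembles the bytes in
    # order. max_concurrency=8 is an arbitrary starting point, not a tuned value.
    downloader = file_client.download_file(max_concurrency=max_concurrency)
    return downloader.readall()

The usage below sticks with the hand-rolled helpers and wires them to a DataLakeServiceClient created from a connection string.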
# Azure Data Lake Storage connection string
# Quick way to get it: az storage account show-connection-string --name <storage-account-name> --resource-group <resource-group-name>
connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=name;AccountKey=SnjcmmpSLAwbOm5va2V5NHU8PWYZy45AA2ccPzlQCBsLBAddYQ==;BlobEndpoint=https://name.blob.core.windows.net/;FileEndpoint=https://name.file.core.windows.net/;QueueEndpoint=https://name.queue.core.windows.net/;TableEndpoint=https://name.table.core.windows.net/"
datalake_service_client = DataLakeServiceClient.from_connection_string(connection_string)

# Target for the single-file download
target_file = {
    'FilePath': '',
    'FileName': 'example.txt'
}

# Targets for the multi-file zip download
target_files = [
    {'FilePath': '', 'FileName': 'file1.txt'},
    {'FilePath': '', 'FileName': 'file2.txt'}
]


async def main():
    # Single-file download
    print("Downloading single file...")
    single_file_content = await download_single_file(datalake_service_client, 'async', target_file)
    with open('downloaded_example.txt', 'wb') as f:
        f.write(single_file_content)
    print("Single file downloaded as 'downloaded_example.txt'")

    # Multi-file zip download
    print("Downloading and compressing multiple files...")
    compressed_data = await download_multiple_files_to_zip(datalake_service_client, 'async', target_files)
    with open('downloaded_files.zip', 'wb') as f:
        f.write(compressed_data.getvalue())
    print("Multiple files downloaded and compressed as 'downloaded_files.zip'")


asyncio.run(main())
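
The helpers above rely on ThreadPoolExecutor's default worker count and a 4 MB chunk; both are worth tuning once files reach GB scale. A minimal sketch of a tuned variant, with a hypothetical cap of 16 workers and 8 MB chunks:

CHUNK_SIZE = 8 * 1024 * 1024   # hypothetical: larger ranges, fewer requests
MAX_WORKERS = 16               # hypothetical: more parallel connections

async def download_file_in_chunks_tuned(file_client, file_size):
    # Same fan-out as download_file_in_chunks, just with explicit knobs.
    # Raise the numbers until the network link or storage-account throttling,
    # not the API, becomes the bottleneck.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        tasks = [
            loop.run_in_executor(pool, download_chunk, file_client, start,
                                 min(start + CHUNK_SIZE, file_size) - 1)
            for start in range(0, file_size, CHUNK_SIZE)
        ]
        return await asyncio.gather(*tasks)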

In practice this did speed things up and the downloads went through, but even larger files still risk timing out. The real fix is to switch to an API supplier that allows a longer timeout, or to cap the size of the original files (split them into parts).
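
One hedged sketch of the file-splitting mitigation mentioned above: cut the source file into fixed-size parts before upload, so that each part downloads well within the gateway timeout. The .partNNN naming below is a made-up convention for illustration, not anything the storage SDK requires.

def split_file(path, part_size=256 * 1024 * 1024):
    # Write path.part000, path.part001, ... each at most part_size bytes,
    # and return the list of part paths for upload.
    part_paths = []
    with open(path, 'rb') as src:
        index = 0
        while True:
            data = src.read(part_size)
            if not data:
                break
            part_path = f"{path}.part{index:03d}"
            with open(part_path, 'wb') as dst:
                dst.write(data)
            part_paths.append(part_path)
            index += 1
    return part_paths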

  • Title: Accelerating Azure Storage Downloads with Parallel Chunked Reads
  • Author: IShirai_KurokoI
  • Created: 2024-10-14 22:25:00
  • Updated: 2024-10-14 22:54:30
  • Link: https://ishiraikurokoi.top/2024-10-14-Azure-Chunk-Download/
  • Copyright: This post is licensed under CC BY-NC-SA 4.0.