import io
import posixpath
import zipfile
import asyncio
from concurrent.futures import ThreadPoolExecutor

from azure.storage.filedatalake import DataLakeServiceClient
def download_chunk(file_client, start_range, end_range):
    # Download one byte range [start_range, end_range] (inclusive) of the file.
    download_response = file_client.download_file(
        offset=start_range,
        length=end_range - start_range + 1,
    )
    return download_response.readall()
async def download_file_in_chunks(file_client, file_size, chunk_size=4 * 1024 * 1024):
    # Split the file into 4 MiB ranges and fetch them concurrently on a thread
    # pool; the SDK client is synchronous, so run_in_executor keeps the event
    # loop free while the ranges download in parallel.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        tasks = [
            loop.run_in_executor(
                pool,
                download_chunk,
                file_client,
                start,
                min(start + chunk_size, file_size) - 1,
            )
            for start in range(0, file_size, chunk_size)
        ]
        # gather preserves submission order, so the chunks rejoin in file order.
        chunks = await asyncio.gather(*tasks)
    return chunks
async def download_single_file(client: DataLakeServiceClient, container_name: str, target):
    # posixpath.join always uses '/', which is what ADLS paths expect
    # (os.path.join would produce '\' separators on Windows).
    full_path = posixpath.join(target['FilePath'], target['FileName'])
    download_client = client.get_file_client(
        file_system=container_name, file_path=full_path.lstrip('/')
    )
    # Note: get_file_properties() is a blocking call.
    file_size = download_client.get_file_properties().size
    chunks = await download_file_in_chunks(download_client, file_size)
    return b''.join(chunks)
async def download_multiple_files_to_zip(client: DataLakeServiceClient, container_name: str, file_list):
    # Download each file in turn and write it into an in-memory zip archive.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
        for target in file_list:
            file_data = await download_single_file(client, container_name, target)
            # Entries are stored under the bare file name, so two files with
            # the same name in different folders would collide in the archive.
            zip_file.writestr(target['FileName'], file_data)
    zip_buffer.seek(0)
    return zip_buffer
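
As a minimal usage sketch, the snippet below wires the functions together. The connection string environment variable, the 'my-container' file system, the entries in file_list, and the output file name are all placeholders to adapt to your own storage account.

import os

async def main():
    # Hypothetical credentials and container; substitute your own values.
    client = DataLakeServiceClient.from_connection_string(
        os.environ['AZURE_STORAGE_CONNECTION_STRING']
    )
    file_list = [
        {'FilePath': 'reports/2024', 'FileName': 'january.csv'},
        {'FilePath': 'reports/2024', 'FileName': 'february.csv'},
    ]
    zip_buffer = await download_multiple_files_to_zip(client, 'my-container', file_list)
    with open('reports.zip', 'wb') as f:
        f.write(zip_buffer.getvalue())

asyncio.run(main())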