cirro.file_utils
import base64
import os
import random
import time
from pathlib import Path, PurePath
from typing import Dict, List, Union

from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ConnectionError

from cirro.clients import S3Client
from cirro.models.file import DirectoryStatistics, File, PathLike

# Windows needs the pywin32 API to read hidden/system file attributes
if os.name == 'nt':
    import win32api
    import win32con


def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
    """
    Filters a list of files by a glob pattern

    Args:
        files (Union[List[File], List[str]]): List of Files or file paths
        pattern (str): Glob pattern (i.e., *.fastq)

    Returns:
        The filtered list of files
    """
    def matches_glob(file: Union[File, str]) -> bool:
        # File entries are matched on their relative path; plain strings as-is
        return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern)

    return [
        file for file in files
        if matches_glob(file)
    ]


def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
    """
    Generates a mapping of file paths "flattened" to their base name.

    Example: data1/sample1.fastq.gz -> sample1.fastq.gz

    Args:
        files: List[PathLike]: List of file paths

    Returns:
        Dict[PathLike, str]: Mapping of file paths to their base name
    """
    return {
        file: Path(file).name for file in files
    }


def is_hidden_file(file_path: Path) -> bool:
    """
    Check if a file path is hidden
    Such as desktop.ini, .DS_Store, etc.
    """
    if os.name == 'nt':
        # On Windows, hidden-ness is a file attribute, not a naming convention
        attributes = win32api.GetFileAttributes(str(file_path))
        return attributes & (win32con.FILE_ATTRIBUTE_HIDDEN | win32con.FILE_ATTRIBUTE_SYSTEM)
    else:
        return file_path.name.startswith('.')


def get_files_in_directory(
        directory: Union[str, Path],
        include_hidden=False
) -> List[str]:
    """
    Returns a list of strings containing the relative path of
    each file within the indicated directory.

    Args:
        directory (Union[str, Path]): The path to the directory
        include_hidden (bool): include hidden files in the returned list

    Returns:
        List of files in the directory
    """
    path = Path(directory).expanduser()

    paths = []

    for file_path in path.rglob("*"):
        if file_path.is_dir():
            continue

        if not include_hidden and is_hidden_file(file_path):
            continue

        # Skip broken symlinks and files removed mid-walk
        if not file_path.exists():
            continue

        # Record the path relative to the root directory, POSIX-style
        # (more robust than stripping the prefix via string replacement)
        paths.append(file_path.relative_to(path).as_posix())

    paths.sort()
    return paths


def _bytes_to_human_readable(num_bytes: int) -> str:
    """Format a byte count using decimal (SI, 1000-based) units, e.g. '1.50 MB'."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if num_bytes < 1000.0 or unit == 'PB':
            break
        num_bytes /= 1000.0
    return f"{num_bytes:,.2f} {unit}"


def get_files_stats(files: List[PathLike]) -> DirectoryStatistics:
    """
    Returns information about the list of files provided, such as the total size and number of files.
    """
    # Wrap in Path() so plain-string paths are supported as well
    sizes = [Path(f).stat().st_size for f in files]
    total_size = sum(sizes)
    return DirectoryStatistics(
        size_friendly=_bytes_to_human_readable(total_size),
        size=total_size,
        number_of_files=len(sizes)
    )


def upload_directory(directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str],
                     s3_client: S3Client,
                     bucket: str,
                     prefix: str,
                     max_retries=10):
    """
    @private

    Uploads a list of files from the specified directory

    Args:
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory
            must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Map of file paths from source to destination
        s3_client (cirro.clients.S3Client): S3 client
        bucket (str): S3 bucket
        prefix (str): S3 prefix
        max_retries (int): Number of retries

    Raises:
        ValueError: If the file paths are not the same type as the directory
        S3UploadFailedError | ConnectionError: If a file still fails to upload
            after max_retries attempts
    """
    # Ensure all files are of the same type as the directory
    if not all(isinstance(file, type(directory)) for file in files):
        raise ValueError("All files must be of the same type as the directory (str or Path)")

    for file in files:
        file_path = Path(directory, file) if isinstance(file, str) else file

        # An explicit mapping overrides the default destination path
        # (the path relative to the source directory)
        if file in file_path_map:
            file_relative = file_path_map[file]
        else:
            file_relative = file_path.relative_to(directory).as_posix()

        key = f'{prefix}/{file_relative}'

        # Retry transient failures with linear backoff plus jitter
        for retry in range(max_retries):
            try:
                s3_client.upload_file(
                    file_path=file_path,
                    bucket=bucket,
                    key=key
                )
                break

            except (S3UploadFailedError, ConnectionError) as e:
                attempts_remaining = max_retries - (retry + 1)
                # Fix: a file that failed every attempt used to be skipped
                # silently; now the last error propagates to the caller.
                # Also avoids a pointless sleep after the final attempt.
                if attempts_remaining == 0:
                    raise
                delay = random.uniform(0, 60) + retry * 60
                print(f"Encountered error:\n{str(e)}\n"
                      f"Retrying in {delay:.0f} seconds ({attempts_remaining} attempts remaining)")
                time.sleep(delay)


def download_directory(directory: str, files: List[str], s3_client: S3Client, bucket: str, prefix: str):
    """
    @private

    Downloads the listed files from S3 into the local directory,
    creating parent directories as needed.
    """
    for file in files:
        key = f'{prefix}/{file}'.lstrip('/')
        local_path = Path(directory, file)
        local_path.parent.mkdir(parents=True, exist_ok=True)

        s3_client.download_file(local_path=local_path,
                                bucket=bucket,
                                key=key)


def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
    """
    Computes the named CRC checksum of a file, streamed in chunks,
    and returns it base64-encoded (the format S3 expects).

    Args:
        file (str|Path): Path to the file
        checksum_name (str): One of 'CRC32', 'CRC32C', 'CRC64NVME'
        chunk_size (int): Read size per iteration (default 1 MiB)

    Raises:
        ValueError: If checksum_name is not supported
    """
    # Imported lazily so awscrt is only required when checksums are computed
    from awscrt import checksums
    checksum_func_map = {
        'CRC32': checksums.crc32,
        'CRC32C': checksums.crc32c,
        'CRC64NVME': checksums.crc64nvme
    }

    checksum_func = checksum_func_map.get(checksum_name)
    if checksum_func is None:
        # Fix: was `raise RuntimeWarning(...)` — a warning class is not the
        # right exception for an invalid argument value
        raise ValueError(f"Unsupported checksum type: {checksum_name}")

    # Each awscrt checksum function accepts the running value as its
    # second argument, allowing incremental computation over chunks
    crc = 0
    with open(file, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            crc = checksum_func(chunk, crc)

    byte_length = 8 if checksum_name == 'CRC64NVME' else 4
    checksum_bytes = crc.to_bytes(byte_length, byteorder='big')
    return base64.b64encode(checksum_bytes).decode('utf-8')
def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
20def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]: 21 """ 22 Filters a list of files by a glob pattern 23 24 Args: 25 files (Union[List[File], List[str]]): List of Files or file paths 26 pattern (str): Glob pattern (i.e., *.fastq) 27 28 Returns: 29 The filtered list of files 30 """ 31 def matches_glob(file: Union[File, str]): 32 return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern) 33 34 return [ 35 file for file in files 36 if matches_glob(file) 37 ]
Filters a list of files by a glob pattern
Arguments:
- files (Union[List[File], List[str]]): List of Files or file paths
- pattern (str): Glob pattern (i.e., *.fastq)
Returns:
The filtered list of files
def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
40def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]: 41 """ 42 Generates a mapping of file paths "flattened" to their base name. 43 44 Example: data1/sample1.fastq.gz -> sample1.fastq.gz 45 46 Args: 47 files: List[PathLike]: List of file paths 48 49 Returns: 50 Dict[PathLike, str]: Mapping of file paths to their base name 51 """ 52 return { 53 file: Path(file).name for file in files 54 }
Generates a mapping of file paths "flattened" to their base name.
Example: data1/sample1.fastq.gz -> sample1.fastq.gz
Arguments:
- files: List[PathLike]: List of file paths
Returns:
Dict[PathLike, str]: Mapping of file paths to their base name
def get_files_in_directory(directory: Union[str, Path], include_hidden=False) -> List[str]:
69def get_files_in_directory( 70 directory: Union[str, Path], 71 include_hidden=False 72) -> List[str]: 73 """ 74 Returns a list of strings containing the relative path of 75 each file within the indicated directory. 76 77 Args: 78 directory (Union[str, Path]): The path to the directory 79 include_hidden (bool): include hidden files in the returned list 80 81 Returns: 82 List of files in the directory 83 """ 84 path = Path(directory).expanduser() 85 path_posix = str(path.as_posix()) 86 87 paths = [] 88 89 for file_path in path.rglob("*"): 90 if file_path.is_dir(): 91 continue 92 93 if not include_hidden and is_hidden_file(file_path): 94 continue 95 96 if not file_path.exists(): 97 continue 98 99 str_file_path = str(file_path.as_posix()) 100 str_file_path = str_file_path.replace(f'{path_posix}/', "") 101 paths.append(str_file_path) 102 103 paths.sort() 104 return paths
Returns a list of strings containing the relative path of each file within the indicated directory.
Arguments:
- directory (Union[str, Path]): The path to the directory
- include_hidden (bool): include hidden files in the returned list
Returns:
List of files in the directory
115def get_files_stats(files: List[PathLike]) -> DirectoryStatistics: 116 """ 117 Returns information about the list of files provided, such as the total size and number of files. 118 """ 119 sizes = [f.stat().st_size for f in files] 120 total_size = sum(sizes) 121 return DirectoryStatistics( 122 size_friendly=_bytes_to_human_readable(total_size), 123 size=total_size, 124 number_of_files=len(sizes) 125 )
Returns information about the list of files provided, such as the total size and number of files.
def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
208def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str: 209 from awscrt import checksums 210 checksum_func_map = { 211 'CRC32': checksums.crc32, 212 'CRC32C': checksums.crc32c, 213 'CRC64NVME': checksums.crc64nvme 214 } 215 216 checksum_func = checksum_func_map.get(checksum_name) 217 if checksum_func is None: 218 raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}") 219 220 crc = 0 221 with open(file, "rb") as f: 222 while True: 223 chunk = f.read(chunk_size) 224 if not chunk: 225 break 226 crc = checksum_func(chunk, crc) 227 228 byte_length = 8 if checksum_name == 'CRC64NVME' else 4 229 checksum_bytes = crc.to_bytes(byte_length, byteorder='big') 230 return base64.b64encode(checksum_bytes).decode('utf-8')