cirro.file_utils
import base64
import os
import random
import time
from pathlib import Path, PurePath
from typing import List, Union, Dict

from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ConnectionError

from cirro.clients import S3Client
from cirro.models.file import DirectoryStatistics, File, PathLike

if os.name == 'nt':
    import win32api
    import win32con


def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
    """
    Filters a list of files by a glob pattern

    Args:
        files (Union[List[File], List[str]]): List of Files or file paths
        pattern (str): Glob pattern (e.g., *.fastq)

    Returns:
        The filtered list of files
    """
    def matches_glob(file: Union[File, str]):
        # File objects are matched on their relative path; plain strings as-is
        return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern)

    return [
        file for file in files
        if matches_glob(file)
    ]


def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
    """
    Generates a mapping of file paths "flattened" to their base name.

    Example: data1/sample1.fastq.gz -> sample1.fastq.gz

    Args:
        files (List[PathLike]): List of file paths

    Returns:
        Dict[PathLike, str]: Mapping of file paths to their base name
    """
    return {
        file: Path(file).name for file in files
    }


def _is_hidden_file(file_path: Path) -> bool:
    # Remove hidden files from listing, desktop.ini .DS_Store, etc.
    if os.name == 'nt':
        # On Windows, hidden/system status is a file attribute, not a name prefix
        attributes = win32api.GetFileAttributes(str(file_path))
        return attributes & (win32con.FILE_ATTRIBUTE_HIDDEN | win32con.FILE_ATTRIBUTE_SYSTEM)
    else:
        return file_path.name.startswith('.')


def get_files_in_directory(
        directory: Union[str, Path],
        include_hidden=False
) -> List[str]:
    """
    Returns a list of strings containing the relative path of
    each file within the indicated directory.

    Args:
        directory (Union[str, Path]): The path to the directory
        include_hidden (bool): include hidden files in the returned list

    Returns:
        List of files in the directory
    """
    path = Path(directory).expanduser()
    path_posix = str(path.as_posix())

    paths = []

    for file_path in path.rglob("*"):
        if file_path.is_dir():
            continue

        if not include_hidden and _is_hidden_file(file_path):
            continue

        # Skip broken symlinks and files removed mid-walk
        if not file_path.exists():
            continue

        str_file_path = str(file_path.as_posix())
        # Strip the directory prefix to yield a relative path
        str_file_path = str_file_path.replace(f'{path_posix}/', "")
        paths.append(str_file_path)

    paths.sort()
    return paths


def _bytes_to_human_readable(num_bytes: int) -> str:
    """Format a byte count using decimal (SI, 1000-based) units, e.g. '1.50 MB'."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if num_bytes < 1000.0 or unit == 'PB':
            break
        num_bytes /= 1000.0
    return f"{num_bytes:,.2f} {unit}"


def get_files_stats(files: List[PathLike]) -> DirectoryStatistics:
    """
    Returns information about the list of files provided, such as the total size and number of files.

    Args:
        files (List[PathLike]): List of file paths

    Returns:
        DirectoryStatistics: total size (raw and human-readable) and file count
    """
    # Wrap in Path() so both str and Path entries are supported,
    # matching the PathLike annotation (previously str inputs raised AttributeError)
    sizes = [Path(f).stat().st_size for f in files]
    total_size = sum(sizes)
    return DirectoryStatistics(
        size_friendly=_bytes_to_human_readable(total_size),
        size=total_size,
        number_of_files=len(sizes)
    )


def upload_directory(directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str],
                     s3_client: S3Client,
                     bucket: str,
                     prefix: str,
                     max_retries=10):
    """
    @private

    Uploads a list of files from the specified directory
    Args:
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory
            must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Map of file paths from source to destination
        s3_client (cirro.clients.S3Client): S3 client
        bucket (str): S3 bucket
        prefix (str): S3 prefix
        max_retries (int): Number of retries

    Raises:
        ValueError: if the file entries are not the same type as `directory`
        S3UploadFailedError, ConnectionError: if an upload still fails after
            `max_retries` attempts
    """
    # Ensure all files are of the same type as the directory
    if not all(isinstance(file, type(directory)) for file in files):
        raise ValueError("All files must be of the same type as the directory (str or Path)")

    for file in files:
        if isinstance(file, str):
            file_path = Path(directory, file)
        else:
            file_path = file

        # Check if is present in the file_path_map
        # if it is, use the mapped value as the destination path
        if file in file_path_map:
            file_relative = file_path_map[file]
        else:
            file_relative = file_path.relative_to(directory).as_posix()

        key = f'{prefix}/{file_relative}'

        # Retry transient failures with a jittered, linearly increasing backoff
        for retry in range(max_retries):
            try:
                s3_client.upload_file(
                    file_path=file_path,
                    bucket=bucket,
                    key=key
                )
                break

            except (S3UploadFailedError, ConnectionError) as e:
                attempts_remaining = max_retries - (retry + 1)
                # Fix: previously the final failure was swallowed and the file
                # silently skipped; surface it to the caller instead
                if attempts_remaining == 0:
                    raise
                delay = random.uniform(0, 60) + retry * 60
                # Report the error
                print(f"Encountered error:\n{str(e)}\n"
                      f"Retrying in {delay:.0f} seconds ({attempts_remaining} attempts remaining)")
                time.sleep(delay)


def download_directory(directory: str, files: List[str], s3_client: S3Client, bucket: str, prefix: str):
    """
    @private

    Downloads a list of files from S3 into `directory`, creating any
    intermediate local directories as needed.
    """
    for file in files:
        key = f'{prefix}/{file}'.lstrip('/')
        local_path = Path(directory, file)
        local_path.parent.mkdir(parents=True, exist_ok=True)

        s3_client.download_file(local_path=local_path,
                                bucket=bucket,
                                key=key)


def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
    """
    Computes the checksum of a file using the named algorithm and returns it
    base64-encoded (the format S3 expects for checksum headers).

    Args:
        file (PathLike): Path to the file
        checksum_name (str): One of 'CRC32', 'CRC32C', 'CRC64NVME'
        chunk_size (int): Read size per iteration, defaults to 1 MiB

    Raises:
        RuntimeWarning: if `checksum_name` is not a supported algorithm
    """
    from awscrt import checksums
    checksum_func_map = {
        'CRC32': checksums.crc32,
        'CRC32C': checksums.crc32c,
        'CRC64NVME': checksums.crc64nvme
    }

    checksum_func = checksum_func_map.get(checksum_name)
    if checksum_func is None:
        # NOTE(review): raising a Warning subclass as an exception is unusual,
        # but preserved since callers may catch RuntimeWarning specifically
        raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}")

    # Stream the file in chunks, feeding the running CRC back in each pass
    crc = 0
    with open(file, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            crc = checksum_func(chunk, crc)

    # CRC64 produces an 8-byte digest; the CRC32 variants produce 4 bytes
    byte_length = 8 if checksum_name == 'CRC64NVME' else 4
    checksum_bytes = crc.to_bytes(byte_length, byteorder='big')
    return base64.b64encode(checksum_bytes).decode('utf-8')
def
filter_files_by_pattern( files: Union[List[cirro.models.file.File], List[str]], pattern: str) -> Union[List[cirro.models.file.File], List[str]]:
20def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]: 21 """ 22 Filters a list of files by a glob pattern 23 24 Args: 25 files (Union[List[File], List[str]]): List of Files or file paths 26 pattern (str): Glob pattern (i.e., *.fastq) 27 28 Returns: 29 The filtered list of files 30 """ 31 def matches_glob(file: Union[File, str]): 32 return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern) 33 34 return [ 35 file for file in files 36 if matches_glob(file) 37 ]
Filters a list of files by a glob pattern
Arguments:
- files (Union[List[File], List[str]]): List of Files or file paths
- pattern (str): Glob pattern (e.g., *.fastq)
Returns:
The filtered list of files
def
generate_flattened_file_map(files: List[~PathLike]) -> Dict[~PathLike, str]:
40def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]: 41 """ 42 Generates a mapping of file paths "flattened" to their base name. 43 44 Example: data1/sample1.fastq.gz -> sample1.fastq.gz 45 46 Args: 47 files: List[PathLike]: List of file paths 48 49 Returns: 50 Dict[PathLike, str]: Mapping of file paths to their base name 51 """ 52 return { 53 file: Path(file).name for file in files 54 }
Generates a mapping of file paths "flattened" to their base name.
Example: data1/sample1.fastq.gz -> sample1.fastq.gz
Arguments:
- files (List[PathLike]): List of file paths
Returns:
Dict[PathLike, str]: Mapping of file paths to their base name
def
get_files_in_directory(directory: Union[str, pathlib.Path], include_hidden=False) -> List[str]:
66def get_files_in_directory( 67 directory: Union[str, Path], 68 include_hidden=False 69) -> List[str]: 70 """ 71 Returns a list of strings containing the relative path of 72 each file within the indicated directory. 73 74 Args: 75 directory (Union[str, Path]): The path to the directory 76 include_hidden (bool): include hidden files in the returned list 77 78 Returns: 79 List of files in the directory 80 """ 81 path = Path(directory).expanduser() 82 path_posix = str(path.as_posix()) 83 84 paths = [] 85 86 for file_path in path.rglob("*"): 87 if file_path.is_dir(): 88 continue 89 90 if not include_hidden and _is_hidden_file(file_path): 91 continue 92 93 if not file_path.exists(): 94 continue 95 96 str_file_path = str(file_path.as_posix()) 97 str_file_path = str_file_path.replace(f'{path_posix}/', "") 98 paths.append(str_file_path) 99 100 paths.sort() 101 return paths
Returns a list of strings containing the relative path of each file within the indicated directory.
Arguments:
- directory (Union[str, Path]): The path to the directory
- include_hidden (bool): include hidden files in the returned list
Returns:
List of files in the directory
112def get_files_stats(files: List[PathLike]) -> DirectoryStatistics: 113 """ 114 Returns information about the list of files provided, such as the total size and number of files. 115 """ 116 sizes = [f.stat().st_size for f in files] 117 total_size = sum(sizes) 118 return DirectoryStatistics( 119 size_friendly=_bytes_to_human_readable(total_size), 120 size=total_size, 121 number_of_files=len(sizes) 122 )
Returns information about the list of files provided, such as the total size and number of files.
def
get_checksum(file: ~PathLike, checksum_name: str, chunk_size=1048576) -> str:
205def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str: 206 from awscrt import checksums 207 checksum_func_map = { 208 'CRC32': checksums.crc32, 209 'CRC32C': checksums.crc32c, 210 'CRC64NVME': checksums.crc64nvme 211 } 212 213 checksum_func = checksum_func_map.get(checksum_name) 214 if checksum_func is None: 215 raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}") 216 217 crc = 0 218 with open(file, "rb") as f: 219 while True: 220 chunk = f.read(chunk_size) 221 if not chunk: 222 break 223 crc = checksum_func(chunk, crc) 224 225 byte_length = 8 if checksum_name == 'CRC64NVME' else 4 226 checksum_bytes = crc.to_bytes(byte_length, byteorder='big') 227 return base64.b64encode(checksum_bytes).decode('utf-8')