cirro.file_utils

import base64
import os
import random
import time
from pathlib import Path, PurePath
from typing import List, Union, Dict

from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ConnectionError

from cirro.clients import S3Client
from cirro.models.file import DirectoryStatistics, File, PathLike

if os.name == 'nt':
    import win32api
    import win32con


def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
    """
    Filters a list of files by a glob pattern

    Args:
        files (Union[List[File], List[str]]): List of Files or file paths
        pattern (str): Glob pattern (e.g., *.fastq)

    Returns:
        The filtered list of files
    """
    def matches_glob(file: Union[File, str]):
        return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern)

    return [
        file for file in files
        if matches_glob(file)
    ]


def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
    """
    Generates a mapping of file paths "flattened" to their base name.

    Example:  data1/sample1.fastq.gz -> sample1.fastq.gz

    Args:
        files (List[PathLike]): List of file paths

    Returns:
        Dict[PathLike, str]: Mapping of file paths to their base name
    """
    return {
        file: Path(file).name for file in files
    }


def is_hidden_file(file_path: Path):
    """
    Check whether a file path is hidden,
    such as desktop.ini, .DS_Store, etc.
    """
    if os.name == 'nt':
        attributes = win32api.GetFileAttributes(str(file_path))
        return attributes & (win32con.FILE_ATTRIBUTE_HIDDEN | win32con.FILE_ATTRIBUTE_SYSTEM)
    else:
        return file_path.name.startswith('.')


def get_files_in_directory(
        directory: Union[str, Path],
        include_hidden=False
) -> List[str]:
    """
    Returns a list of strings containing the relative path of
    each file within the indicated directory.

    Args:
        directory (Union[str, Path]): The path to the directory
        include_hidden (bool): include hidden files in the returned list

    Returns:
        List of files in the directory
    """
    path = Path(directory).expanduser()
    path_posix = str(path.as_posix())

    paths = []

    for file_path in path.rglob("*"):
        if file_path.is_dir():
            continue

        if not include_hidden and is_hidden_file(file_path):
            continue

        if not file_path.exists():
            continue

        str_file_path = str(file_path.as_posix())
        str_file_path = str_file_path.replace(f'{path_posix}/', "")
        paths.append(str_file_path)

    paths.sort()
    return paths


def _bytes_to_human_readable(num_bytes: int) -> str:
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if num_bytes < 1000.0 or unit == 'PB':
            break
        num_bytes /= 1000.0
    return f"{num_bytes:,.2f} {unit}"


def get_files_stats(files: List[PathLike]) -> DirectoryStatistics:
    """
    Returns information about the list of files provided, such as the total size and number of files.
    """
    sizes = [f.stat().st_size for f in files]
    total_size = sum(sizes)
    return DirectoryStatistics(
        size_friendly=_bytes_to_human_readable(total_size),
        size=total_size,
        number_of_files=len(sizes)
    )


def upload_directory(directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str],
                     s3_client: S3Client,
                     bucket: str,
                     prefix: str,
                     max_retries=10):
    """
    @private

    Uploads a list of files from the specified directory

    Args:
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory;
            must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Map of file paths from source to destination
        s3_client (cirro.clients.S3Client): S3 client
        bucket (str): S3 bucket
        prefix (str): S3 prefix
        max_retries (int): Number of retries
    """
    # Ensure all files are of the same type as the directory
    if not all(isinstance(file, type(directory)) for file in files):
        raise ValueError("All files must be of the same type as the directory (str or Path)")

    for file in files:
        if isinstance(file, str):
            file_path = Path(directory, file)
        else:
            file_path = file

        # Check if the file is present in the file_path_map;
        # if it is, use the mapped value as the destination path
        if file in file_path_map:
            file_relative = file_path_map[file]
        else:
            file_relative = file_path.relative_to(directory).as_posix()

        key = f'{prefix}/{file_relative}'
        success = False

        # Retry up to max_retries times
        for retry in range(max_retries):

            # Try the upload
            try:
                s3_client.upload_file(
                    file_path=file_path,
                    bucket=bucket,
                    key=key
                )

                success = True

            # Catch the upload error
            except (S3UploadFailedError, ConnectionError) as e:
                delay = random.uniform(0, 60) + retry * 60
                # Report the error
                print(f"Encountered error:\n{str(e)}\n"
                      f"Retrying in {delay:.0f} seconds ({max_retries - (retry + 1)} attempts remaining)")
                time.sleep(delay)

            if success:
                break


def download_directory(directory: str, files: List[str], s3_client: S3Client, bucket: str, prefix: str):
    """
    @private
    """
    for file in files:
        key = f'{prefix}/{file}'.lstrip('/')
        local_path = Path(directory, file)
        local_path.parent.mkdir(parents=True, exist_ok=True)

        s3_client.download_file(local_path=local_path,
                                bucket=bucket,
                                key=key)


def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
    from awscrt import checksums
    checksum_func_map = {
        'CRC32': checksums.crc32,
        'CRC32C': checksums.crc32c,
        'CRC64NVME': checksums.crc64nvme
    }

    checksum_func = checksum_func_map.get(checksum_name)
    if checksum_func is None:
        raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}")

    crc = 0
    with open(file, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            crc = checksum_func(chunk, crc)

    byte_length = 8 if checksum_name == 'CRC64NVME' else 4
    checksum_bytes = crc.to_bytes(byte_length, byteorder='big')
    return base64.b64encode(checksum_bytes).decode('utf-8')
def filter_files_by_pattern(files: Union[List[cirro.models.file.File], List[str]], pattern: str) -> Union[List[cirro.models.file.File], List[str]]:

Filters a list of files by a glob pattern

Arguments:
  • files (Union[List[File], List[str]]): List of Files or file paths
  • pattern (str): Glob pattern (e.g., *.fastq)
Returns:

The filtered list of files
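
Example (a minimal usage sketch; the file paths below are hypothetical):

    from cirro.file_utils import filter_files_by_pattern

    paths = ["data1/sample1.fastq.gz", "data1/sample1.bam", "notes.txt"]

    # A pattern without a slash is matched against the final path component,
    # so nested paths still match
    fastqs = filter_files_by_pattern(paths, "*.fastq.gz")
    print(fastqs)  # ['data1/sample1.fastq.gz']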

def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:

Generates a mapping of file paths "flattened" to their base name.

Example: data1/sample1.fastq.gz -> sample1.fastq.gz

Arguments:
  • files (List[PathLike]): List of file paths
Returns:

Dict[PathLike, str]: Mapping of file paths to their base name
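
Example (a minimal usage sketch; the file paths below are hypothetical):

    from cirro.file_utils import generate_flattened_file_map

    file_map = generate_flattened_file_map([
        "data1/sample1.fastq.gz",
        "data2/sample2.fastq.gz",
    ])
    # {'data1/sample1.fastq.gz': 'sample1.fastq.gz',
    #  'data2/sample2.fastq.gz': 'sample2.fastq.gz'}

Note that two inputs sharing the same base name (e.g. data1/sample1.fastq.gz and
data2/sample1.fastq.gz) are flattened to the same destination name.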

def is_hidden_file(file_path: pathlib.Path):

Check whether a file path is hidden, such as desktop.ini, .DS_Store, etc.
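
On POSIX systems this only checks for a leading dot in the file name; on Windows it
queries the hidden/system file attributes, which requires the path to exist.
Example (a minimal usage sketch; the file names are hypothetical):

    from pathlib import Path
    from cirro.file_utils import is_hidden_file

    is_hidden_file(Path(".DS_Store"))         # True on POSIX (dot-prefixed name)
    is_hidden_file(Path("sample1.fastq.gz"))  # False on POSIX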

def get_files_in_directory(directory: Union[str, pathlib.Path], include_hidden=False) -> List[str]:

Returns a list of strings containing the relative path of each file within the indicated directory.

Arguments:
  • directory (Union[str, Path]): The path to the directory
  • include_hidden (bool): include hidden files in the returned list
Returns:

List of files in the directory
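
The returned paths are relative to the directory and use forward slashes.
Example (a minimal usage sketch; the directory is hypothetical):

    from cirro.file_utils import get_files_in_directory

    for relative_path in get_files_in_directory("~/data/project1"):
        print(relative_path)  # e.g. fastq/sample1_R1.fastq.gz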

def get_files_stats(files: List[PathLike]) -> cirro.models.file.DirectoryStatistics:

Returns information about the list of files provided, such as the total size and number of files.
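
The entries must support .stat() (e.g. pathlib.Path objects pointing at existing files).
Example (a minimal usage sketch; the directory is hypothetical):

    from pathlib import Path
    from cirro.file_utils import get_files_in_directory, get_files_stats

    directory = Path("~/data/project1").expanduser()
    files = [directory / f for f in get_files_in_directory(directory)]

    stats = get_files_stats(files)
    print(stats.number_of_files, stats.size_friendly)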

def get_checksum(file: PathLike, checksum_name: str, chunk_size=1048576) -> str:
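
Computes a whole-file checksum (CRC32, CRC32C, or CRC64NVME, via awscrt.checksums)
by reading the file in chunks and returns the value as the base64 encoding of its
big-endian bytes (the same form S3 uses for object checksum values). A RuntimeWarning
is raised for unsupported checksum names.
Example (a minimal usage sketch; the file path is hypothetical):

    from cirro.file_utils import get_checksum

    checksum = get_checksum("data1/sample1.fastq.gz", checksum_name="CRC64NVME")
    print(checksum)  # base64-encoded 8-byte checksum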