cirro.file_utils

  1import base64
  2import os
  3import random
  4import time
  5from pathlib import Path, PurePath
  6from typing import List, Union, Dict
  7
  8from boto3.exceptions import S3UploadFailedError
  9from botocore.exceptions import ConnectionError
 10
 11from cirro.clients import S3Client
 12from cirro.models.file import DirectoryStatistics, File, PathLike
 13
 14if os.name == 'nt':
 15    import win32api
 16    import win32con
 17
 18
 19def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
 20    """
 21    Filters a list of files by a glob pattern
 22
 23    Args:
 24        files (Union[List[File], List[str]]): List of Files or file paths
 25        pattern (str): Glob pattern (i.e., *.fastq)
 26
 27    Returns:
 28        The filtered list of files
 29    """
 30    def matches_glob(file: Union[File, str]):
 31        return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern)
 32
 33    return [
 34        file for file in files
 35        if matches_glob(file)
 36    ]
 37
 38
 39def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
 40    """
 41    Generates a mapping of file paths "flattened" to their base name.
 42
 43    Example:  data1/sample1.fastq.gz -> sample1.fastq.gz
 44
 45    Args:
 46        files: List[PathLike]: List of file paths
 47
 48    Returns:
 49        Dict[PathLike, str]: Mapping of file paths to their base name
 50    """
 51    return {
 52        file: Path(file).name for file in files
 53    }
 54
 55
 56def _is_hidden_file(file_path: Path):
 57    # Remove hidden files from listing, desktop.ini .DS_Store, etc.
 58    if os.name == 'nt':
 59        attributes = win32api.GetFileAttributes(str(file_path))
 60        return attributes & (win32con.FILE_ATTRIBUTE_HIDDEN | win32con.FILE_ATTRIBUTE_SYSTEM)
 61    else:
 62        return file_path.name.startswith('.')
 63
 64
 65def get_files_in_directory(
 66        directory: Union[str, Path],
 67        include_hidden=False
 68) -> List[str]:
 69    """
 70    Returns a list of strings containing the relative path of
 71    each file within the indicated directory.
 72
 73    Args:
 74        directory (Union[str, Path]): The path to the directory
 75        include_hidden (bool): include hidden files in the returned list
 76
 77    Returns:
 78        List of files in the directory
 79    """
 80    path = Path(directory).expanduser()
 81    path_posix = str(path.as_posix())
 82
 83    paths = []
 84
 85    for file_path in path.rglob("*"):
 86        if file_path.is_dir():
 87            continue
 88
 89        if not include_hidden and _is_hidden_file(file_path):
 90            continue
 91
 92        if not file_path.exists():
 93            continue
 94
 95        str_file_path = str(file_path.as_posix())
 96        str_file_path = str_file_path.replace(f'{path_posix}/', "")
 97        paths.append(str_file_path)
 98
 99    paths.sort()
100    return paths
101
102
103def _bytes_to_human_readable(num_bytes: int) -> str:
104    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
105        if num_bytes < 1000.0 or unit == 'PB':
106            break
107        num_bytes /= 1000.0
108    return f"{num_bytes:,.2f} {unit}"
109
110
def get_files_stats(files: List[PathLike]) -> DirectoryStatistics:
    """
    Returns information about the list of files provided, such as the total size and number of files.
    """
    total_size = 0
    file_count = 0
    for f in files:
        total_size += f.stat().st_size
        file_count += 1
    return DirectoryStatistics(
        size_friendly=_bytes_to_human_readable(total_size),
        size=total_size,
        number_of_files=file_count
    )
122
123
def upload_directory(directory: "PathLike",
                     files: "List[PathLike]",
                     file_path_map: "Dict[PathLike, str]",
                     s3_client: "S3Client",
                     bucket: str,
                     prefix: str,
                     max_retries=10):
    """
    @private

    Uploads a list of files from the specified directory
    Args:
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory
            must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Map of file paths from source to destination
        s3_client (cirro.clients.S3Client): S3 client
        bucket (str): S3 bucket
        prefix (str): S3 prefix
        max_retries (int): Number of upload attempts per file

    Raises:
        ValueError: if the entries of `files` are not the same type as `directory`
        S3UploadFailedError, ConnectionError: if a file still fails after
            `max_retries` attempts
    """
    # Ensure all files are of the same type as the directory
    if not all(isinstance(file, type(directory)) for file in files):
        raise ValueError("All files must be of the same type as the directory (str or Path)")

    for file in files:
        # str entries are relative to the directory; Path entries are used as-is
        file_path = Path(directory, file) if isinstance(file, str) else file

        # An entry in file_path_map overrides the destination (relative) path
        if file in file_path_map:
            file_relative = file_path_map[file]
        else:
            file_relative = file_path.relative_to(directory).as_posix()

        key = f'{prefix}/{file_relative}'

        # Retry up to max_retries times
        for retry in range(max_retries):
            try:
                s3_client.upload_file(
                    file_path=file_path,
                    bucket=bucket,
                    key=key
                )
                break  # upload succeeded, move on to the next file

            except (S3UploadFailedError, ConnectionError) as e:
                # Previously a file that exhausted its retries was skipped
                # silently (and the final attempt still slept); surface the
                # failure to the caller instead.
                if retry + 1 == max_retries:
                    raise

                # Back off with jitter before retrying
                delay = random.uniform(0, 60) + retry * 60
                print(f"Encountered error:\n{str(e)}\n"
                      f"Retrying in {delay:.0f} seconds ({max_retries - (retry + 1)} attempts remaining)")
                time.sleep(delay)
188
189
def download_directory(directory: str, files: List[str], s3_client: S3Client, bucket: str, prefix: str):
    """
    @private
    """
    for file in files:
        # lstrip handles an empty prefix, which would otherwise
        # produce a key with a leading slash
        object_key = f'{prefix}/{file}'.lstrip('/')
        destination = Path(directory, file)
        # Make sure the destination folder hierarchy exists
        destination.parent.mkdir(parents=True, exist_ok=True)
        s3_client.download_file(
            local_path=destination,
            bucket=bucket,
            key=object_key
        )
202
203
def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
    """
    Compute the named CRC checksum of a file, streaming it in chunks,
    and return the checksum bytes base64-encoded (big-endian).
    """
    from awscrt import checksums

    supported = {
        'CRC32': checksums.crc32,
        'CRC32C': checksums.crc32c,
        'CRC64NVME': checksums.crc64nvme,
    }

    checksum_func = supported.get(checksum_name)
    if checksum_func is None:
        raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}")

    # Fold each chunk into the running CRC so arbitrarily large
    # files can be processed without reading them fully into memory
    crc = 0
    with open(file, "rb") as handle:
        while chunk := handle.read(chunk_size):
            crc = checksum_func(chunk, crc)

    # CRC64 produces an 8-byte digest; the CRC32 variants produce 4 bytes
    byte_length = 8 if checksum_name == 'CRC64NVME' else 4
    digest = crc.to_bytes(byte_length, byteorder='big')
    return base64.b64encode(digest).decode('utf-8')
def filter_files_by_pattern( files: Union[List[cirro.models.file.File], List[str]], pattern: str) -> Union[List[cirro.models.file.File], List[str]]:
20def filter_files_by_pattern(files: Union[List[File], List[str]], pattern: str) -> Union[List[File], List[str]]:
21    """
22    Filters a list of files by a glob pattern
23
24    Args:
25        files (Union[List[File], List[str]]): List of Files or file paths
26        pattern (str): Glob pattern (i.e., *.fastq)
27
28    Returns:
29        The filtered list of files
30    """
31    def matches_glob(file: Union[File, str]):
32        return PurePath(file if isinstance(file, str) else file.relative_path).match(pattern)
33
34    return [
35        file for file in files
36        if matches_glob(file)
37    ]

Filters a list of files by a glob pattern

Arguments:
  • files (Union[List[File], List[str]]): List of Files or file paths
  • pattern (str): Glob pattern (i.e., *.fastq)
Returns:

The filtered list of files

def generate_flattened_file_map(files: List[~PathLike]) -> Dict[~PathLike, str]:
40def generate_flattened_file_map(files: List[PathLike]) -> Dict[PathLike, str]:
41    """
42    Generates a mapping of file paths "flattened" to their base name.
43
44    Example:  data1/sample1.fastq.gz -> sample1.fastq.gz
45
46    Args:
47        files: List[PathLike]: List of file paths
48
49    Returns:
50        Dict[PathLike, str]: Mapping of file paths to their base name
51    """
52    return {
53        file: Path(file).name for file in files
54    }

Generates a mapping of file paths "flattened" to their base name.

Example: data1/sample1.fastq.gz -> sample1.fastq.gz

Arguments:
  • files (List[PathLike]): List of file paths
Returns:

Dict[PathLike, str]: Mapping of file paths to their base name

def get_files_in_directory(directory: Union[str, pathlib.Path], include_hidden=False) -> List[str]:
 66def get_files_in_directory(
 67        directory: Union[str, Path],
 68        include_hidden=False
 69) -> List[str]:
 70    """
 71    Returns a list of strings containing the relative path of
 72    each file within the indicated directory.
 73
 74    Args:
 75        directory (Union[str, Path]): The path to the directory
 76        include_hidden (bool): include hidden files in the returned list
 77
 78    Returns:
 79        List of files in the directory
 80    """
 81    path = Path(directory).expanduser()
 82    path_posix = str(path.as_posix())
 83
 84    paths = []
 85
 86    for file_path in path.rglob("*"):
 87        if file_path.is_dir():
 88            continue
 89
 90        if not include_hidden and _is_hidden_file(file_path):
 91            continue
 92
 93        if not file_path.exists():
 94            continue
 95
 96        str_file_path = str(file_path.as_posix())
 97        str_file_path = str_file_path.replace(f'{path_posix}/', "")
 98        paths.append(str_file_path)
 99
100    paths.sort()
101    return paths

Returns a list of strings containing the relative path of each file within the indicated directory.

Arguments:
  • directory (Union[str, Path]): The path to the directory
  • include_hidden (bool): include hidden files in the returned list
Returns:

List of files in the directory

def get_files_stats(files: List[~PathLike]) -> cirro.models.file.DirectoryStatistics:
112def get_files_stats(files: List[PathLike]) -> DirectoryStatistics:
113    """
114    Returns information about the list of files provided, such as the total size and number of files.
115    """
116    sizes = [f.stat().st_size for f in files]
117    total_size = sum(sizes)
118    return DirectoryStatistics(
119        size_friendly=_bytes_to_human_readable(total_size),
120        size=total_size,
121        number_of_files=len(sizes)
122    )

Returns information about the list of files provided, such as the total size and number of files.

def get_checksum(file: ~PathLike, checksum_name: str, chunk_size=1048576) -> str:
205def get_checksum(file: PathLike, checksum_name: str, chunk_size=1024 * 1024) -> str:
206    from awscrt import checksums
207    checksum_func_map = {
208        'CRC32': checksums.crc32,
209        'CRC32C': checksums.crc32c,
210        'CRC64NVME': checksums.crc64nvme
211    }
212
213    checksum_func = checksum_func_map.get(checksum_name)
214    if checksum_func is None:
215        raise RuntimeWarning(f"Unsupported checksum type: {checksum_name}")
216
217    crc = 0
218    with open(file, "rb") as f:
219        while True:
220            chunk = f.read(chunk_size)
221            if not chunk:
222                break
223            crc = checksum_func(chunk, crc)
224
225    byte_length = 8 if checksum_name == 'CRC64NVME' else 4
226    checksum_bytes = crc.to_bytes(byte_length, byteorder='big')
227    return base64.b64encode(checksum_bytes).decode('utf-8')