cirro.services
from .billing import BillingService
from .compute_environment import ComputeEnvironmentService
from .dataset import DatasetService
from .execution import ExecutionService
from .file import FileService
from .metadata import MetadataService
from .metrics import MetricsService
from .process import ProcessService
from .projects import ProjectService
from .references import ReferenceService
from .share import ShareService
from .user import UserService

# Public API of the services package; kept alphabetical to match the import
# block above (previously ComputeEnvironmentService was out of order).
__all__ = [
    'BillingService',
    'ComputeEnvironmentService',
    'DatasetService',
    'ExecutionService',
    'FileService',
    'MetadataService',
    'MetricsService',
    'ProcessService',
    'ProjectService',
    'ReferenceService',
    'ShareService',
    'UserService'
]
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """

    def list(self) -> List[BillingAccount]:
        """
        Gets a list of billing accounts the current user has access to
        """
        accounts = get_billing_accounts.sync(client=self._api_client)
        return accounts

    def update(self, billing_account_id: str, request: BillingAccountRequest):
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest):

        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        # sync_detailed is used because the endpoint returns no body on success
        update_billing_account.sync_detailed(
            client=self._api_client,
            billing_account_id=billing_account_id,
            body=request,
        )
Service for interacting with the Billing endpoints
14 def list(self) -> List[BillingAccount]: 15 """ 16 Gets a list of billing accounts the current user has access to 17 """ 18 return get_billing_accounts.sync(client=self._api_client)
Gets a list of billing accounts the current user has access to
20 def update(self, billing_account_id: str, request: BillingAccountRequest): 21 """ 22 Updates a billing account 23 24 Args: 25 billing_account_id (str): Billing account ID 26 request (cirro_api_client.v1.models.BillingAccountRequest): 27 28 ```python 29 from cirro_api_client.v1.models import BillingAccountRequest 30 from cirro.cirro_client import CirroApi 31 32 cirro = CirroApi() 33 request = BillingAccountRequest( 34 name="New billing account name", 35 primary_budget_number="new-budget-number", 36 owner="New Owner" 37 ) 38 cirro.billing.update("billing-account-id", request) 39 ``` 40 """ 41 update_billing_account.sync_detailed( 42 billing_account_id=billing_account_id, 43 body=request, 44 client=self._api_client 45 )
Updates a billing account
Arguments:
- billing_account_id (str): Billing account ID
- request (cirro_api_client.v1.models.BillingAccountRequest):
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = BillingAccountRequest(
name="New billing account name",
primary_budget_number="new-budget-number",
owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
22class DatasetService(FileEnabledService): 23 """ 24 Service for interacting with the Dataset endpoints 25 """ 26 27 def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]: 28 """List datasets 29 30 Retrieves a list of datasets for a given project 31 32 Args: 33 project_id (str): ID of the Project 34 max_items (int): Maximum number of records to get (default 10,000) 35 """ 36 return get_all_records( 37 records_getter=lambda page_args: get_datasets.sync( 38 project_id=project_id, 39 client=self._api_client, 40 next_token=page_args.next_token, 41 limit=page_args.limit 42 ), 43 max_items=max_items 44 ) 45 46 def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]: 47 """ 48 Retrieves a list of shared datasets for a given project and share 49 50 Args: 51 project_id (str): ID of the Project 52 share_id (str): ID of the Share 53 max_items (int): Maximum number of records to get (default 10,000) 54 55 Example: 56 ```python 57 from cirro_api_client.v1.models import ShareType 58 from cirro.cirro_client import CirroApi 59 60 cirro = CirroApi() 61 62 # List shares that are subscribed to 63 subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER) 64 cirro.datasets.list_shared("project-id", subscribed_shares[0].id) 65 ``` 66 """ 67 return get_all_records( 68 records_getter=lambda page_args: get_shared_datasets.sync( 69 project_id=project_id, 70 share_id=share_id, 71 client=self._api_client, 72 next_token=page_args.next_token, 73 limit=page_args.limit 74 ), 75 max_items=max_items 76 ) 77 78 def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse: 79 """ 80 Download data from public repositories 81 82 Args: 83 project_id (str): ID of the Project 84 import_request (cirro_api_client.v1.models.ImportDataRequest): 85 86 Returns: 87 ID of the created dataset 88 89 ```python 90 from cirro_api_client.v1.models import ImportDataRequest, Tag 91 from 
cirro.cirro_client import CirroApi 92 93 cirro = CirroApi() 94 request = ImportDataRequest( 95 name="Imported dataset", 96 description="Description of the dataset", 97 public_ids=["SRR123456", "SRR123457"], 98 tags=[Tag(value="tag1")] 99 ) 100 cirro.datasets.import_public("project-id", request) 101 ``` 102 """ 103 return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request) 104 105 def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse: 106 """ 107 Registers a dataset in Cirro, which can subsequently have files uploaded to it 108 109 Args: 110 project_id (str): ID of the Project 111 upload_request (cirro_api_client.v1.models.UploadDatasetRequest): 112 113 Returns: 114 ID of the created dataset and the path to upload files 115 116 ```python 117 from cirro_api_client.v1.models import UploadDatasetRequest, Tag 118 from cirro.cirro_client import CirroApi 119 120 cirro = CirroApi() 121 request = UploadDatasetRequest( 122 name="Name of new dataset", 123 process_id="paired_dnaseq", 124 expected_files=["read_1.fastq.gz", "read_2.fastq.gz"], 125 description="Description of the dataset", 126 tags=[Tag(value="tag1"), Tag(value="tag2")] 127 ) 128 cirro.datasets.create("project-id", request) 129 ``` 130 """ 131 return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request) 132 133 def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]: 134 """ 135 Gets detailed information about a dataset 136 137 Args: 138 project_id (str): ID of the Project 139 dataset_id (str): ID of the Dataset 140 141 Returns: 142 The dataset, if found 143 """ 144 return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client) 145 146 def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail: 147 """ 148 Update info on a dataset (name, description, and/or tags) 149 150 Args: 151 project_id (str): 
ID of the Project 152 dataset_id (str): ID of the Dataset 153 request (cirro_api_client.v1.models.UpdateDatasetRequest): 154 155 Returns: 156 The updated dataset 157 158 ```python 159 from cirro_api_client.v1.models import UpdateDatasetRequest 160 from cirro.cirro_client import CirroApi 161 162 cirro = CirroApi() 163 request = UpdateDatasetRequest( 164 name="Name of new dataset", 165 process_id="paired_dnaseq", 166 description="Description of the dataset" 167 ) 168 cirro.datasets.update("project-id", "dataset-id", request) 169 ``` 170 """ 171 return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client) 172 173 def delete(self, project_id: str, dataset_id: str) -> None: 174 """ 175 Delete a dataset 176 177 After a dataset has been deleted, the files associated with that 178 dataset are saved according to the project's retention time. 179 180 Args: 181 project_id (str): ID of the Project 182 dataset_id (str): ID of the Dataset 183 """ 184 delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client) 185 186 def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets: 187 """ 188 Gets a listing of files, charts, and other assets available for the dataset 189 190 Args: 191 project_id (str): ID of the Project 192 dataset_id (str): ID of the Dataset 193 file_limit (int): Maximum number of files to get (default 100,000) 194 """ 195 dataset = self.get(project_id, dataset_id) 196 if file_limit < 1: 197 raise ValueError("file_limit must be greater than 0") 198 all_files = [] 199 file_offset = 0 200 domain = None 201 artifacts = None 202 203 while len(all_files) < file_limit: 204 manifest = get_dataset_manifest.sync( 205 project_id=project_id, 206 dataset_id=dataset_id, 207 file_offset=file_offset, 208 client=self._api_client 209 ) 210 all_files.extend(manifest.files) 211 file_offset += len(manifest.files) 212 213 if not artifacts: 214 
artifacts = manifest.artifacts 215 216 domain = manifest.domain 217 if len(all_files) >= manifest.total_files or len(manifest.files) == 0: 218 break 219 220 files = [ 221 File.from_file_entry( 222 f, 223 project_id=project_id, 224 dataset=dataset, 225 domain=domain 226 ) 227 for f in all_files 228 ] 229 artifacts = [ 230 Artifact( 231 artifact_type=a.type, 232 file=File.from_file_entry( 233 FileEntry(a.path), 234 project_id=project_id, 235 dataset=dataset, 236 domain=domain 237 ) 238 ) 239 for a in artifacts 240 ] 241 return DatasetAssets(files=files, artifacts=artifacts) 242 243 def upload_files(self, 244 project_id: str, 245 dataset_id: str, 246 directory: PathLike, 247 files: List[PathLike] = None, 248 file_path_map: Dict[PathLike, str] = None) -> None: 249 """ 250 Uploads files to a given dataset from the specified directory. 251 252 All files must be relative to the specified directory. 253 If files need to be flattened, or you are sourcing files from multiple directories, 254 please include `file_path_map` or call this method multiple times. 255 256 Args: 257 project_id (str): ID of the Project 258 dataset_id (str): ID of the Dataset 259 directory (str|Path): Path to directory 260 files (typing.List[str|Path]): List of paths to files within the directory, 261 must be the same type as directory. 262 file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload 263 from source path to destination path, used to "re-write" paths within the dataset. 
264 ```python 265 from cirro.cirro_client import CirroApi 266 from cirro.file_utils import generate_flattened_file_map 267 268 cirro = CirroApi() 269 270 directory = "~/Downloads" 271 # Re-write file paths 272 file_map = { 273 "data1/file1.fastq.gz": "file1.fastq.gz", 274 "data2/file2.fastq.gz": "file2.fastq.gz", 275 "file3.fastq.gz": "new_file3.txt" 276 } 277 278 # Or you could automate the flattening 279 files = ["data1/file1.fastq.gz"] 280 file_map = generate_flattened_file_map(files) 281 282 cirro.datasets.upload_files( 283 project_id="project-id", 284 dataset_id="dataset-id", 285 directory=directory, 286 files=list(file_map.keys()), 287 file_path_map=file_map 288 ) 289 ``` 290 """ 291 if file_path_map is None: 292 file_path_map = {} 293 294 dataset = self.get(project_id, dataset_id) 295 296 access_context = FileAccessContext.upload_dataset( 297 project_id=project_id, 298 dataset_id=dataset_id, 299 base_url=dataset.s3 300 ) 301 302 self._file_service.upload_files( 303 access_context=access_context, 304 directory=directory, 305 files=files, 306 file_path_map=file_path_map 307 ) 308 309 def validate_folder( 310 self, 311 project_id: str, 312 dataset_id: str, 313 local_folder: PathLike 314 ) -> DatasetValidationResponse: 315 """ 316 Validates that the contents of a dataset match that of a local folder. 
317 """ 318 ds_files = self.get_assets_listing(project_id, dataset_id).files 319 320 local_folder = Path(local_folder) 321 if not local_folder.is_dir(): 322 raise ValueError(f"{local_folder} is not a valid local folder") 323 324 # Keep track of files from the dataset which match by checksum, don't match, or are missing 325 ds_files_matching = [] 326 ds_files_not_matching = [] 327 ds_files_missing = [] 328 ds_validate_failed = [] 329 for ds_file in ds_files: 330 ds_file_path = ds_file.normalized_path 331 # Get the corresponding local file 332 local_file = local_folder / ds_file_path 333 if not local_file.exists(): 334 ds_files_missing.append(ds_file_path) 335 else: 336 try: 337 if self._file_service.is_valid_file(ds_file, local_file): 338 ds_files_matching.append(ds_file_path) 339 else: 340 ds_files_not_matching.append(ds_file_path) 341 except RuntimeWarning as e: 342 logger.warning(f"File validation failed: {e}") 343 ds_validate_failed.append(ds_file_path) 344 345 # Find local files that are not in the dataset 346 local_file_paths = [ 347 file.relative_to(local_folder).as_posix() 348 for file in local_folder.rglob("*") 349 if not file.is_dir() and not is_hidden_file(file) 350 ] 351 dataset_file_paths = [file.normalized_path for file in ds_files] 352 local_only_files = [ 353 file 354 for file in local_file_paths 355 if file not in dataset_file_paths 356 ] 357 358 return DatasetValidationResponse( 359 files_matching=ds_files_matching, 360 files_not_matching=ds_files_not_matching, 361 files_missing=ds_files_missing, 362 local_only_files=local_only_files, 363 validate_errors=ds_validate_failed, 364 ) 365 366 def download_files( 367 self, 368 project_id: str, 369 dataset_id: str, 370 download_location: str, 371 files: Union[List[File], List[str]] = None 372 ) -> None: 373 """ 374 Downloads files from a dataset 375 376 The `files` argument is used to optionally specify a subset of files 377 to be downloaded. By default, all files are downloaded. 
378 379 Args: 380 project_id (str): ID of the Project 381 dataset_id (str): ID of the Dataset 382 download_location (str): Local destination for downloaded files 383 files (typing.List[str]): Optional list of files to download 384 """ 385 if files is None: 386 files = self.get_assets_listing(project_id, dataset_id).files 387 388 if len(files) == 0: 389 return 390 391 first_file = files[0] 392 if isinstance(first_file, File): 393 files = [file.relative_path for file in files] 394 access_context = first_file.access_context 395 else: 396 dataset = self.get(project_id, dataset_id) 397 if dataset.share: 398 access_context = FileAccessContext.download_shared_dataset(project_id=project_id, 399 dataset_id=dataset_id, 400 base_url=dataset.s3) 401 else: 402 access_context = FileAccessContext.download(project_id=project_id, 403 base_url=dataset.s3) 404 405 self._file_service.download_files(access_context, download_location, files) 406 407 def update_samplesheet( 408 self, 409 project_id: str, 410 dataset_id: str, 411 samplesheet: str 412 ): 413 """ 414 Updates a samplesheet on a dataset 415 416 Args: 417 project_id (str): ID of the Project 418 dataset_id (str): ID of the Dataset 419 samplesheet (str): Samplesheet contents to update (should be a CSV string) 420 """ 421 dataset = self.get(project_id, dataset_id) 422 access_context = FileAccessContext.upload_sample_sheet(project_id=project_id, 423 dataset_id=dataset_id, 424 base_url=dataset.s3) 425 426 samplesheet_key = f'{access_context.prefix}/samplesheet.csv' 427 self._file_service.create_file( 428 access_context=access_context, 429 key=samplesheet_key, 430 contents=samplesheet, 431 content_type='text/csv' 432 )
Service for interacting with the Dataset endpoints
27 def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]: 28 """List datasets 29 30 Retrieves a list of datasets for a given project 31 32 Args: 33 project_id (str): ID of the Project 34 max_items (int): Maximum number of records to get (default 10,000) 35 """ 36 return get_all_records( 37 records_getter=lambda page_args: get_datasets.sync( 38 project_id=project_id, 39 client=self._api_client, 40 next_token=page_args.next_token, 41 limit=page_args.limit 42 ), 43 max_items=max_items 44 )
List datasets
Retrieves a list of datasets for a given project
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
78 def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse: 79 """ 80 Download data from public repositories 81 82 Args: 83 project_id (str): ID of the Project 84 import_request (cirro_api_client.v1.models.ImportDataRequest): 85 86 Returns: 87 ID of the created dataset 88 89 ```python 90 from cirro_api_client.v1.models import ImportDataRequest, Tag 91 from cirro.cirro_client import CirroApi 92 93 cirro = CirroApi() 94 request = ImportDataRequest( 95 name="Imported dataset", 96 description="Description of the dataset", 97 public_ids=["SRR123456", "SRR123457"], 98 tags=[Tag(value="tag1")] 99 ) 100 cirro.datasets.import_public("project-id", request) 101 ``` 102 """ 103 return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)
Download data from public repositories
Arguments:
- project_id (str): ID of the Project
- import_request (cirro_api_client.v1.models.ImportDataRequest):
Returns:
ID of the created dataset
from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = ImportDataRequest(
name="Imported dataset",
description="Description of the dataset",
public_ids=["SRR123456", "SRR123457"],
tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
105 def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse: 106 """ 107 Registers a dataset in Cirro, which can subsequently have files uploaded to it 108 109 Args: 110 project_id (str): ID of the Project 111 upload_request (cirro_api_client.v1.models.UploadDatasetRequest): 112 113 Returns: 114 ID of the created dataset and the path to upload files 115 116 ```python 117 from cirro_api_client.v1.models import UploadDatasetRequest, Tag 118 from cirro.cirro_client import CirroApi 119 120 cirro = CirroApi() 121 request = UploadDatasetRequest( 122 name="Name of new dataset", 123 process_id="paired_dnaseq", 124 expected_files=["read_1.fastq.gz", "read_2.fastq.gz"], 125 description="Description of the dataset", 126 tags=[Tag(value="tag1"), Tag(value="tag2")] 127 ) 128 cirro.datasets.create("project-id", request) 129 ``` 130 """ 131 return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)
Registers a dataset in Cirro, which can subsequently have files uploaded to it
Arguments:
- project_id (str): ID of the Project
- upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
Returns:
ID of the created dataset and the path to upload files
from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UploadDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
description="Description of the dataset",
tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
133 def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]: 134 """ 135 Gets detailed information about a dataset 136 137 Args: 138 project_id (str): ID of the Project 139 dataset_id (str): ID of the Dataset 140 141 Returns: 142 The dataset, if found 143 """ 144 return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Gets detailed information about a dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
Returns:
The dataset, if found
146 def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail: 147 """ 148 Update info on a dataset (name, description, and/or tags) 149 150 Args: 151 project_id (str): ID of the Project 152 dataset_id (str): ID of the Dataset 153 request (cirro_api_client.v1.models.UpdateDatasetRequest): 154 155 Returns: 156 The updated dataset 157 158 ```python 159 from cirro_api_client.v1.models import UpdateDatasetRequest 160 from cirro.cirro_client import CirroApi 161 162 cirro = CirroApi() 163 request = UpdateDatasetRequest( 164 name="Name of new dataset", 165 process_id="paired_dnaseq", 166 description="Description of the dataset" 167 ) 168 cirro.datasets.update("project-id", "dataset-id", request) 169 ``` 170 """ 171 return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)
Update info on a dataset (name, description, and/or tags)
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- request (cirro_api_client.v1.models.UpdateDatasetRequest):
Returns:
The updated dataset
from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UpdateDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
173 def delete(self, project_id: str, dataset_id: str) -> None: 174 """ 175 Delete a dataset 176 177 After a dataset has been deleted, the files associated with that 178 dataset are saved according to the project's retention time. 179 180 Args: 181 project_id (str): ID of the Project 182 dataset_id (str): ID of the Dataset 183 """ 184 delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Delete a dataset
After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
186 def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets: 187 """ 188 Gets a listing of files, charts, and other assets available for the dataset 189 190 Args: 191 project_id (str): ID of the Project 192 dataset_id (str): ID of the Dataset 193 file_limit (int): Maximum number of files to get (default 100,000) 194 """ 195 dataset = self.get(project_id, dataset_id) 196 if file_limit < 1: 197 raise ValueError("file_limit must be greater than 0") 198 all_files = [] 199 file_offset = 0 200 domain = None 201 artifacts = None 202 203 while len(all_files) < file_limit: 204 manifest = get_dataset_manifest.sync( 205 project_id=project_id, 206 dataset_id=dataset_id, 207 file_offset=file_offset, 208 client=self._api_client 209 ) 210 all_files.extend(manifest.files) 211 file_offset += len(manifest.files) 212 213 if not artifacts: 214 artifacts = manifest.artifacts 215 216 domain = manifest.domain 217 if len(all_files) >= manifest.total_files or len(manifest.files) == 0: 218 break 219 220 files = [ 221 File.from_file_entry( 222 f, 223 project_id=project_id, 224 dataset=dataset, 225 domain=domain 226 ) 227 for f in all_files 228 ] 229 artifacts = [ 230 Artifact( 231 artifact_type=a.type, 232 file=File.from_file_entry( 233 FileEntry(a.path), 234 project_id=project_id, 235 dataset=dataset, 236 domain=domain 237 ) 238 ) 239 for a in artifacts 240 ] 241 return DatasetAssets(files=files, artifacts=artifacts)
Gets a listing of files, charts, and other assets available for the dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- file_limit (int): Maximum number of files to get (default 100,000)
243 def upload_files(self, 244 project_id: str, 245 dataset_id: str, 246 directory: PathLike, 247 files: List[PathLike] = None, 248 file_path_map: Dict[PathLike, str] = None) -> None: 249 """ 250 Uploads files to a given dataset from the specified directory. 251 252 All files must be relative to the specified directory. 253 If files need to be flattened, or you are sourcing files from multiple directories, 254 please include `file_path_map` or call this method multiple times. 255 256 Args: 257 project_id (str): ID of the Project 258 dataset_id (str): ID of the Dataset 259 directory (str|Path): Path to directory 260 files (typing.List[str|Path]): List of paths to files within the directory, 261 must be the same type as directory. 262 file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload 263 from source path to destination path, used to "re-write" paths within the dataset. 264 ```python 265 from cirro.cirro_client import CirroApi 266 from cirro.file_utils import generate_flattened_file_map 267 268 cirro = CirroApi() 269 270 directory = "~/Downloads" 271 # Re-write file paths 272 file_map = { 273 "data1/file1.fastq.gz": "file1.fastq.gz", 274 "data2/file2.fastq.gz": "file2.fastq.gz", 275 "file3.fastq.gz": "new_file3.txt" 276 } 277 278 # Or you could automate the flattening 279 files = ["data1/file1.fastq.gz"] 280 file_map = generate_flattened_file_map(files) 281 282 cirro.datasets.upload_files( 283 project_id="project-id", 284 dataset_id="dataset-id", 285 directory=directory, 286 files=list(file_map.keys()), 287 file_path_map=file_map 288 ) 289 ``` 290 """ 291 if file_path_map is None: 292 file_path_map = {} 293 294 dataset = self.get(project_id, dataset_id) 295 296 access_context = FileAccessContext.upload_dataset( 297 project_id=project_id, 298 dataset_id=dataset_id, 299 base_url=dataset.s3 300 ) 301 302 self._file_service.upload_files( 303 access_context=access_context, 304 directory=directory, 305 files=files, 306 
file_path_map=file_path_map 307 )
Uploads files to a given dataset from the specified directory.
All files must be relative to the specified directory.
If files need to be flattened, or you are sourcing files from multiple directories,
please include `file_path_map` or call this method multiple times.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map
cirro = CirroApi()
directory = "~/Downloads"
# Re-write file paths
file_map = {
"data1/file1.fastq.gz": "file1.fastq.gz",
"data2/file2.fastq.gz": "file2.fastq.gz",
"file3.fastq.gz": "new_file3.txt"
}
# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)
cirro.datasets.upload_files(
project_id="project-id",
dataset_id="dataset-id",
directory=directory,
files=list(file_map.keys()),
file_path_map=file_map
)
309 def validate_folder( 310 self, 311 project_id: str, 312 dataset_id: str, 313 local_folder: PathLike 314 ) -> DatasetValidationResponse: 315 """ 316 Validates that the contents of a dataset match that of a local folder. 317 """ 318 ds_files = self.get_assets_listing(project_id, dataset_id).files 319 320 local_folder = Path(local_folder) 321 if not local_folder.is_dir(): 322 raise ValueError(f"{local_folder} is not a valid local folder") 323 324 # Keep track of files from the dataset which match by checksum, don't match, or are missing 325 ds_files_matching = [] 326 ds_files_not_matching = [] 327 ds_files_missing = [] 328 ds_validate_failed = [] 329 for ds_file in ds_files: 330 ds_file_path = ds_file.normalized_path 331 # Get the corresponding local file 332 local_file = local_folder / ds_file_path 333 if not local_file.exists(): 334 ds_files_missing.append(ds_file_path) 335 else: 336 try: 337 if self._file_service.is_valid_file(ds_file, local_file): 338 ds_files_matching.append(ds_file_path) 339 else: 340 ds_files_not_matching.append(ds_file_path) 341 except RuntimeWarning as e: 342 logger.warning(f"File validation failed: {e}") 343 ds_validate_failed.append(ds_file_path) 344 345 # Find local files that are not in the dataset 346 local_file_paths = [ 347 file.relative_to(local_folder).as_posix() 348 for file in local_folder.rglob("*") 349 if not file.is_dir() and not is_hidden_file(file) 350 ] 351 dataset_file_paths = [file.normalized_path for file in ds_files] 352 local_only_files = [ 353 file 354 for file in local_file_paths 355 if file not in dataset_file_paths 356 ] 357 358 return DatasetValidationResponse( 359 files_matching=ds_files_matching, 360 files_not_matching=ds_files_not_matching, 361 files_missing=ds_files_missing, 362 local_only_files=local_only_files, 363 validate_errors=ds_validate_failed, 364 )
Validates that the contents of a dataset match that of a local folder.
366 def download_files( 367 self, 368 project_id: str, 369 dataset_id: str, 370 download_location: str, 371 files: Union[List[File], List[str]] = None 372 ) -> None: 373 """ 374 Downloads files from a dataset 375 376 The `files` argument is used to optionally specify a subset of files 377 to be downloaded. By default, all files are downloaded. 378 379 Args: 380 project_id (str): ID of the Project 381 dataset_id (str): ID of the Dataset 382 download_location (str): Local destination for downloaded files 383 files (typing.List[str]): Optional list of files to download 384 """ 385 if files is None: 386 files = self.get_assets_listing(project_id, dataset_id).files 387 388 if len(files) == 0: 389 return 390 391 first_file = files[0] 392 if isinstance(first_file, File): 393 files = [file.relative_path for file in files] 394 access_context = first_file.access_context 395 else: 396 dataset = self.get(project_id, dataset_id) 397 if dataset.share: 398 access_context = FileAccessContext.download_shared_dataset(project_id=project_id, 399 dataset_id=dataset_id, 400 base_url=dataset.s3) 401 else: 402 access_context = FileAccessContext.download(project_id=project_id, 403 base_url=dataset.s3) 404 405 self._file_service.download_files(access_context, download_location, files)
Downloads files from a dataset
The `files` argument is used to optionally specify a subset of files
to be downloaded. By default, all files are downloaded.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- download_location (str): Local destination for downloaded files
- files (typing.List[str]): Optional list of files to download
407 def update_samplesheet( 408 self, 409 project_id: str, 410 dataset_id: str, 411 samplesheet: str 412 ): 413 """ 414 Updates a samplesheet on a dataset 415 416 Args: 417 project_id (str): ID of the Project 418 dataset_id (str): ID of the Dataset 419 samplesheet (str): Samplesheet contents to update (should be a CSV string) 420 """ 421 dataset = self.get(project_id, dataset_id) 422 access_context = FileAccessContext.upload_sample_sheet(project_id=project_id, 423 dataset_id=dataset_id, 424 base_url=dataset.s3) 425 426 samplesheet_key = f'{access_context.prefix}/samplesheet.csv' 427 self._file_service.create_file( 428 access_context=access_context, 429 key=samplesheet_key, 430 contents=samplesheet, 431 content_type='text/csv' 432 )
Updates a samplesheet on a dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- samplesheet (str): Samplesheet contents to update (should be a CSV string)
class ExecutionService(BaseService):
    """
    Service for interacting with the Execution endpoints
    """

    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
        """
        Launch an analysis job running a process on a set of inputs

        Args:
            project_id (str): ID of the Project
            request (cirro_api_client.v1.models.RunAnalysisRequest):

        Returns:
            The ID of the created dataset

        ```python
        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
        from cirro.cirro_client import CirroApi

        # Example:
        # Run the "process-nf-core-rnaseq-3_8" process using input data
        # from a dataset with the id "source-dataset-id"

        # Optional analysis parameters
        params = RunAnalysisRequestParams.from_dict({
            "param_a": "val_a",
            "param_b": "val_b"
        })

        cirro = CirroApi()
        request = RunAnalysisRequest(
            name="Name of the newly created dataset",
            description="Longer description of newly created dataset",
            process_id="process-nf-core-rnaseq-3_8",
            source_dataset_ids=["source-dataset-id"],
            params=params
        )
        cirro.execution.run_analysis("project-id", request)
        ```
        """
        # Fetch the parameter form specification for the requested process
        # and check the supplied params against it before submitting.
        spec = ParameterSpecification(
            get_process_parameters.sync(
                process_id=request.process_id,
                client=self._api_client
            )
        )
        supplied_params = request.params.to_dict() if request.params else {}
        spec.validate_params(supplied_params)

        return run_analysis.sync(
            project_id=project_id,
            body=request,
            client=self._api_client
        )

    def stop_analysis(self, project_id: str, dataset_id: str):
        """
        Terminates all jobs related to a running analysis

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """
        return stop_analysis.sync(
            dataset_id=dataset_id,
            project_id=project_id,
            client=self._api_client
        )

    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
        """
        Gets an overview of the executions currently running in the project, by job queue

        Args:
            project_id (str): ID of the Project

        Returns:
            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
        """
        summary = get_project_summary.sync(
            project_id=project_id,
            client=self._api_client
        )
        # The generated model exposes the per-queue task lists as
        # additional (dynamic) properties.
        return summary.additional_properties

    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
        """
        Gets live logs from main execution task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will fetch logs from CloudWatch,
             even if the execution is already completed
        """
        response = get_execution_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )
        messages = [event.message for event in response.events]
        return '\n'.join(messages)

    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
        """
        Gets the tasks submitted by the workflow execution

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will try to get the list of jobs
             from the executor (i.e., AWS Batch), rather than the workflow report
        """
        return get_tasks_for_execution.sync(
            dataset_id=dataset_id,
            project_id=project_id,
            force_live=force_live,
            client=self._api_client
        )

    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
        """
        Gets the log output from an individual task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            task_id (str): ID of the task
            force_live (bool): If True, it will fetch logs from CloudWatch,
             even if the execution is already completed
        """
        response = get_task_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            task_id=task_id,
            force_live=force_live,
            client=self._api_client
        )
        messages = [event.message for event in response.events]
        return '\n'.join(messages)
Service for interacting with the Execution endpoints
17 def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse: 18 """ 19 Launch an analysis job running a process on a set of inputs 20 21 Args: 22 project_id (str): ID of the Project 23 request (cirro_api_client.v1.models.RunAnalysisRequest): 24 25 Returns: 26 The ID of the created dataset 27 28 ```python 29 from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams 30 from cirro.cirro_client import CirroApi 31 32 # Example: 33 # Run the "process-nf-core-rnaseq-3_8" process using input data 34 # from a dataset with the id "source-dataset-id" 35 36 # Optional analysis parameters 37 params = RunAnalysisRequestParams.from_dict({ 38 "param_a": "val_a", 39 "param_b": "val_b" 40 }) 41 42 cirro = CirroApi() 43 request = RunAnalysisRequest( 44 name="Name of the newly created dataset", 45 description="Longer description of newly created dataset", 46 process_id="process-nf-core-rnaseq-3_8", 47 source_dataset_ids=["source-dataset-id"], 48 params=params 49 ) 50 cirro.execution.run_analysis("project-id", request) 51 ``` 52 """ 53 54 form_spec = get_process_parameters.sync( 55 process_id=request.process_id, 56 client=self._api_client 57 ) 58 59 ParameterSpecification( 60 form_spec 61 ).validate_params( 62 request.params.to_dict() if request.params else {} 63 ) 64 65 return run_analysis.sync( 66 project_id=project_id, 67 body=request, 68 client=self._api_client 69 )
Launch an analysis job running a process on a set of inputs
Arguments:
- project_id (str): ID of the Project
- request (cirro_api_client.v1.models.RunAnalysisRequest): Analysis request definition (see the example below)
Returns:
The ID of the created dataset
from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi
# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"
# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
"param_a": "val_a",
"param_b": "val_b"
})
cirro = CirroApi()
request = RunAnalysisRequest(
name="Name of the newly created dataset",
description="Longer description of newly created dataset",
process_id="process-nf-core-rnaseq-3_8",
source_dataset_ids=["source-dataset-id"],
params=params
)
cirro.execution.run_analysis("project-id", request)
71 def stop_analysis(self, project_id: str, dataset_id: str): 72 """ 73 Terminates all jobs related to a running analysis 74 75 Args: 76 project_id (str): ID of the Project 77 dataset_id (str): ID of the Dataset 78 """ 79 80 return stop_analysis.sync( 81 project_id=project_id, 82 dataset_id=dataset_id, 83 client=self._api_client 84 )
Terminates all jobs related to a running analysis
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
86 def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]: 87 """ 88 Gets an overview of the executions currently running in the project, by job queue 89 90 Args: 91 project_id (str): ID of the Project 92 93 Returns: 94 `cirro_api_client.v1.models.GetProjectSummaryResponse200` 95 """ 96 97 return get_project_summary.sync( 98 project_id=project_id, 99 client=self._api_client 100 ).additional_properties
Gets an overview of the executions currently running in the project, by job queue
Arguments:
- project_id (str): ID of the Project
Returns:
`cirro_api_client.v1.models.GetProjectSummaryResponse200`
102 def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str: 103 """ 104 Gets live logs from main execution task 105 106 Args: 107 project_id (str): ID of the Project 108 dataset_id (str): ID of the Dataset 109 force_live (bool): If True, it will fetch logs from CloudWatch, 110 even if the execution is already completed 111 112 """ 113 114 resp = get_execution_logs.sync( 115 project_id=project_id, 116 dataset_id=dataset_id, 117 force_live=force_live, 118 client=self._api_client 119 ) 120 121 return '\n'.join(e.message for e in resp.events)
Gets live logs from main execution task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
123 def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]: 124 """ 125 Gets the tasks submitted by the workflow execution 126 127 Args: 128 project_id (str): ID of the Project 129 dataset_id (str): ID of the Dataset 130 force_live (bool): If True, it will try to get the list of jobs 131 from the executor (i.e., AWS Batch), rather than the workflow report 132 """ 133 134 return get_tasks_for_execution.sync( 135 project_id=project_id, 136 dataset_id=dataset_id, 137 force_live=force_live, 138 client=self._api_client 139 )
Gets the tasks submitted by the workflow execution
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
141 def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str: 142 """ 143 Gets the log output from an individual task 144 145 Args: 146 project_id (str): ID of the Project 147 dataset_id (str): ID of the Dataset 148 task_id (str): ID of the task 149 force_live (bool): If True, it will fetch logs from CloudWatch, 150 even if the execution is already completed 151 152 """ 153 154 resp = get_task_logs.sync( 155 project_id=project_id, 156 dataset_id=dataset_id, 157 task_id=task_id, 158 force_live=force_live, 159 client=self._api_client 160 ) 161 162 return '\n'.join(e.message for e in resp.events)
Gets the log output from an individual task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- task_id (str): ID of the task
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
10class ComputeEnvironmentService(BaseService): 11 """ 12 Service for interacting with the Compute Environment endpoints 13 """ 14 15 def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]: 16 """ 17 List of custom compute environments for a project (i.e., an agent) 18 19 Args: 20 project_id (str): Project ID 21 22 Returns: 23 List of compute environments that are available for the project 24 """ 25 return get_compute_environments.sync(project_id=project_id, 26 client=self._api_client)
Service for interacting with the Compute Environment endpoints
15 def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]: 16 """ 17 List of custom compute environments for a project (i.e., an agent) 18 19 Args: 20 project_id (str): Project ID 21 22 Returns: 23 List of compute environments that are available for the project 24 """ 25 return get_compute_environments.sync(project_id=project_id, 26 client=self._api_client)
List of custom compute environments for a project (i.e., an agent)
Arguments:
- project_id (str): Project ID
Returns:
List of compute environments that are available for the project
class FileService(BaseService):
    """
    Service for interacting with files
    """
    # Checksum algorithm used for transfers (passed through to the S3 client)
    checksum_method: str
    # Number of times a failed transfer is retried
    transfer_retries: int
    # Guards the shared read-token cache below (class-level, shared by instances)
    _get_token_lock = threading.Lock()
    # Cache of project download credentials, keyed by project ID
    _read_token_cache: Dict[str, AWSCredentials] = {}

    def __init__(self, api_client, checksum_method, transfer_retries):
        """
        Instantiates the file service class
        """
        self._api_client = api_client
        self.checksum_method = checksum_method
        self.transfer_retries = transfer_retries

    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
        """
        Retrieves credentials to access files

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        """
        access_request = access_context.file_access_request

        # Project download tokens are cacheable; every other access type
        # requests a fresh token from the API.
        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
            return self._get_project_read_credentials(access_context)

        else:
            return generate_project_file_access_token.sync(
                client=self._api_client,
                project_id=access_context.project_id,
                body=access_context.file_access_request
            )

    def _get_project_read_credentials(self, access_context: FileAccessContext):
        """
        Retrieves credentials to read project data, this can be cached
        """
        access_request = access_context.file_access_request
        project_id = access_context.project_id
        # Lock so concurrent callers don't request duplicate tokens
        with self._get_token_lock:
            cached_token = self._read_token_cache.get(project_id)

            # Refresh when there is no cached token or it has expired
            if not cached_token or datetime.now(tz=timezone.utc) > cached_token.expiration:
                new_token = generate_project_file_access_token.sync(
                    client=self._api_client,
                    project_id=project_id,
                    body=access_request
                )

                self._read_token_cache[project_id] = new_token

        return self._read_token_cache[project_id]

    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
        """
        Gets the underlying AWS S3 client to perform operations on files

        This is seeded with refreshable credentials from the access_context parameter

        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
        """
        s3_client = self._generate_s3_client(access_context)
        return s3_client.get_aws_client()

    def get_file(self, file: File) -> bytes:
        """
        Gets the contents of a file

        Args:
            file (cirro.models.file.File):

        Returns:
            The raw bytes of the file
        """
        return self.get_file_from_path(file.access_context, file.relative_path)

    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
        """
        Gets the contents of a file by providing the path, used internally

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            file_path (str): Relative path to file within dataset

        Returns:
            The raw bytes of the file
        """
        s3_client = self._generate_s3_client(access_context)

        # lstrip handles an empty prefix, which would otherwise
        # produce a leading slash in the S3 key
        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')

        return s3_client.get_file(access_context.bucket, full_path)

    def create_file(self, access_context: FileAccessContext, key: str,
                    contents: str, content_type: str) -> None:
        """
        Creates a file at the specified path

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            key (str): Key of object to create
            contents (str): Content of object
            content_type (str):
        """
        s3_client = self._generate_s3_client(access_context)

        s3_client.create_object(
            key=key,
            contents=contents,
            content_type=content_type,
            bucket=access_context.bucket
        )

    def upload_files(self,
                     access_context: FileAccessContext,
                     directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str]) -> None:
        """
        Uploads a list of files from the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory
             must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
             from source path to destination path, used to "re-write" paths within the dataset.
        """
        s3_client = self._generate_s3_client(access_context)

        upload_directory(
            directory=directory,
            files=files,
            file_path_map=file_path_map,
            s3_client=s3_client,
            bucket=access_context.bucket,
            prefix=access_context.prefix,
            max_retries=self.transfer_retries
        )

    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
        """
        Download a list of files to the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str): download location
            files (List[str]): relative path of files to download
        """
        s3_client = self._generate_s3_client(access_context)

        download_directory(
            directory,
            files,
            s3_client,
            access_context.bucket,
            access_context.prefix
        )

    def is_valid_file(self, file: File, local_file: Path) -> bool:
        """
        Validates the checksum of a file against a local file
        See ``validate_file`` method for details.

        Args:
            file (File): Cirro file to validate
            local_file (PathLike): Local file path to compare against

        Returns:
            bool: True if file integrity matches, False otherwise

        Raises:
            RuntimeWarning: If the remote checksum is not available or not supported
        """
        # Only a checksum mismatch (ValueError) maps to False;
        # RuntimeWarning (no/unsupported checksum) propagates to the caller.
        try:
            self.validate_file(file, local_file)
            return True
        except ValueError:
            return False

    def validate_file(self, file: File, local_file: PathLike):
        """
        Validates the checksum of a file against a local file
        This is used to ensure file integrity after download or upload

        Checksums might not be available if the file was uploaded without checksum support

        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
        Args:
            file (File): Cirro file to validate
            local_file (PathLike): Local file path to compare against

        Raises:
            ValueError: If checksums do not match
            RuntimeWarning: If the remote checksum is not available or not supported
        """
        stats = self.get_file_stats(file)

        # Find the checksum property (e.g. 'ChecksumSHA256'),
        # skipping the 'ChecksumType' metadata key
        remote_checksum_key = next((prop for prop in stats.keys()
                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)

        # Composite (part-level) checksums cannot be compared to a
        # locally computed whole-file checksum
        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")

        if remote_checksum_key is None:
            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")

        remote_checksum = stats[remote_checksum_key]
        # Derive the algorithm name from the property name,
        # e.g. 'ChecksumSHA256' -> 'SHA256'
        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")

        local_checksum = get_checksum(local_file, remote_checksum_name)
        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")

        if local_checksum != remote_checksum:
            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
                             f"local {local_checksum}, remote {remote_checksum}")

    def get_file_stats(self, file: File) -> dict:
        """
        Gets the file stats for a file, such as size, checksum, etc.
        Equivalent to the `head_object` operation in S3
        """
        s3_client = self._generate_s3_client(file.access_context)

        full_path = f'{file.access_context.prefix}/{file.relative_path}'

        stats = s3_client.get_file_stats(
            bucket=file.access_context.bucket,
            key=full_path
        )
        logger.debug(f"File stats for file {file.relative_path} is {stats}")
        return stats

    def _generate_s3_client(self, access_context: FileAccessContext):
        """
        Generates the Cirro-S3 client to perform operations on files
        """
        # Credentials are supplied lazily (and refreshed) via the partial
        return S3Client(
            partial(self.get_access_credentials, access_context),
            self.checksum_method
        )
Service for interacting with files
31 def __init__(self, api_client, checksum_method, transfer_retries): 32 """ 33 Instantiates the file service class 34 """ 35 self._api_client = api_client 36 self.checksum_method = checksum_method 37 self.transfer_retries = transfer_retries
Instantiates the file service class
39 def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials: 40 """ 41 Retrieves credentials to access files 42 43 Args: 44 access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate 45 """ 46 access_request = access_context.file_access_request 47 48 if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD: 49 return self._get_project_read_credentials(access_context) 50 51 else: 52 return generate_project_file_access_token.sync( 53 client=self._api_client, 54 project_id=access_context.project_id, 55 body=access_context.file_access_request 56 )
Retrieves credentials to access files
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
78 def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient: 79 """ 80 Gets the underlying AWS S3 client to perform operations on files 81 82 This is seeded with refreshable credentials from the access_context parameter 83 84 This may be used to perform advanced operations, such as CopyObject, S3 Select, etc. 85 """ 86 s3_client = self._generate_s3_client(access_context) 87 return s3_client.get_aws_client()
Gets the underlying AWS S3 client to perform operations on files
This is seeded with refreshable credentials from the access_context parameter
This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
89 def get_file(self, file: File) -> bytes: 90 """ 91 Gets the contents of a file 92 93 Args: 94 file (cirro.models.file.File): 95 96 Returns: 97 The raw bytes of the file 98 """ 99 return self.get_file_from_path(file.access_context, file.relative_path)
Gets the contents of a file
Arguments:
- file (cirro.models.file.File):
Returns:
The raw bytes of the file
101 def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes: 102 """ 103 Gets the contents of a file by providing the path, used internally 104 105 Args: 106 access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate 107 file_path (str): Relative path to file within dataset 108 109 Returns: 110 The raw bytes of the file 111 """ 112 s3_client = self._generate_s3_client(access_context) 113 114 full_path = f'{access_context.prefix}/{file_path}'.lstrip('/') 115 116 return s3_client.get_file(access_context.bucket, full_path)
Gets the contents of a file by providing the path, used internally
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- file_path (str): Relative path to file within dataset
Returns:
The raw bytes of the file
118 def create_file(self, access_context: FileAccessContext, key: str, 119 contents: str, content_type: str) -> None: 120 """ 121 Creates a file at the specified path 122 123 Args: 124 access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate 125 key (str): Key of object to create 126 contents (str): Content of object 127 content_type (str): 128 """ 129 s3_client = self._generate_s3_client(access_context) 130 131 s3_client.create_object( 132 key=key, 133 contents=contents, 134 content_type=content_type, 135 bucket=access_context.bucket 136 )
Creates a file at the specified path
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- key (str): Key of object to create
- contents (str): Content of object
- content_type (str): Content type of the object (presumably a MIME type such as `text/csv`)
138 def upload_files(self, 139 access_context: FileAccessContext, 140 directory: PathLike, 141 files: List[PathLike], 142 file_path_map: Dict[PathLike, str]) -> None: 143 """ 144 Uploads a list of files from the specified directory 145 146 Args: 147 access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate 148 directory (str|Path): Path to directory 149 files (typing.List[str|Path]): List of paths to files within the directory 150 must be the same type as directory. 151 file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload 152 from source path to destination path, used to "re-write" paths within the dataset. 153 """ 154 s3_client = self._generate_s3_client(access_context) 155 156 upload_directory( 157 directory=directory, 158 files=files, 159 file_path_map=file_path_map, 160 s3_client=s3_client, 161 bucket=access_context.bucket, 162 prefix=access_context.prefix, 163 max_retries=self.transfer_retries 164 )
Uploads a list of files from the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
166 def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None: 167 """ 168 Download a list of files to the specified directory 169 170 Args: 171 access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate 172 directory (str): download location 173 files (List[str]): relative path of files to download 174 """ 175 s3_client = self._generate_s3_client(access_context) 176 177 download_directory( 178 directory, 179 files, 180 s3_client, 181 access_context.bucket, 182 access_context.prefix 183 )
Download a list of files to the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str): download location
- files (List[str]): relative path of files to download
185 def is_valid_file(self, file: File, local_file: Path) -> bool: 186 """ 187 Validates the checksum of a file against a local file 188 See ``validate_file`` method for details. 189 190 Args: 191 file (File): Cirro file to validate 192 local_file (PathLike): Local file path to compare against 193 194 Returns: 195 bool: True if file integrity matches, False otherwise 196 197 Raises: 198 RuntimeWarning: If the remote checksum is not available or not supported 199 """ 200 try: 201 self.validate_file(file, local_file) 202 return True 203 except ValueError: 204 return False
Validates the checksum of a file against a local file
See validate_file method for details.
Arguments:
- file (File): Cirro file to validate
- local_file (PathLike): Local file path to compare against
Returns:
bool: True if file integrity matches, False otherwise
Raises:
- RuntimeWarning: If the remote checksum is not available or not supported
206 def validate_file(self, file: File, local_file: PathLike): 207 """ 208 Validates the checksum of a file against a local file 209 This is used to ensure file integrity after download or upload 210 211 Checksums might not be available if the file was uploaded without checksum support 212 213 https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html 214 Args: 215 file (File): Cirro file to validate 216 local_file (PathLike): Local file path to compare against 217 218 Raises: 219 ValueError: If checksums do not match 220 RuntimeWarning: If the remote checksum is not available or not supported 221 """ 222 stats = self.get_file_stats(file) 223 224 remote_checksum_key = next((prop for prop in stats.keys() 225 if 'Checksum' in prop and prop != 'ChecksumType'), None) 226 227 if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT': 228 raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}") 229 230 if remote_checksum_key is None: 231 raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.") 232 233 remote_checksum = stats[remote_checksum_key] 234 remote_checksum_name = remote_checksum_key.replace('Checksum', '') 235 logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}") 236 237 local_checksum = get_checksum(local_file, remote_checksum_name) 238 logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}") 239 240 if local_checksum != remote_checksum: 241 raise ValueError(f"Checksum mismatch for file {file.relative_path}: " 242 f"local {local_checksum}, remote {remote_checksum}")
Validates the checksum of a file against a local file This is used to ensure file integrity after download or upload
Checksums might not be available if the file was uploaded without checksum support
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
Arguments:
- file (File): Cirro file to validate
- local_file (PathLike): Local file path to compare against
Raises:
- ValueError: If checksums do not match
- RuntimeWarning: If the remote checksum is not available or not supported
244 def get_file_stats(self, file: File) -> dict: 245 """ 246 Gets the file stats for a file, such as size, checksum, etc. 247 Equivalent to the `head_object` operation in S3 248 """ 249 s3_client = self._generate_s3_client(file.access_context) 250 251 full_path = f'{file.access_context.prefix}/{file.relative_path}' 252 253 stats = s3_client.get_file_stats( 254 bucket=file.access_context.bucket, 255 key=full_path 256 ) 257 logger.debug(f"File stats for file {file.relative_path} is {stats}") 258 return stats
Gets the file stats for a file, such as size, checksum, etc.
Equivalent to the head_object operation in S3
11class MetadataService(BaseService): 12 """ 13 Service for interacting with the Metadata endpoints 14 """ 15 def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]: 16 """ 17 Retrieves a list of samples associated with a project along with their metadata 18 19 Args: 20 project_id (str): ID of the Project 21 max_items (int): Maximum number of records to get (default 10,000) 22 """ 23 return get_all_records( 24 records_getter=lambda page_args: get_project_samples.sync(project_id=project_id, 25 client=self._api_client, 26 next_token=page_args.next_token, 27 limit=page_args.limit), 28 max_items=max_items 29 ) 30 31 def get_project_schema(self, project_id: str) -> FormSchema: 32 """ 33 Get project metadata schema 34 35 Args: 36 project_id (str): ID of the Project 37 """ 38 return get_project_schema.sync(project_id=project_id, client=self._api_client) 39 40 def update_project_schema(self, project_id: str, schema: FormSchema): 41 """ 42 Update project metadata schema 43 44 Args: 45 project_id (str): ID of the Project 46 schema (cirro_api_client.v1.models.FormSchema): Metadata schema 47 """ 48 update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client) 49 50 def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample: 51 """ 52 Updates metadata information for sample 53 54 Args: 55 project_id (str): ID of the Project 56 sample_id (str): ID of the sample 57 sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample 58 """ 59 return update_sample.sync( 60 project_id=project_id, 61 sample_id=sample_id, 62 body=sample, 63 client=self._api_client 64 )
Service for interacting with the Metadata endpoints
15 def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]: 16 """ 17 Retrieves a list of samples associated with a project along with their metadata 18 19 Args: 20 project_id (str): ID of the Project 21 max_items (int): Maximum number of records to get (default 10,000) 22 """ 23 return get_all_records( 24 records_getter=lambda page_args: get_project_samples.sync(project_id=project_id, 25 client=self._api_client, 26 next_token=page_args.next_token, 27 limit=page_args.limit), 28 max_items=max_items 29 )
Retrieves a list of samples associated with a project along with their metadata
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
31 def get_project_schema(self, project_id: str) -> FormSchema: 32 """ 33 Get project metadata schema 34 35 Args: 36 project_id (str): ID of the Project 37 """ 38 return get_project_schema.sync(project_id=project_id, client=self._api_client)
Get project metadata schema
Arguments:
- project_id (str): ID of the Project
40 def update_project_schema(self, project_id: str, schema: FormSchema): 41 """ 42 Update project metadata schema 43 44 Args: 45 project_id (str): ID of the Project 46 schema (cirro_api_client.v1.models.FormSchema): Metadata schema 47 """ 48 update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)
Update project metadata schema
Arguments:
- project_id (str): ID of the Project
- schema (cirro_api_client.v1.models.FormSchema): Metadata schema
50 def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample: 51 """ 52 Updates metadata information for sample 53 54 Args: 55 project_id (str): ID of the Project 56 sample_id (str): ID of the sample 57 sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample 58 """ 59 return update_sample.sync( 60 project_id=project_id, 61 sample_id=sample_id, 62 body=sample, 63 client=self._api_client 64 )
Updates metadata information for sample
Arguments:
- project_id (str): ID of the Project
- sample_id (str): ID of the sample
- sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
10class MetricsService(BaseService): 11 """ 12 Service for interacting with the Metrics endpoints 13 """ 14 def get_for_project(self, project_id: str) -> ProjectMetrics: 15 """ 16 Retrieves the cost and storage metrics for a project. 17 18 Args: 19 project_id (str): ID of the Project 20 """ 21 return get_project_metrics.sync( 22 project_id=project_id, 23 client=self._api_client 24 ) 25 26 def get_all_metrics(self) -> List[ProjectMetrics]: 27 """ 28 Retrieves all available metrics 29 """ 30 return get_all_metrics.sync(client=self._api_client)
Service for interacting with the Metrics endpoints
14 def get_for_project(self, project_id: str) -> ProjectMetrics: 15 """ 16 Retrieves the cost and storage metrics for a project. 17 18 Args: 19 project_id (str): ID of the Project 20 """ 21 return get_project_metrics.sync( 22 project_id=project_id, 23 client=self._api_client 24 )
Retrieves the cost and storage metrics for a project.
Arguments:
- project_id (str): ID of the Project
class ProcessService(BaseService):
    """
    Service for interacting with the Process endpoints
    """
    def list(self, process_type: Optional[Executor] = None) -> List[Process]:
        """
        Retrieves a list of available processes

        Args:
            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
        """
        processes = get_processes.sync(client=self._api_client)
        # No filter requested -> return all processes; otherwise keep matching executors only
        return [p for p in processes if not process_type or process_type == p.executor]

    def get(self, process_id: str) -> ProcessDetail:
        """
        Retrieves detailed information on a process

        Args:
            process_id (str): Process ID
        """
        return get_process.sync(process_id=process_id, client=self._api_client)

    def archive(self, process_id: str):
        """
        Removes a custom process from the list of available processes.

        Error will be raised if the requested process does not exist. No value
        is returned, and no error raised if process exists and request is satisfied.

        Args:
            process_id (str): Process ID
        """
        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)

    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
        """
        Get a process by its display name

        Returns None when no process matches the name exactly.

        Args:
            name (str): Process name
        """
        # First match on display name wins; list() returns summaries, so fetch full detail after
        matched_process = next((p for p in self.list() if p.name == name), None)
        if not matched_process:
            return None

        return self.get(matched_process.id)

    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process

        Args:
            process_id (str): Process ID
        """
        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
        return ParameterSpecification(form_spec)

    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
        """
        Checks if the file mapping rules for a process are met by the list of files

        Error will be raised if the file mapping rules for the process are not met.
        No value is returned and no error is raised if the rules are satisfied.

        Args:
            process_id (str): ID for the process containing the file mapping rules
            directory (str): path to directory containing files
            files (List[str]): File names to check

        Raises:
            ValueError: if the sample sheet is invalid or the files do not
                match the process's allowed naming patterns
        """
        # Parse sample sheet file if present
        sample_sheet = None
        sample_sheet_file = Path(directory, 'samplesheet.csv')
        if sample_sheet_file.exists():
            sample_sheet = sample_sheet_file.read_text()

        request = ValidateFileRequirementsRequest(
            file_names=files,
            sample_sheet=sample_sheet
        )
        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)

        # These will be sample sheet errors or no files errors
        if error_msg := requirements.error_msg:
            raise ValueError(error_msg)

        # Collect one human-readable message per data type whose rules were violated
        errors = [
            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
            '\n\t- '.join([
                e.example_name
                for e in entry.allowed_patterns
            ])
            for entry in requirements.allowed_data_types
            if entry.error_msg is not None
        ]

        files_provided = ', '.join(files)

        if len(errors) != 0:
            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
                             "They do not meet the dataset requirements. "
                             "The required file types are: \n" +
                             "\n".join(errors))

    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
        """
        Creates a custom process (pipeline or data type) in the system.

        Whether it is a pipeline or data type is determined by the executor field.
        For pipelines, you must specify the pipeline code and configuration repository.

        This process will be available to all users in the system if `is_tenant_wide` is set to True.
        If `is_tenant_wide` is set to False, the process will only be available to the projects
        specified in `linked_project_ids`. Making it available tenant wide requires tenant admin privileges.

        If the repository is private, you must complete the authorization flow on the UI.

        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

        Args:
            process (CustomProcessInput): Process to create

        Example:
        ```python
        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
            PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # New pipeline
        new_pipeline = CustomProcessInput(
            id="my_pipeline",
            name="My Pipeline",
            description="This is a test pipeline",
            executor=Executor.CROMWELL,
            category="DNA Sequencing",
            child_process_ids=[],
            parent_process_ids=["rnaseq"],
            documentation_url="https://example.com/docs",
            pipeline_code=PipelineCode(
                repository_path="CirroBio/test-pipeline",
                version="v1.0.0",
                entry_point="main.nf",
                repository_type=RepositoryType.GITHUB_PUBLIC
            ),
            linked_project_ids=[],
            is_tenant_wide=True,
            allow_multiple_sources=True,
            uses_sample_sheet=True,
            # This can be the same or different from the pipeline_code
            custom_settings=CustomPipelineSettings(
                repository="CirroBio/test-pipeline",
                branch="v1.0.0",
                repository_type=RepositoryType.GITHUB_PUBLIC,
            ),
            file_mapping_rules=[
                FileMappingRule(
                    description="Filtered Feature Matrix",
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="filtered_feature_bc_matrix.h5",
                            description="Matrix",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
                        )
                    ]
                )
            ]
        )

        cirro.processes.create_custom_process(new_pipeline)

        # New data type
        new_data_type = CustomProcessInput(
            id="images_jpg",
            name="JPG Images",
            description="Used for generic JPG images",
            executor=Executor.INGEST,
            child_process_ids=[],
            parent_process_ids=[],
            documentation_url="https://example.com/docs",
            linked_project_ids=["project_id_1", "project_id_2"],
            file_mapping_rules=[
                FileMappingRule(
                    description="Images",
                    min_=1,
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="image.jpg",
                            description="JPG Image",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
                        )
                    ]
                )
            ]
        )
        cirro.processes.create_custom_process(new_data_type)
        ```
        """
        return create_custom_process.sync(client=self._api_client, body=process)

    def update_custom_process(self, process_id: str, process: CustomProcessInput):
        """
        Updates the custom process (pipeline or data type) in the system.

        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.

        Args:
            process_id (str): ID of the process to update
            process (CustomProcessInput): Process to update
        """
        update_custom_process.sync_detailed(client=self._api_client,
                                            process_id=process_id,
                                            body=process)

    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
        """
        Syncs a custom pipeline in the system.

        This updates the pipeline configurations in Cirro (form configuration,
        input mapping, preprocess etc.) to match what is in the configured repository.
        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

        Args:
            process_id (str): ID of the process to sync
        """
        return sync_custom_process.sync(client=self._api_client, process_id=process_id)
Service for interacting with the Process endpoints
19 def list(self, process_type: Executor = None) -> List[Process]: 20 """ 21 Retrieves a list of available processes 22 23 Args: 24 process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW) 25 """ 26 processes = get_processes.sync(client=self._api_client) 27 return [p for p in processes if not process_type or process_type == p.executor]
Retrieves a list of available processes
Arguments:
- process_type (cirro_api_client.v1.models.Executor): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
29 def get(self, process_id: str) -> ProcessDetail: 30 """ 31 Retrieves detailed information on a process 32 33 Args: 34 process_id (str): Process ID 35 """ 36 return get_process.sync(process_id=process_id, client=self._api_client)
Retrieves detailed information on a process
Arguments:
- process_id (str): Process ID
38 def archive(self, process_id: str): 39 """ 40 Removes a custom process from the list of available processes. 41 42 Error will be raised if the requested process does not exist. No value 43 is returned, and no error raised if process exists and request is satisfied. 44 45 Args: 46 process_id (str): Process ID 47 """ 48 archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)
Removes a custom process from the list of available processes.
Error will be raised if the requested process does not exist. No value is returned, and no error raised if process exists and request is satisfied.
Arguments:
- process_id (str): Process ID
50 def find_by_name(self, name: str) -> Optional[ProcessDetail]: 51 """ 52 Get a process by its display name 53 54 Args: 55 name (str): Process name 56 """ 57 matched_process = next((p for p in self.list() if p.name == name), None) 58 if not matched_process: 59 return None 60 61 return self.get(matched_process.id)
Get a process by its display name
Arguments:
- name (str): Process name
63 def get_parameter_spec(self, process_id: str) -> ParameterSpecification: 64 """ 65 Gets a specification used to describe the parameters used in the process 66 67 Args: 68 process_id (str): Process ID 69 """ 70 form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client) 71 return ParameterSpecification(form_spec)
Gets a specification used to describe the parameters used in the process
Arguments:
- process_id (str): Process ID
73 def check_dataset_files(self, files: List[str], process_id: str, directory: str): 74 """ 75 Checks if the file mapping rules for a process are met by the list of files 76 77 Error will be raised if the file mapping rules for the process are not met. 78 No value is returned and no error is raised if the rules are satisfied. 79 80 Args: 81 process_id (str): ID for the process containing the file mapping rules 82 directory: path to directory containing files 83 files (List[str]): File names to check 84 """ 85 # Parse sample sheet file if present 86 sample_sheet = None 87 sample_sheet_file = Path(directory, 'samplesheet.csv') 88 if sample_sheet_file.exists(): 89 sample_sheet = sample_sheet_file.read_text() 90 91 request = ValidateFileRequirementsRequest( 92 file_names=files, 93 sample_sheet=sample_sheet 94 ) 95 requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client) 96 97 # These will be sample sheet errors or no files errors 98 if error_msg := requirements.error_msg: 99 raise ValueError(error_msg) 100 101 errors = [ 102 f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' + 103 '\n\t- '.join([ 104 e.example_name 105 for e in entry.allowed_patterns 106 ]) 107 for entry in requirements.allowed_data_types 108 if entry.error_msg is not None 109 ] 110 111 files_provided = ', '.join(files) 112 113 if len(errors) != 0: 114 raise ValueError(f"The files you have provided are: {files_provided} \n\n" 115 "They do not meet the dataset requirements. " 116 "The required file types are: \n" + 117 "\n".join(errors))
Checks if the file mapping rules for a process are met by the list of files
Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.
Arguments:
- process_id (str): ID for the process containing the file mapping rules
- directory: path to directory containing files
- files (List[str]): File names to check
119 def create_custom_process(self, process: CustomProcessInput) -> CreateResponse: 120 """ 121 Creates a custom process (pipeline or data type) in the system. 122 123 Whether it is a pipeline or data type is determined by the executor field. 124 For pipelines, you must specify the pipeline code and configuration repository. 125 126 This process will be available to all users in the system if `is_tenant_wide` is set to True. 127 If `is_tenant_wide` is set to False, the process will only be available the projects 128 specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges. 129 130 If the repository is private, you must complete the authorization flow on the UI. 131 132 See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info. 133 134 Args: 135 process (Process): Process to create 136 137 Example: 138 ```python 139 from cirro_api_client.v1.models import CustomProcessInput, Executor, \ 140 PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings 141 from cirro.cirro_client import CirroApi 142 143 cirro = CirroApi() 144 145 # New pipeline 146 new_pipeline = CustomProcessInput( 147 id="my_pipeline", 148 name="My Pipeline", 149 description="This is a test pipeline", 150 executor=Executor.CROMWELL, 151 category="DNA Sequencing", 152 child_process_ids=[], 153 parent_process_ids=["rnaseq"], 154 documentation_url="https://example.com/docs", 155 pipeline_code=PipelineCode( 156 repository_path="CirroBio/test-pipeline", 157 version="v1.0.0", 158 entry_point="main.nf", 159 repository_type=RepositoryType.GITHUB_PUBLIC 160 ), 161 linked_project_ids=[], 162 is_tenant_wide=True, 163 allow_multiple_sources=True, 164 uses_sample_sheet=True, 165 # This can be the same or different from the pipeline_code 166 custom_settings=CustomPipelineSettings( 167 repository="CirroBio/test-pipeline", 168 branch="v1.0.0", 169 repository_type=RepositoryType.GITHUB_PUBLIC, 170 ), 171 file_mapping_rules=[ 
172 FileMappingRule( 173 description="Filtered Feature Matrix", 174 file_name_patterns=[ 175 FileNamePattern( 176 example_name="filtered_feature_bc_matrix.h5", 177 description="Matrix", 178 sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg" 179 ) 180 ] 181 ) 182 ] 183 ) 184 185 cirro.processes.create_custom_process(new_pipeline) 186 187 # New data type 188 new_data_type = CustomProcessInput( 189 id="images_jpg", 190 name="JPG Images", 191 description="Used for generic JPG images", 192 executor=Executor.INGEST, 193 child_process_ids=[], 194 parent_process_ids=[], 195 documentation_url="https://example.com/docs", 196 linked_project_ids=["project_id_1", "project_id_2"], 197 file_mapping_rules=[ 198 FileMappingRule( 199 description="Images", 200 min_=1, 201 file_name_patterns=[ 202 FileNamePattern( 203 example_name="image.jpg", 204 description="JPG Image", 205 sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg" 206 ) 207 ] 208 ) 209 ] 210 ) 211 cirro.processes.create_custom_process(new_data_type) 212 ``` 213 """ 214 return create_custom_process.sync(client=self._api_client, body=process)
Creates a custom process (pipeline or data type) in the system.
Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.
This process will be available to all users in the system if is_tenant_wide is set to True.
If is_tenant_wide is set to False, the process will only be available to the projects
specified in linked_project_ids. Making it available tenant wide requires tenant admin privileges.
If the repository is private, you must complete the authorization flow on the UI.
See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
Arguments:
- process (CustomProcessInput): Process to create
Example:
from cirro_api_client.v1.models import CustomProcessInput, Executor, PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# New pipeline
new_pipeline = CustomProcessInput(
id="my_pipeline",
name="My Pipeline",
description="This is a test pipeline",
executor=Executor.CROMWELL,
category="DNA Sequencing",
child_process_ids=[],
parent_process_ids=["rnaseq"],
documentation_url="https://example.com/docs",
pipeline_code=PipelineCode(
repository_path="CirroBio/test-pipeline",
version="v1.0.0",
entry_point="main.nf",
repository_type=RepositoryType.GITHUB_PUBLIC
),
linked_project_ids=[],
is_tenant_wide=True,
allow_multiple_sources=True,
uses_sample_sheet=True,
# This can be the same or different from the pipeline_code
custom_settings=CustomPipelineSettings(
repository="CirroBio/test-pipeline",
branch="v1.0.0",
repository_type=RepositoryType.GITHUB_PUBLIC,
),
file_mapping_rules=[
FileMappingRule(
description="Filtered Feature Matrix",
file_name_patterns=[
FileNamePattern(
example_name="filtered_feature_bc_matrix.h5",
description="Matrix",
sample_matching_pattern="(?P<sampleName>[\S ]*)\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_pipeline)
# New data type
new_data_type = CustomProcessInput(
id="images_jpg",
name="JPG Images",
description="Used for generic JPG images",
executor=Executor.INGEST,
child_process_ids=[],
parent_process_ids=[],
documentation_url="https://example.com/docs",
linked_project_ids=["project_id_1", "project_id_2"],
file_mapping_rules=[
FileMappingRule(
description="Images",
min_=1,
file_name_patterns=[
FileNamePattern(
example_name="image.jpg",
description="JPG Image",
sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_data_type)
216 def update_custom_process(self, process_id: str, process: CustomProcessInput): 217 """ 218 Updates the custom process (pipeline or data type) in the system. 219 220 Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date. 221 222 Args: 223 process_id (str): ID of the process to update 224 process (CustomProcessInput): Process to update 225 """ 226 update_custom_process.sync_detailed(client=self._api_client, 227 process_id=process_id, 228 body=process)
Updates the custom process (pipeline or data type) in the system.
Please run sync_custom_process after updating to ensure the pipeline configuration is up to date.
Arguments:
- process_id (str): ID of the process to update
- process (CustomProcessInput): Process to update
230 def sync_custom_process(self, process_id: str) -> CustomPipelineSettings: 231 """ 232 Syncs a custom pipeline in the system. 233 234 This updates the pipeline configurations in Cirro (form configuration, 235 input mapping, preprocess etc.) to match what is in the configured repository. 236 See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details. 237 238 Args: 239 process_id (str): ID of the process to sync 240 """ 241 return sync_custom_process.sync(client=self._api_client, process_id=process_id)
Syncs a custom pipeline in the system.
This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
Arguments:
- process_id (str): ID of the process to sync
class ProjectService(BaseService):
    """
    Service for interacting with the Project endpoints
    """
    def list(self):
        """
        Retrieve a list of projects

        Returns the project summaries the current user has access to.
        """
        return get_projects.sync(client=self._api_client)

    def get(self, project_id: str) -> ProjectDetail:
        """
        Get detailed project information

        Args:
            project_id (str): Project ID
        """
        return get_project.sync(project_id=project_id, client=self._api_client)

    def create(self, request: ProjectInput) -> CreateResponse:
        """
        Create a project

        Args:
            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create

        Examples:
        ```python
        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
            CloudAccountType
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # Bring your own account projects are hosted by the user
        # You must provide the account details and VPC information
        # Please view the ProjectSettings model for more information on the attributes required
        byoa_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7,
                vpc_id="vpc-000000000000",
                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
            ),
            account=CloudAccount(
                account_id="<AWS_ACCOUNT_ID>",
                region_name="us-west-2",
                account_name="Cirro Lab Project",
                account_type=CloudAccountType.BYOA
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        # Hosted projects are managed by Cirro
        hosted_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        cirro.projects.create(byoa_project)
        ```
        """
        return create_project.sync(client=self._api_client, body=request)

    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
        """
        Updates a project

        Args:
            project_id (str): ID of project to update
            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
        """
        return update_project.sync(project_id=project_id, body=request, client=self._api_client)

    def update_tags(self, project_id: str, tags: List[Tag]):
        """
        Sets tags on a project

        Args:
            project_id (str): Project ID
            tags (List[Tag]): List of tags to apply
        """
        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)

    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
        """
        Gets users who have access to the project

        Args:
            project_id (str): Project ID
        """
        return get_project_users.sync(project_id=project_id, client=self._api_client)

    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification: bool = False):
        """
        Sets a user's role within a project.

        Set to `ProjectRole.NONE` if removing the user from a project.

        Args:
            project_id (str): Project ID
            username (str): Username
            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
            suppress_notification (bool): Suppress email notification
        """
        request_body = SetUserProjectRoleRequest(
            username=username,
            role=role,
            suppress_notification=suppress_notification
        )
        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)

    def archive(self, project_id: str):
        """
        Sets the project status to archived (hidden from active projects)

        Args:
            project_id (str): Project ID
        """
        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

    def unarchive(self, project_id: str):
        """
        Sets the project status to active

        Args:
            project_id (str): Project ID
        """
        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Service for interacting with the Project endpoints
16 def list(self): 17 """ 18 Retrieve a list of projects 19 """ 20 return get_projects.sync(client=self._api_client)
Retrieve a list of projects
22 def get(self, project_id: str) -> ProjectDetail: 23 """ 24 Get detailed project information 25 26 Args: 27 project_id (str): Project ID 28 """ 29 return get_project.sync(project_id=project_id, client=self._api_client)
Get detailed project information
Arguments:
- project_id (str): Project ID
31 def create(self, request: ProjectInput) -> CreateResponse: 32 """ 33 Create a project 34 35 Args: 36 request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create 37 38 Examples: 39 ```python 40 from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \ 41 CloudAccountType 42 from cirro.cirro_client import CirroApi 43 44 cirro = CirroApi() 45 46 # Bring your own account projects are hosted by the user 47 # You must provide the account details and VPC information 48 # Please view the ProjectSettings model for more information on the attributes required 49 byoa_project = ProjectInput( 50 name="New Project Name", 51 description="Description of new project", 52 billing_account_id="billing-account-id", 53 settings=ProjectSettings( 54 budget_period=BudgetPeriod.MONTHLY, 55 budget_amount=1000, 56 max_spot_vcpu=300, 57 service_connections=[], 58 retention_policy_days=7, 59 vpc_id="vpc-000000000000", 60 batch_subnets=["subnet-000000000000", "subnet-000000000001"], 61 sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"], 62 kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000" 63 ), 64 account=CloudAccount( 65 account_id="<AWS_ACCOUNT_ID>", 66 region_name="us-west-2", 67 account_name="Cirro Lab Project", 68 account_type=CloudAccountType.BYOA 69 ), 70 contacts=[ 71 Contact( 72 name="Contact Name", 73 organization="Contact Organization", 74 email="contact@email.com", 75 phone="987-654-3210" 76 ) 77 ] 78 ) 79 80 # Hosted projects are managed by Cirro 81 hosted_project = ProjectInput( 82 name="New Project Name", 83 description="Description of new project", 84 billing_account_id="billing-account-id", 85 settings=ProjectSettings( 86 budget_period=BudgetPeriod.MONTHLY, 87 budget_amount=1000, 88 max_spot_vcpu=300, 89 service_connections=[], 90 retention_policy_days=7 91 ), 92 contacts=[ 93 Contact( 94 name="Contact Name", 95 organization="Contact 
Organization", 96 email="contact@email.com", 97 phone="987-654-3210" 98 ) 99 ] 100 ) 101 102 cirro.projects.create(byoa_project) 103 ``` 104 """ 105 return create_project.sync(client=self._api_client, body=request)
Create a project
Arguments:
- request (cirro_api_client.v1.models.ProjectInput): Detailed information about the project to create
Examples:
from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, CloudAccountType
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7,
vpc_id="vpc-000000000000",
batch_subnets=["subnet-000000000000", "subnet-000000000001"],
sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
),
account=CloudAccount(
account_id="<AWS_ACCOUNT_ID>",
region_name="us-west-2",
account_name="Cirro Lab Project",
account_type=CloudAccountType.BYOA
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
cirro.projects.create(byoa_project)
107 def update(self, project_id: str, request: ProjectInput) -> ProjectDetail: 108 """ 109 Updates a project 110 111 Args: 112 project_id (str): ID of project to update 113 request (`cirro_api_client.v1.models.ProjectInput`): New details for the project 114 """ 115 return update_project.sync(project_id=project_id, body=request, client=self._api_client)
Updates a project
Arguments:
- project_id (str): ID of project to update
- request (cirro_api_client.v1.models.ProjectInput): New details for the project
127 def get_users(self, project_id: str) -> Optional[List[ProjectUser]]: 128 """ 129 Gets users who have access to the project 130 131 Args: 132 project_id (str): Project ID 133 """ 134 return get_project_users.sync(project_id=project_id, client=self._api_client)
Gets users who have access to the project
Arguments:
- project_id (str): Project ID
136 def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False): 137 """ 138 Sets a user's role within a project. 139 140 Set to `ProjectRole.NONE` if removing the user from a project. 141 142 Args: 143 project_id (str): Project ID 144 username (str): Username 145 role (`cirro_api_client.v1.models.ProjectRole`): New role to apply 146 suppress_notification (bool): Suppress email notification 147 """ 148 request_body = SetUserProjectRoleRequest( 149 username=username, 150 role=role, 151 suppress_notification=suppress_notification 152 ) 153 set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
Sets a user's role within a project.
Set to ProjectRole.NONE if removing the user from a project.
Arguments:
- project_id (str): Project ID
- username (str): Username
- role (cirro_api_client.v1.models.ProjectRole): New role to apply
- suppress_notification (bool): Suppress email notification
155 def archive(self, project_id: str): 156 """ 157 Sets the project status to archived (hidden from active projects) 158 159 Args: 160 project_id (str): Project ID 161 """ 162 archive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to archived (hidden from active projects)
Arguments:
- project_id (str): Project ID
164 def unarchive(self, project_id: str): 165 """ 166 Sets the project status to active 167 168 Args: 169 project_id (str): Project ID 170 """ 171 unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to active
Arguments:
- project_id (str): Project ID
class ReferenceService(FileEnabledService):
    """
    Service for interacting with the References endpoints
    """
    def get_types(self) -> Optional[List[ReferenceType]]:
        """
        List available reference types
        """
        return get_reference_types.sync(client=self._api_client)

    def get_type(self, reference_type_name: str) -> Optional[ReferenceType]:
        """
        Get a specific reference type by name

        Returns None when no type matches the name exactly.

        Args:
            reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
        """
        types = self.get_types()
        return next((t for t in types if t.name == reference_type_name), None)

    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
        """
        List available references for a given project

        Args:
            project_id (str): ID of the Project
        """
        return get_references_for_project.sync(project_id=project_id, client=self._api_client)

    def upload_reference(self,
                         name: str,
                         reference_files: list[PathLike],
                         project_id: str,
                         ref_type: ReferenceType):
        """
        Upload a reference to a project

        Args:
            name (str): Name of the reference (e.g. "GRCh38");
                no spaces are allowed in the name
            reference_files (list[PathLike]): Paths to the reference files to upload
            project_id (str): ID of the project to upload the reference to
            ref_type (ReferenceType): Type of the reference

        Raises:
            ValueError: if the reference name contains spaces

        ```python
        from pathlib import Path

        from cirro import CirroApi

        cirro = CirroApi()

        crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
        files = [
            Path("~/crispr_library.csv").expanduser()
        ]

        cirro.references.upload_reference(
            name="Library Name",
            project_id="project-id",
            ref_type=crispr_library,
            reference_files=files,
        )
        ```
        """
        # Validate name
        if ' ' in name:
            raise ValueError("Reference name cannot contain spaces")

        access_context = FileAccessContext.upload_reference(
            project_id=project_id,
            base_url=f's3://project-{project_id}/resources/data/references',
        )

        # Maps each local file onto its destination path under the reference layout
        file_path_map = generate_reference_file_path_map(
            files=reference_files,
            name=name,
            ref_type=ref_type
        )

        self._file_service.upload_files(
            access_context=access_context,
            directory=Path('~').expanduser(),  # Full path expected in reference_files
            files=reference_files,
            file_path_map=file_path_map
        )

        # Prompt the backend to re-index references so the new upload is visible
        refresh_project_references.sync_detailed(project_id=project_id,
                                                 client=self._api_client)
Service for interacting with the References endpoints
18 def get_types(self) -> Optional[List[ReferenceType]]: 19 """ 20 List available reference types 21 """ 22 return get_reference_types.sync(client=self._api_client)
List available reference types
24 def get_type(self, reference_type_name: str) -> Optional[ReferenceType]: 25 """ 26 Get a specific reference type by name 27 28 Args: 29 reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)") 30 """ 31 types = self.get_types() 32 return next((t for t in types if t.name == reference_type_name), None)
Get a specific reference type by name
Arguments:
- reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
34 def get_for_project(self, project_id: str) -> Optional[List[Reference]]: 35 """ 36 List available references for a given project 37 """ 38 return get_references_for_project.sync(project_id=project_id, client=self._api_client)
List available references for a given project
40 def upload_reference(self, 41 name: str, 42 reference_files: list[PathLike], 43 project_id: str, 44 ref_type: ReferenceType): 45 """ 46 Upload a reference to a project 47 48 Args: 49 name (str): Name of the reference (e.g. "GRCh38") 50 no spaces are allowed in the name 51 reference_files (list[PathLike]): Path to the reference file to upload 52 project_id (str): ID of the project to upload the reference to 53 ref_type (ReferenceType): Type of the reference 54 55 ```python 56 from pathlib import Path 57 58 from cirro import CirroApi 59 60 cirro = CirroApi() 61 62 crispr_library = cirro.references.get_type("CRISPR sgRNA Library") 63 files = [ 64 Path("~/crispr_library.csv").expanduser() 65 ] 66 67 cirro.references.upload_reference( 68 name="Library Name", 69 project_id="project-id", 70 ref_type=crispr_library, 71 reference_files=files, 72 ) 73 ``` 74 """ 75 # Validate name 76 if ' ' in name: 77 raise ValueError("Reference name cannot contain spaces") 78 79 access_context = FileAccessContext.upload_reference( 80 project_id=project_id, 81 base_url=f's3://project-{project_id}/resources/data/references', 82 ) 83 84 file_path_map = generate_reference_file_path_map( 85 files=reference_files, 86 name=name, 87 ref_type=ref_type 88 ) 89 90 self._file_service.upload_files( 91 access_context=access_context, 92 directory=Path('~').expanduser(), # Full path expected in reference_files 93 files=reference_files, 94 file_path_map=file_path_map 95 ) 96 97 refresh_project_references.sync_detailed(project_id=project_id, 98 client=self._api_client)
Upload a reference to a project
Arguments:
- name (str): Name of the reference (e.g. "GRCh38"); spaces are not allowed in the name
- reference_files (list[PathLike]): Path to the reference file to upload
- project_id (str): ID of the project to upload the reference to
- ref_type (ReferenceType): Type of the reference
from pathlib import Path
from cirro import CirroApi
cirro = CirroApi()
crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
files = [
Path("~/crispr_library.csv").expanduser()
]
cirro.references.upload_reference(
name="Library Name",
project_id="project-id",
ref_type=crispr_library,
reference_files=files,
)
class UserService(BaseService):
    """
    Service for interacting with the Users endpoints
    """

    def list(self, max_items: int = 10000) -> List[User]:
        """
        List users in the system
        """
        def _fetch_page(page_args):
            # Retrieve one page of users; pagination is driven by get_all_records
            return list_users.sync(
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit,
            )

        return get_all_records(records_getter=_fetch_page, max_items=max_items)

    def get(self, username: str) -> UserDetail:
        """
        Get user details by username, including what projects they are assigned to
        """
        return get_user.sync(username=username, client=self._api_client)

    def invite_user(self, name: str, organization: str, email: str):
        """
        Invite a user to the system.
        If the user already exists, it will return a message that the user already exists.

        Args:
            name (str): Name of the user
            organization (str): Organization of the user
            email (str): Email (username) of the user
        """
        response = invite_user.sync(
            client=self._api_client,
            body=InviteUserRequest(
                name=name,
                organization=organization,
                email=email,
            ),
        )
        return response.message
Not to be instantiated directly
11 def list(self, max_items: int = 10000) -> List[User]: 12 """ 13 List users in the system 14 """ 15 return get_all_records( 16 records_getter=lambda page_args: list_users.sync( 17 client=self._api_client, 18 next_token=page_args.next_token, 19 limit=page_args.limit 20 ), 21 max_items=max_items 22 )
List users in the system
24 def get(self, username: str) -> UserDetail: 25 """ 26 Get user details by username, including what projects they are assigned to 27 """ 28 return get_user.sync(username=username, client=self._api_client)
Get user details by username, including what projects they are assigned to
30 def invite_user(self, name: str, organization: str, email: str): 31 """ 32 Invite a user to the system. 33 If the user already exists, it will return a message that the user already exists. 34 35 Args: 36 name (str): Name of the user 37 organization (str): Organization of the user 38 email (str): Email (username) of the user 39 """ 40 request = InviteUserRequest( 41 name=name, 42 organization=organization, 43 email=email, 44 ) 45 response = invite_user.sync(client=self._api_client, body=request) 46 return response.message
Invite a user to the system. If the user already exists, it will return a message that the user already exists.
Arguments:
- name (str): Name of the user
- organization (str): Organization of the user
- email (str): Email (username) of the user