cirro.services
"""
Service classes for interacting with the Cirro API endpoints.
"""
from .billing import BillingService
from .compute_environment import ComputeEnvironmentService
from .dataset import DatasetService
from .execution import ExecutionService
from .file import FileService
from .metadata import MetadataService
from .metrics import MetricsService
from .process import ProcessService
from .projects import ProjectService
from .references import ReferenceService
from .share import ShareService
from .user import UserService
from .workspace import WorkspaceService


# Public API of the package; kept in alphabetical order to match the imports
# above (ComputeEnvironmentService was previously listed out of order).
__all__ = [
    'BillingService',
    'ComputeEnvironmentService',
    'DatasetService',
    'ExecutionService',
    'FileService',
    'MetadataService',
    'MetricsService',
    'ProcessService',
    'ProjectService',
    'ReferenceService',
    'ShareService',
    'UserService',
    'WorkspaceService',
]
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """

    def list(self) -> List[BillingAccount]:
        """
        Gets a list of billing accounts the current user has access to
        """
        accounts = get_billing_accounts.sync(client=self._api_client)
        return accounts

    def update(self, billing_account_id: str, request: BillingAccountRequest):
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest):

        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        # sync_detailed is used because the endpoint returns no payload body
        update_billing_account.sync_detailed(
            client=self._api_client,
            billing_account_id=billing_account_id,
            body=request,
        )
Service for interacting with the Billing endpoints
14 def list(self) -> List[BillingAccount]: 15 """ 16 Gets a list of billing accounts the current user has access to 17 """ 18 return get_billing_accounts.sync(client=self._api_client)
Gets a list of billing accounts the current user has access to
20 def update(self, billing_account_id: str, request: BillingAccountRequest): 21 """ 22 Updates a billing account 23 24 Args: 25 billing_account_id (str): Billing account ID 26 request (cirro_api_client.v1.models.BillingAccountRequest): 27 28 ```python 29 from cirro_api_client.v1.models import BillingAccountRequest 30 from cirro.cirro_client import CirroApi 31 32 cirro = CirroApi() 33 request = BillingAccountRequest( 34 name="New billing account name", 35 primary_budget_number="new-budget-number", 36 owner="New Owner" 37 ) 38 cirro.billing.update("billing-account-id", request) 39 ``` 40 """ 41 update_billing_account.sync_detailed( 42 billing_account_id=billing_account_id, 43 body=request, 44 client=self._api_client 45 )
Updates a billing account
Arguments:
- billing_account_id (str): Billing account ID
- request (cirro_api_client.v1.models.BillingAccountRequest):
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = BillingAccountRequest(
name="New billing account name",
primary_budget_number="new-budget-number",
owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
class DatasetService(FileEnabledService):
    """
    Service for interacting with the Dataset endpoints
    """

    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
        """List datasets

        Retrieves a list of datasets for a given project

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return get_all_records(
            records_getter=lambda page_args: get_datasets.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
        """
        Retrieves a list of shared datasets for a given project and share

        Args:
            project_id (str): ID of the Project
            share_id (str): ID of the Share
            max_items (int): Maximum number of records to get (default 10,000)

        Example:
            ```python
            from cirro_api_client.v1.models import ShareType
            from cirro.cirro_client import CirroApi

            cirro = CirroApi()

            # List shares that are subscribed to
            subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
            cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
            ```
        """
        return get_all_records(
            records_getter=lambda page_args: get_shared_datasets.sync(
                project_id=project_id,
                share_id=share_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
        """
        Download data from public repositories

        Args:
            project_id (str): ID of the Project
            import_request (cirro_api_client.v1.models.ImportDataRequest):

        Returns:
            ID of the created dataset

        ```python
        from cirro_api_client.v1.models import ImportDataRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = ImportDataRequest(
            name="Imported dataset",
            description="Description of the dataset",
            public_ids=["SRR123456", "SRR123457"],
            tags=[Tag(value="tag1")]
        )
        cirro.datasets.import_public("project-id", request)
        ```
        """
        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)

    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
        """
        Registers a dataset in Cirro, which can subsequently have files uploaded to it

        Args:
            project_id (str): ID of the Project
            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):

        Returns:
            ID of the created dataset and the path to upload files

        ```python
        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UploadDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
            description="Description of the dataset",
            tags=[Tag(value="tag1"), Tag(value="tag2")]
        )
        cirro.datasets.create("project-id", request)
        ```
        """
        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)

    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
        """
        Gets detailed information about a dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset

        Returns:
            The dataset, if found
        """
        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
        """
        Update info on a dataset (name, description, and/or tags)

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            request (cirro_api_client.v1.models.UpdateDatasetRequest):

        Returns:
            The updated dataset

        ```python
        from cirro_api_client.v1.models import UpdateDatasetRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UpdateDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            description="Description of the dataset"
        )
        cirro.datasets.update("project-id", "dataset-id", request)
        ```
        """
        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)

    def delete(self, project_id: str, dataset_id: str) -> None:
        """
        Delete a dataset

        After a dataset has been deleted, the files associated with that
        dataset are saved according to the project's retention time.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """
        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
        """
        Gets a listing of files, charts, and other assets available for the dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            file_limit (int): Maximum number of files to get (default 100,000)

        Raises:
            ValueError: If `file_limit` is less than 1
        """
        dataset = self.get(project_id, dataset_id)
        if file_limit < 1:
            raise ValueError("file_limit must be greater than 0")
        all_files = []
        file_offset = 0
        domain = None
        artifacts = None

        # Page through the dataset manifest until all files are collected
        # (or file_limit is reached)
        while len(all_files) < file_limit:
            manifest = get_dataset_manifest.sync(
                project_id=project_id,
                dataset_id=dataset_id,
                file_offset=file_offset,
                client=self._api_client
            )
            all_files.extend(manifest.files)
            file_offset += len(manifest.files)

            # Artifacts only need to be captured once, from the first
            # manifest page that provides them
            if not artifacts:
                artifacts = manifest.artifacts

            domain = manifest.domain
            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
                break

        files = [
            File.from_file_entry(
                f,
                project_id=project_id,
                dataset=dataset,
                domain=domain
            )
            for f in all_files
        ]
        artifacts = [
            Artifact(
                artifact_type=a.type_,
                file=File.from_file_entry(
                    FileEntry(a.path),
                    project_id=project_id,
                    dataset=dataset,
                    domain=domain
                )
            )
            # Guard against a manifest that reported no artifacts at all,
            # which previously left `artifacts` as None and raised TypeError
            for a in (artifacts or [])
        ]
        return DatasetAssets(files=files, artifacts=artifacts)

    def upload_files(self,
                     project_id: str,
                     dataset_id: str,
                     directory: PathLike,
                     files: List[PathLike] = None,
                     file_path_map: Dict[PathLike, str] = None) -> None:
        """
        Uploads files to a given dataset from the specified directory.

        All files must be relative to the specified directory.
        If files need to be flattened, or you are sourcing files from multiple directories,
        please include `file_path_map` or call this method multiple times.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory,
                must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
                from source path to destination path, used to "re-write" paths within the dataset.

        ```python
        from cirro.cirro_client import CirroApi
        from cirro.file_utils import generate_flattened_file_map

        cirro = CirroApi()

        directory = "~/Downloads"
        # Re-write file paths
        file_map = {
            "data1/file1.fastq.gz": "file1.fastq.gz",
            "data2/file2.fastq.gz": "file2.fastq.gz",
            "file3.fastq.gz": "new_file3.txt"
        }

        # Or you could automate the flattening
        files = ["data1/file1.fastq.gz"]
        file_map = generate_flattened_file_map(files)

        cirro.datasets.upload_files(
            project_id="project-id",
            dataset_id="dataset-id",
            directory=directory,
            files=list(file_map.keys()),
            file_path_map=file_map
        )
        ```
        """
        if file_path_map is None:
            file_path_map = {}

        dataset = self.get(project_id, dataset_id)

        access_context = FileAccessContext.upload_dataset(
            project_id=project_id,
            dataset_id=dataset_id,
            base_url=dataset.s3
        )

        self._file_service.upload_files(
            access_context=access_context,
            directory=directory,
            files=files,
            file_path_map=file_path_map
        )

    def validate_folder(
            self,
            project_id: str,
            dataset_id: str,
            local_folder: PathLike,
            file_limit: int = 100000
    ) -> DatasetValidationResponse:
        """
        Validates that the contents of a dataset match that of a local folder.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            local_folder (str|Path): Local folder to compare against the dataset
            file_limit (int): Maximum number of dataset files to check (default 100,000)

        Returns:
            A summary of matching, mismatched, missing, and local-only files

        Raises:
            ValueError: If `local_folder` is not a directory
        """
        ds_files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files

        local_folder = Path(local_folder)
        if not local_folder.is_dir():
            raise ValueError(f"{local_folder} is not a valid local folder")

        # Keep track of files from the dataset which match by checksum, don't match, or are missing
        ds_files_matching = []
        ds_files_not_matching = []
        ds_files_missing = []
        ds_validate_failed = []
        for ds_file in ds_files:
            ds_file_path = ds_file.normalized_path
            # Get the corresponding local file
            local_file = local_folder / ds_file_path
            if not local_file.exists():
                ds_files_missing.append(ds_file_path)
            else:
                try:
                    if self._file_service.is_valid_file(ds_file, local_file):
                        ds_files_matching.append(ds_file_path)
                    else:
                        ds_files_not_matching.append(ds_file_path)
                except RuntimeWarning as e:
                    logger.warning(f"File validation failed: {e}")
                    ds_validate_failed.append(ds_file_path)

        # Find local files that are not in the dataset
        local_file_paths = [
            file.relative_to(local_folder).as_posix()
            for file in local_folder.rglob("*")
            if not file.is_dir() and not is_hidden_file(file)
        ]
        # Use a set for O(1) membership tests (was an O(n*m) list scan)
        dataset_file_paths = {file.normalized_path for file in ds_files}
        local_only_files = [
            file
            for file in local_file_paths
            if file not in dataset_file_paths
        ]

        return DatasetValidationResponse(
            files_matching=ds_files_matching,
            files_not_matching=ds_files_not_matching,
            files_missing=ds_files_missing,
            local_only_files=local_only_files,
            validate_errors=ds_validate_failed,
        )

    def download_files(
            self,
            project_id: str,
            dataset_id: str,
            download_location: str,
            files: Union[List[File], List[str]] = None,
            file_limit: int = 100000
    ) -> None:
        """
        Downloads files from a dataset

        The `files` argument is used to optionally specify a subset of files
        to be downloaded. By default, all files are downloaded.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            download_location (str): Local destination for downloaded files
            files (typing.Union[typing.List[File], typing.List[str]]): Optional list of files to download
            file_limit (int): Maximum number of files to get (default 100,000)
        """
        if files is None:
            files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files

        if len(files) == 0:
            return

        first_file = files[0]
        if isinstance(first_file, File):
            # File objects carry their own access context
            files = [file.relative_path for file in files]
            access_context = first_file.access_context
        else:
            # Plain string paths: derive the access context from the dataset
            dataset = self.get(project_id, dataset_id)
            if dataset.share:
                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
                                                                          dataset_id=dataset_id,
                                                                          base_url=dataset.s3)
            else:
                access_context = FileAccessContext.download(project_id=project_id,
                                                            base_url=dataset.s3)

        self._file_service.download_files(access_context, download_location, files)

    def update_samplesheet(
            self,
            project_id: str,
            dataset_id: str,
            samplesheet: str
    ):
        """
        Updates a samplesheet on a dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            samplesheet (str): Samplesheet contents to update (should be a CSV string)
        """
        dataset = self.get(project_id, dataset_id)
        access_context = FileAccessContext.upload_sample_sheet(project_id=project_id,
                                                               dataset_id=dataset_id,
                                                               base_url=dataset.s3)

        samplesheet_key = f'{access_context.prefix}/samplesheet.csv'
        self._file_service.create_file(
            access_context=access_context,
            key=samplesheet_key,
            contents=samplesheet,
            content_type='text/csv'
        )
Service for interacting with the Dataset endpoints
26 def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]: 27 """List datasets 28 29 Retrieves a list of datasets for a given project 30 31 Args: 32 project_id (str): ID of the Project 33 max_items (int): Maximum number of records to get (default 10,000) 34 """ 35 return get_all_records( 36 records_getter=lambda page_args: get_datasets.sync( 37 project_id=project_id, 38 client=self._api_client, 39 next_token=page_args.next_token, 40 limit=page_args.limit 41 ), 42 max_items=max_items 43 )
List datasets
Retrieves a list of datasets for a given project
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
77 def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse: 78 """ 79 Download data from public repositories 80 81 Args: 82 project_id (str): ID of the Project 83 import_request (cirro_api_client.v1.models.ImportDataRequest): 84 85 Returns: 86 ID of the created dataset 87 88 ```python 89 from cirro_api_client.v1.models import ImportDataRequest, Tag 90 from cirro.cirro_client import CirroApi 91 92 cirro = CirroApi() 93 request = ImportDataRequest( 94 name="Imported dataset", 95 description="Description of the dataset", 96 public_ids=["SRR123456", "SRR123457"], 97 tags=[Tag(value="tag1")] 98 ) 99 cirro.datasets.import_public("project-id", request) 100 ``` 101 """ 102 return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)
Download data from public repositories
Arguments:
- project_id (str): ID of the Project
- import_request (cirro_api_client.v1.models.ImportDataRequest):
Returns:
ID of the created dataset
from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = ImportDataRequest(
name="Imported dataset",
description="Description of the dataset",
public_ids=["SRR123456", "SRR123457"],
tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
104 def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse: 105 """ 106 Registers a dataset in Cirro, which can subsequently have files uploaded to it 107 108 Args: 109 project_id (str): ID of the Project 110 upload_request (cirro_api_client.v1.models.UploadDatasetRequest): 111 112 Returns: 113 ID of the created dataset and the path to upload files 114 115 ```python 116 from cirro_api_client.v1.models import UploadDatasetRequest, Tag 117 from cirro.cirro_client import CirroApi 118 119 cirro = CirroApi() 120 request = UploadDatasetRequest( 121 name="Name of new dataset", 122 process_id="paired_dnaseq", 123 expected_files=["read_1.fastq.gz", "read_2.fastq.gz"], 124 description="Description of the dataset", 125 tags=[Tag(value="tag1"), Tag(value="tag2")] 126 ) 127 cirro.datasets.create("project-id", request) 128 ``` 129 """ 130 return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)
Registers a dataset in Cirro, which can subsequently have files uploaded to it
Arguments:
- project_id (str): ID of the Project
- upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
Returns:
ID of the created dataset and the path to upload files
from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UploadDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
description="Description of the dataset",
tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
132 def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]: 133 """ 134 Gets detailed information about a dataset 135 136 Args: 137 project_id (str): ID of the Project 138 dataset_id (str): ID of the Dataset 139 140 Returns: 141 The dataset, if found 142 """ 143 return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Gets detailed information about a dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
Returns:
The dataset, if found
145 def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail: 146 """ 147 Update info on a dataset (name, description, and/or tags) 148 149 Args: 150 project_id (str): ID of the Project 151 dataset_id (str): ID of the Dataset 152 request (cirro_api_client.v1.models.UpdateDatasetRequest): 153 154 Returns: 155 The updated dataset 156 157 ```python 158 from cirro_api_client.v1.models import UpdateDatasetRequest 159 from cirro.cirro_client import CirroApi 160 161 cirro = CirroApi() 162 request = UpdateDatasetRequest( 163 name="Name of new dataset", 164 process_id="paired_dnaseq", 165 description="Description of the dataset" 166 ) 167 cirro.datasets.update("project-id", "dataset-id", request) 168 ``` 169 """ 170 return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)
Update info on a dataset (name, description, and/or tags)
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- request (cirro_api_client.v1.models.UpdateDatasetRequest):
Returns:
The updated dataset
from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UpdateDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
172 def delete(self, project_id: str, dataset_id: str) -> None: 173 """ 174 Delete a dataset 175 176 After a dataset has been deleted, the files associated with that 177 dataset are saved according to the project's retention time. 178 179 Args: 180 project_id (str): ID of the Project 181 dataset_id (str): ID of the Dataset 182 """ 183 delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Delete a dataset
After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
185 def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets: 186 """ 187 Gets a listing of files, charts, and other assets available for the dataset 188 189 Args: 190 project_id (str): ID of the Project 191 dataset_id (str): ID of the Dataset 192 file_limit (int): Maximum number of files to get (default 100,000) 193 """ 194 dataset = self.get(project_id, dataset_id) 195 if file_limit < 1: 196 raise ValueError("file_limit must be greater than 0") 197 all_files = [] 198 file_offset = 0 199 domain = None 200 artifacts = None 201 202 while len(all_files) < file_limit: 203 manifest = get_dataset_manifest.sync( 204 project_id=project_id, 205 dataset_id=dataset_id, 206 file_offset=file_offset, 207 client=self._api_client 208 ) 209 all_files.extend(manifest.files) 210 file_offset += len(manifest.files) 211 212 if not artifacts: 213 artifacts = manifest.artifacts 214 215 domain = manifest.domain 216 if len(all_files) >= manifest.total_files or len(manifest.files) == 0: 217 break 218 219 files = [ 220 File.from_file_entry( 221 f, 222 project_id=project_id, 223 dataset=dataset, 224 domain=domain 225 ) 226 for f in all_files 227 ] 228 artifacts = [ 229 Artifact( 230 artifact_type=a.type_, 231 file=File.from_file_entry( 232 FileEntry(a.path), 233 project_id=project_id, 234 dataset=dataset, 235 domain=domain 236 ) 237 ) 238 for a in artifacts 239 ] 240 return DatasetAssets(files=files, artifacts=artifacts)
Gets a listing of files, charts, and other assets available for the dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- file_limit (int): Maximum number of files to get (default 100,000)
242 def upload_files(self, 243 project_id: str, 244 dataset_id: str, 245 directory: PathLike, 246 files: List[PathLike] = None, 247 file_path_map: Dict[PathLike, str] = None) -> None: 248 """ 249 Uploads files to a given dataset from the specified directory. 250 251 All files must be relative to the specified directory. 252 If files need to be flattened, or you are sourcing files from multiple directories, 253 please include `file_path_map` or call this method multiple times. 254 255 Args: 256 project_id (str): ID of the Project 257 dataset_id (str): ID of the Dataset 258 directory (str|Path): Path to directory 259 files (typing.List[str|Path]): List of paths to files within the directory, 260 must be the same type as directory. 261 file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload 262 from source path to destination path, used to "re-write" paths within the dataset. 263 ```python 264 from cirro.cirro_client import CirroApi 265 from cirro.file_utils import generate_flattened_file_map 266 267 cirro = CirroApi() 268 269 directory = "~/Downloads" 270 # Re-write file paths 271 file_map = { 272 "data1/file1.fastq.gz": "file1.fastq.gz", 273 "data2/file2.fastq.gz": "file2.fastq.gz", 274 "file3.fastq.gz": "new_file3.txt" 275 } 276 277 # Or you could automate the flattening 278 files = ["data1/file1.fastq.gz"] 279 file_map = generate_flattened_file_map(files) 280 281 cirro.datasets.upload_files( 282 project_id="project-id", 283 dataset_id="dataset-id", 284 directory=directory, 285 files=list(file_map.keys()), 286 file_path_map=file_map 287 ) 288 ``` 289 """ 290 if file_path_map is None: 291 file_path_map = {} 292 293 dataset = self.get(project_id, dataset_id) 294 295 access_context = FileAccessContext.upload_dataset( 296 project_id=project_id, 297 dataset_id=dataset_id, 298 base_url=dataset.s3 299 ) 300 301 self._file_service.upload_files( 302 access_context=access_context, 303 directory=directory, 304 files=files, 305 
file_path_map=file_path_map 306 )
Uploads files to a given dataset from the specified directory.
All files must be relative to the specified directory.
If files need to be flattened, or you are sourcing files from multiple directories,
please include file_path_map or call this method multiple times.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map
cirro = CirroApi()
directory = "~/Downloads"
# Re-write file paths
file_map = {
"data1/file1.fastq.gz": "file1.fastq.gz",
"data2/file2.fastq.gz": "file2.fastq.gz",
"file3.fastq.gz": "new_file3.txt"
}
# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)
cirro.datasets.upload_files(
project_id="project-id",
dataset_id="dataset-id",
directory=directory,
files=list(file_map.keys()),
file_path_map=file_map
)
308 def validate_folder( 309 self, 310 project_id: str, 311 dataset_id: str, 312 local_folder: PathLike, 313 file_limit: int = 100000 314 ) -> DatasetValidationResponse: 315 """ 316 Validates that the contents of a dataset match that of a local folder. 317 """ 318 ds_files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files 319 320 local_folder = Path(local_folder) 321 if not local_folder.is_dir(): 322 raise ValueError(f"{local_folder} is not a valid local folder") 323 324 # Keep track of files from the dataset which match by checksum, don't match, or are missing 325 ds_files_matching = [] 326 ds_files_not_matching = [] 327 ds_files_missing = [] 328 ds_validate_failed = [] 329 for ds_file in ds_files: 330 ds_file_path = ds_file.normalized_path 331 # Get the corresponding local file 332 local_file = local_folder / ds_file_path 333 if not local_file.exists(): 334 ds_files_missing.append(ds_file_path) 335 else: 336 try: 337 if self._file_service.is_valid_file(ds_file, local_file): 338 ds_files_matching.append(ds_file_path) 339 else: 340 ds_files_not_matching.append(ds_file_path) 341 except RuntimeWarning as e: 342 logger.warning(f"File validation failed: {e}") 343 ds_validate_failed.append(ds_file_path) 344 345 # Find local files that are not in the dataset 346 local_file_paths = [ 347 file.relative_to(local_folder).as_posix() 348 for file in local_folder.rglob("*") 349 if not file.is_dir() and not is_hidden_file(file) 350 ] 351 dataset_file_paths = [file.normalized_path for file in ds_files] 352 local_only_files = [ 353 file 354 for file in local_file_paths 355 if file not in dataset_file_paths 356 ] 357 358 return DatasetValidationResponse( 359 files_matching=ds_files_matching, 360 files_not_matching=ds_files_not_matching, 361 files_missing=ds_files_missing, 362 local_only_files=local_only_files, 363 validate_errors=ds_validate_failed, 364 )
Validates that the contents of a dataset match that of a local folder.
366 def download_files( 367 self, 368 project_id: str, 369 dataset_id: str, 370 download_location: str, 371 files: Union[List[File], List[str]] = None, 372 file_limit: int = 100000 373 ) -> None: 374 """ 375 Downloads files from a dataset 376 377 The `files` argument is used to optionally specify a subset of files 378 to be downloaded. By default, all files are downloaded. 379 380 Args: 381 project_id (str): ID of the Project 382 dataset_id (str): ID of the Dataset 383 download_location (str): Local destination for downloaded files 384 files (typing.List[str]): Optional list of files to download 385 file_limit (int): Maximum number of files to get (default 100,000) 386 """ 387 if files is None: 388 files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files 389 390 if len(files) == 0: 391 return 392 393 first_file = files[0] 394 if isinstance(first_file, File): 395 files = [file.relative_path for file in files] 396 access_context = first_file.access_context 397 else: 398 dataset = self.get(project_id, dataset_id) 399 if dataset.share: 400 access_context = FileAccessContext.download_shared_dataset(project_id=project_id, 401 dataset_id=dataset_id, 402 base_url=dataset.s3) 403 else: 404 access_context = FileAccessContext.download(project_id=project_id, 405 base_url=dataset.s3) 406 407 self._file_service.download_files(access_context, download_location, files)
Downloads files from a dataset
The files argument is used to optionally specify a subset of files
to be downloaded. By default, all files are downloaded.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- download_location (str): Local destination for downloaded files
- files (typing.List[str]): Optional list of files to download
- file_limit (int): Maximum number of files to get (default 100,000)
409 def update_samplesheet( 410 self, 411 project_id: str, 412 dataset_id: str, 413 samplesheet: str 414 ): 415 """ 416 Updates a samplesheet on a dataset 417 418 Args: 419 project_id (str): ID of the Project 420 dataset_id (str): ID of the Dataset 421 samplesheet (str): Samplesheet contents to update (should be a CSV string) 422 """ 423 dataset = self.get(project_id, dataset_id) 424 access_context = FileAccessContext.upload_sample_sheet(project_id=project_id, 425 dataset_id=dataset_id, 426 base_url=dataset.s3) 427 428 samplesheet_key = f'{access_context.prefix}/samplesheet.csv' 429 self._file_service.create_file( 430 access_context=access_context, 431 key=samplesheet_key, 432 contents=samplesheet, 433 content_type='text/csv' 434 )
Updates a samplesheet on a dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- samplesheet (str): Samplesheet contents to update (should be a CSV string)
class ExecutionService(BaseService):
    """
    Service for interacting with the Execution endpoints
    """
    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
        """
        Launch an analysis job running a process on a set of inputs

        Args:
            project_id (str): ID of the Project
            request (cirro_api_client.v1.models.RunAnalysisRequest): Analysis request

        Returns:
            The ID of the created dataset

        ```python
        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
        from cirro.cirro_client import CirroApi

        # Example:
        # Run the "process-nf-core-rnaseq-3_8" process using input data
        # from a dataset with the id "source-dataset-id"

        # Optional analysis parameters
        params = RunAnalysisRequestParams.from_dict({
            "param_a": "val_a",
            "param_b": "val_b"
        })

        cirro = CirroApi()
        request = RunAnalysisRequest(
            name="Name of the newly created dataset",
            description="Longer description of newly created dataset",
            process_id="process-nf-core-rnaseq-3_8",
            source_dataset_ids=["source-dataset-id"],
            params=params
        )
        cirro.execution.run_analysis("project-id", request)
        ```
        """

        # Fetch the process' parameter schema so the request can be
        # validated client-side before the job is submitted
        form_spec = get_process_parameters.sync(
            process_id=request.process_id,
            client=self._api_client
        )

        # Raises if the supplied params do not satisfy the schema
        ParameterSpecification(
            form_spec
        ).validate_params(
            request.params.to_dict() if request.params else {}
        )

        return run_analysis.sync(
            project_id=project_id,
            body=request,
            client=self._api_client
        )

    def stop_analysis(self, project_id: str, dataset_id: str):
        """
        Terminates all jobs related to a running analysis

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """

        return stop_analysis.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            client=self._api_client
        )

    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
        """
        Gets an overview of the executions currently running in the project, by job queue

        Args:
            project_id (str): ID of the Project

        Returns:
            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
        """

        # additional_properties carries the job-queue -> task-list mapping
        return get_project_summary.sync(
            project_id=project_id,
            client=self._api_client
        ).additional_properties

    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
        """
        Gets live logs from main execution task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed

        Returns:
            Log event messages joined into a newline-delimited string
        """

        resp = get_execution_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

        return '\n'.join(e.message for e in resp.events)

    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
        """
        Gets the tasks submitted by the workflow execution

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will try to get the list of jobs
                from the executor (i.e., AWS Batch), rather than the workflow report
        """

        return get_tasks_for_execution.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
        """
        Gets the log output from an individual task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            task_id (str): ID of the task
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed

        Returns:
            Log event messages joined into a newline-delimited string
        """

        resp = get_task_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            task_id=task_id,
            force_live=force_live,
            client=self._api_client
        )

        return '\n'.join(e.message for e in resp.events)
Service for interacting with the Execution endpoints
def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
    """
    Launch an analysis job running a process on a set of inputs

    Args:
        project_id (str): ID of the Project
        request (cirro_api_client.v1.models.RunAnalysisRequest): Analysis request

    Returns:
        The ID of the created dataset

    ```python
    from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
    from cirro.cirro_client import CirroApi

    # Example:
    # Run the "process-nf-core-rnaseq-3_8" process using input data
    # from a dataset with the id "source-dataset-id"

    # Optional analysis parameters
    params = RunAnalysisRequestParams.from_dict({
        "param_a": "val_a",
        "param_b": "val_b"
    })

    cirro = CirroApi()
    request = RunAnalysisRequest(
        name="Name of the newly created dataset",
        description="Longer description of newly created dataset",
        process_id="process-nf-core-rnaseq-3_8",
        source_dataset_ids=["source-dataset-id"],
        params=params
    )
    cirro.execution.run_analysis("project-id", request)
    ```
    """

    # Fetch the process' parameter schema for client-side validation
    form_spec = get_process_parameters.sync(
        process_id=request.process_id,
        client=self._api_client
    )

    # Raises if the supplied params do not satisfy the schema
    ParameterSpecification(
        form_spec
    ).validate_params(
        request.params.to_dict() if request.params else {}
    )

    return run_analysis.sync(
        project_id=project_id,
        body=request,
        client=self._api_client
    )
Launch an analysis job running a process on a set of inputs
Arguments:
- project_id (str): ID of the Project
- request (cirro_api_client.v1.models.RunAnalysisRequest): Analysis request describing the process, inputs, and parameters (see example below)
Returns:
The ID of the created dataset
from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi
# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"
# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
"param_a": "val_a",
"param_b": "val_b"
})
cirro = CirroApi()
request = RunAnalysisRequest(
name="Name of the newly created dataset",
description="Longer description of newly created dataset",
process_id="process-nf-core-rnaseq-3_8",
source_dataset_ids=["source-dataset-id"],
params=params
)
cirro.execution.run_analysis("project-id", request)
def stop_analysis(self, project_id: str, dataset_id: str):
    """
    Stops a running analysis by terminating all jobs associated with it

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
    """
    # Delegate directly to the generated API client endpoint
    return stop_analysis.sync(
        dataset_id=dataset_id,
        project_id=project_id,
        client=self._api_client,
    )
Terminates all jobs related to a running analysis
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
    """
    Gets an overview of the executions currently running in the project, by job queue

    Args:
        project_id (str): ID of the Project

    Returns:
        `cirro_api_client.v1.models.GetProjectSummaryResponse200`
    """

    # additional_properties carries the job-queue -> task-list mapping
    return get_project_summary.sync(
        project_id=project_id,
        client=self._api_client
    ).additional_properties
Gets an overview of the executions currently running in the project, by job queue
Arguments:
- project_id (str): ID of the Project
Returns:
`cirro_api_client.v1.models.GetProjectSummaryResponse200` — a mapping of job queue to the tasks currently running on it
def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
    """
    Gets live logs from main execution task

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will fetch logs from CloudWatch,
            even if the execution is already completed
    """
    log_response = get_execution_logs.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        force_live=force_live,
        client=self._api_client
    )

    # Flatten the log events into a single newline-delimited string
    messages = [event.message for event in log_response.events]
    return '\n'.join(messages)
Gets live logs from main execution task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
    """
    Gets the tasks submitted by the workflow execution

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will try to get the list of jobs
            from the executor (i.e., AWS Batch), rather than the workflow report
    """

    return get_tasks_for_execution.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        force_live=force_live,
        client=self._api_client
    )
Gets the tasks submitted by the workflow execution
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
    """
    Gets the log output from an individual task

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        task_id (str): ID of the task
        force_live (bool): If True, it will fetch logs from CloudWatch,
            even if the execution is already completed

    Returns:
        Log event messages joined into a newline-delimited string
    """

    resp = get_task_logs.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        task_id=task_id,
        force_live=force_live,
        client=self._api_client
    )

    return '\n'.join(e.message for e in resp.events)
Gets the log output from an individual task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- task_id (str): ID of the task
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
class ComputeEnvironmentService(BaseService):
    """
    Service for interacting with the Compute Environment endpoints
    """

    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
        """
        List of custom compute environments for a project (i.e., an agent)

        Args:
            project_id (str): Project ID

        Returns:
            List of compute environments that are available for the project
        """
        return get_compute_environments.sync(project_id=project_id,
                                             client=self._api_client)
Service for interacting with the Compute Environment endpoints
def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
    """
    Lists the custom compute environments (i.e., agents) available to a project

    Args:
        project_id (str): Project ID

    Returns:
        List of compute environments that are available for the project
    """
    return get_compute_environments.sync(
        project_id=project_id,
        client=self._api_client,
    )
List of custom compute environments for a project (i.e., an agent)
Arguments:
- project_id (str): Project ID
Returns:
List of compute environments that are available for the project
class FileService(BaseService):
    """
    Service for interacting with files
    """
    # Checksum algorithm passed to the underlying S3Client
    checksum_method: str
    # Maximum number of retries for file transfers
    transfer_retries: int
    # NOTE: lock and token cache are class attributes, shared across instances
    _get_token_lock = threading.Lock()
    _read_token_cache: Dict[str, AWSCredentials] = {}

    def __init__(self, api_client, checksum_method: str, transfer_retries: int):
        """
        Instantiates the file service class

        Args:
            api_client: API client used for all requests
            checksum_method (str): Checksum algorithm used for file transfers
            transfer_retries (int): Maximum number of retries for file transfers
        """
        self._api_client = api_client
        self.checksum_method = checksum_method
        self.transfer_retries = transfer_retries

    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
        """
        Retrieves credentials to access files

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        """
        access_request = access_context.file_access_request

        # Download tokens can be cached; every other access type is issued fresh
        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
            return self._get_project_read_credentials(access_context)

        else:
            return generate_project_file_access_token.sync(
                client=self._api_client,
                project_id=access_context.project_id,
                body=access_context.file_access_request
            )

    def _get_project_read_credentials(self, access_context: FileAccessContext):
        """
        Retrieves credentials to read project data, this can be cached
        """
        access_request = access_context.file_access_request
        project_id = access_context.project_id
        with self._get_token_lock:
            cached_token = self._read_token_cache.get(project_id)

            # Refresh the token when missing or expired
            if not cached_token or datetime.now(tz=timezone.utc) > cached_token.expiration:
                new_token = generate_project_file_access_token.sync(
                    client=self._api_client,
                    project_id=project_id,
                    body=access_request
                )

                self._read_token_cache[project_id] = new_token

            return self._read_token_cache[project_id]

    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
        """
        Gets the underlying AWS S3 client to perform operations on files

        This is seeded with refreshable credentials from the access_context parameter

        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
        """
        s3_client = self._generate_s3_client(access_context)
        return s3_client.get_aws_client()

    def get_file(self, file: File) -> bytes:
        """
        Gets the contents of a file

        Args:
            file (cirro.models.file.File): File to read

        Returns:
            The raw bytes of the file
        """
        return self.get_file_from_path(file.access_context, file.relative_path)

    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
        """
        Gets the contents of a file by providing the path, used internally

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            file_path (str): Relative path to file within dataset

        Returns:
            The raw bytes of the file
        """
        s3_client = self._generate_s3_client(access_context)

        # Strip any leading slash left behind when the prefix is empty
        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')

        return s3_client.get_file(access_context.bucket, full_path)

    def create_file(self, access_context: FileAccessContext, key: str,
                    contents: str, content_type: str) -> None:
        """
        Creates a file at the specified path

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            key (str): Key of object to create
            contents (str): Content of object
            content_type (str): MIME content type of the object
        """
        s3_client = self._generate_s3_client(access_context)

        s3_client.create_object(
            key=key,
            contents=contents,
            content_type=content_type,
            bucket=access_context.bucket
        )

    def upload_files(self,
                     access_context: FileAccessContext,
                     directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str]) -> None:
        """
        Uploads a list of files from the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory
                must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
                from source path to destination path, used to "re-write" paths within the dataset.
        """
        s3_client = self._generate_s3_client(access_context)

        upload_directory(
            directory=directory,
            files=files,
            file_path_map=file_path_map,
            s3_client=s3_client,
            bucket=access_context.bucket,
            prefix=access_context.prefix,
            max_retries=self.transfer_retries
        )

    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> List[Path]:
        """
        Download a list of files to the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str): download location
            files (List[str]): relative path of files to download

        Returns:
            List of paths to downloaded files
        """
        s3_client = self._generate_s3_client(access_context)

        return download_directory(
            directory,
            files,
            s3_client,
            access_context.bucket,
            access_context.prefix
        )

    def is_valid_file(self, file: File, local_file: PathLike) -> bool:
        """
        Validates the checksum of a file against a local file
        See ``validate_file`` method for details.

        Args:
            file (File): Cirro file to validate
            local_file (PathLike): Local file path to compare against

        Returns:
            bool: True if file integrity matches, False otherwise

        Raises:
            RuntimeWarning: If the remote checksum is not available or not supported
        """
        # Only a checksum mismatch (ValueError) maps to False;
        # RuntimeWarning (no usable checksum) intentionally propagates
        try:
            self.validate_file(file, local_file)
            return True
        except ValueError:
            return False

    def validate_file(self, file: File, local_file: PathLike):
        """
        Validates the checksum of a file against a local file
        This is used to ensure file integrity after download or upload

        Checksums might not be available if the file was uploaded without checksum support

        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
        Args:
            file (File): Cirro file to validate
            local_file (PathLike): Local file path to compare against

        Raises:
            ValueError: If checksums do not match
            RuntimeWarning: If the remote checksum is not available or not supported
        """
        # Path() accepts both str and os.PathLike, so one conversion covers
        # every accepted input (the previous str-only branch failed for
        # non-Path PathLike objects, which have no .expanduser())
        local_file = Path(local_file).expanduser()

        stats = self.get_file_stats(file)

        # Find the checksum property (e.g. ChecksumSHA256), skipping ChecksumType
        remote_checksum_key = next((prop for prop in stats.keys()
                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)

        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")

        if remote_checksum_key is None:
            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")

        remote_checksum = stats[remote_checksum_key]
        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")

        local_checksum = get_checksum(local_file, remote_checksum_name)
        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")

        if local_checksum != remote_checksum:
            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
                             f"local {local_checksum}, remote {remote_checksum}")

    def get_file_stats(self, file: File) -> dict:
        """
        Gets the file stats for a file, such as size, checksum, etc.
        Equivalent to the `head_object` operation in S3
        """
        s3_client = self._generate_s3_client(file.access_context)

        full_path = f'{file.access_context.prefix}/{file.relative_path}'

        stats = s3_client.get_file_stats(
            bucket=file.access_context.bucket,
            key=full_path
        )
        logger.debug(f"File stats for file {file.relative_path} is {stats}")
        return stats

    def _generate_s3_client(self, access_context: FileAccessContext):
        """
        Generates the Cirro-S3 client to perform operations on files
        """
        # Credentials are fetched lazily (and refreshed) via the partial
        return S3Client(
            partial(self.get_access_credentials, access_context),
            self.checksum_method
        )
Service for interacting with files
def __init__(self, api_client, checksum_method: str, transfer_retries: int):
    """
    Instantiates the file service class

    Args:
        api_client: API client used for all requests
        checksum_method (str): Checksum algorithm used for file transfers
        transfer_retries (int): Maximum number of retries for file transfers
    """
    self._api_client = api_client
    self.checksum_method = checksum_method
    self.transfer_retries = transfer_retries
Instantiates the file service class
def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
    """
    Retrieves credentials to access files

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
    """
    access_request = access_context.file_access_request

    # Download tokens can be cached; every other access type is issued fresh
    if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
        return self._get_project_read_credentials(access_context)

    else:
        return generate_project_file_access_token.sync(
            client=self._api_client,
            project_id=access_context.project_id,
            body=access_context.file_access_request
        )
Retrieves credentials to access files
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
    """
    Gets the underlying AWS S3 client to perform operations on files

    This is seeded with refreshable credentials from the access_context parameter

    This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
    """
    s3_client = self._generate_s3_client(access_context)
    return s3_client.get_aws_client()
Gets the underlying AWS S3 client to perform operations on files
This is seeded with refreshable credentials from the access_context parameter
This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
def get_file(self, file: File) -> bytes:
    """
    Gets the contents of a file

    Args:
        file (cirro.models.file.File): File to read

    Returns:
        The raw bytes of the file
    """
    return self.get_file_from_path(file.access_context, file.relative_path)
Gets the contents of a file
Arguments:
- file (cirro.models.file.File): File to read
Returns:
The raw bytes of the file
def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
    """
    Gets the contents of a file by providing the path, used internally

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        file_path (str): Relative path to file within dataset

    Returns:
        The raw bytes of the file
    """
    s3_client = self._generate_s3_client(access_context)

    # Strip any leading slash left behind when the prefix is empty
    full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')

    return s3_client.get_file(access_context.bucket, full_path)
Gets the contents of a file by providing the path, used internally
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- file_path (str): Relative path to file within dataset
Returns:
The raw bytes of the file
def create_file(self, access_context: FileAccessContext, key: str,
                contents: str, content_type: str) -> None:
    """
    Creates a file at the specified path

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        key (str): Key of object to create
        contents (str): Content of object
        content_type (str): MIME content type of the object
    """
    s3_client = self._generate_s3_client(access_context)

    s3_client.create_object(
        key=key,
        contents=contents,
        content_type=content_type,
        bucket=access_context.bucket
    )
Creates a file at the specified path
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- key (str): Key of object to create
- contents (str): Content of object
- content_type (str): MIME content type of the object (e.g., `text/csv`)
def upload_files(self,
                 access_context: FileAccessContext,
                 directory: PathLike,
                 files: List[PathLike],
                 file_path_map: Dict[PathLike, str]) -> None:
    """
    Uploads a list of files from the specified directory

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory
            must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
            from source path to destination path, used to "re-write" paths within the dataset.
    """
    s3_client = self._generate_s3_client(access_context)

    # Retries are bounded by the service-level transfer_retries setting
    upload_directory(
        directory=directory,
        files=files,
        file_path_map=file_path_map,
        s3_client=s3_client,
        bucket=access_context.bucket,
        prefix=access_context.prefix,
        max_retries=self.transfer_retries
    )
Uploads a list of files from the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> List[Path]:
    """
    Download a list of files to the specified directory

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        directory (str): download location
        files (List[str]): relative path of files to download

    Returns:
        List of paths to downloaded files
    """
    s3_client = self._generate_s3_client(access_context)

    return download_directory(
        directory,
        files,
        s3_client,
        access_context.bucket,
        access_context.prefix
    )
Download a list of files to the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str): download location
- files (List[str]): relative path of files to download
Returns:
List of paths to downloaded files
def is_valid_file(self, file: File, local_file: PathLike) -> bool:
    """
    Validates the checksum of a file against a local file
    See ``validate_file`` method for details.

    Args:
        file (File): Cirro file to validate
        local_file (PathLike): Local file path to compare against

    Returns:
        bool: True if file integrity matches, False otherwise

    Raises:
        RuntimeWarning: If the remote checksum is not available or not supported
    """
    # Only a checksum mismatch (ValueError) maps to False;
    # RuntimeWarning (no usable checksum) intentionally propagates
    try:
        self.validate_file(file, local_file)
        return True
    except ValueError:
        return False
Validates the checksum of a file against a local file
See validate_file method for details.
Arguments:
- file (File): Cirro file to validate
- local_file (PathLike): Local file path to compare against
Returns:
bool: True if file integrity matches, False otherwise
Raises:
- RuntimeWarning: If the remote checksum is not available or not supported
def validate_file(self, file: File, local_file: PathLike):
    """
    Validates the checksum of a file against a local file
    This is used to ensure file integrity after download or upload

    Checksums might not be available if the file was uploaded without checksum support

    https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
    Args:
        file (File): Cirro file to validate
        local_file (PathLike): Local file path to compare against

    Raises:
        ValueError: If checksums do not match
        RuntimeWarning: If the remote checksum is not available or not supported
    """
    # Path() accepts both str and os.PathLike, so one conversion covers
    # every accepted input (the previous str-only branch failed for
    # non-Path PathLike objects, which have no .expanduser())
    local_file = Path(local_file).expanduser()

    stats = self.get_file_stats(file)

    # Find the checksum property (e.g. ChecksumSHA256), skipping ChecksumType
    remote_checksum_key = next((prop for prop in stats.keys()
                                if 'Checksum' in prop and prop != 'ChecksumType'), None)

    if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
        raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")

    if remote_checksum_key is None:
        raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")

    remote_checksum = stats[remote_checksum_key]
    remote_checksum_name = remote_checksum_key.replace('Checksum', '')
    logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")

    local_checksum = get_checksum(local_file, remote_checksum_name)
    logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")

    if local_checksum != remote_checksum:
        raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
                         f"local {local_checksum}, remote {remote_checksum}")
Validates the checksum of a file against a local file This is used to ensure file integrity after download or upload
Checksums might not be available if the file was uploaded without checksum support
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
Arguments:
- file (File): Cirro file to validate
- local_file (PathLike): Local file path to compare against
Raises:
- ValueError: If checksums do not match
- RuntimeWarning: If the remote checksum is not available or not supported
def get_file_stats(self, file: File) -> dict:
    """
    Gets the file stats for a file, such as size, checksum, etc.
    Equivalent to the `head_object` operation in S3

    Args:
        file (File): Cirro file to query

    Returns:
        dict: Raw object stats (size, checksum, etc.)
    """
    s3_client = self._generate_s3_client(file.access_context)

    full_path = f'{file.access_context.prefix}/{file.relative_path}'

    stats = s3_client.get_file_stats(
        bucket=file.access_context.bucket,
        key=full_path
    )
    logger.debug(f"File stats for file {file.relative_path} is {stats}")
    return stats
Gets the file stats for a file, such as size, checksum, etc.
Equivalent to the head_object operation in S3
class MetadataService(BaseService):
    """
    Service for interacting with the Metadata endpoints
    """
    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with a project along with their metadata

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        # Paginates through all records via next_token, up to max_items
        return get_all_records(
            records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
                                                                      client=self._api_client,
                                                                      next_token=page_args.next_token,
                                                                      limit=page_args.limit),
            max_items=max_items
        )

    def get_project_schema(self, project_id: str) -> FormSchema:
        """
        Get project metadata schema

        Args:
            project_id (str): ID of the Project
        """
        return get_project_schema.sync(project_id=project_id, client=self._api_client)

    def update_project_schema(self, project_id: str, schema: FormSchema):
        """
        Update project metadata schema

        Args:
            project_id (str): ID of the Project
            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
        """
        update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)

    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
        """
        Updates metadata information for sample

        Args:
            project_id (str): ID of the Project
            sample_id (str): ID of the sample
            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
        """
        return update_sample.sync(
            project_id=project_id,
            sample_id=sample_id,
            body=sample,
            client=self._api_client
        )
Service for interacting with the Metadata endpoints
def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
    """
    Retrieves a list of samples associated with a project along with their metadata

    Args:
        project_id (str): ID of the Project
        max_items (int): Maximum number of records to get (default 10,000)
    """
    # Paginates through all records via next_token, up to max_items
    return get_all_records(
        records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
                                                                  client=self._api_client,
                                                                  next_token=page_args.next_token,
                                                                  limit=page_args.limit),
        max_items=max_items
    )
Retrieves a list of samples associated with a project along with their metadata
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
def get_project_schema(self, project_id: str) -> FormSchema:
    """
    Get project metadata schema

    Args:
        project_id (str): ID of the Project
    """
    # Thin pass-through to the generated API client
    return get_project_schema.sync(client=self._api_client, project_id=project_id)
Get project metadata schema
Arguments:
- project_id (str): ID of the Project
def update_project_schema(self, project_id: str, schema: FormSchema):
    """
    Update project metadata schema

    Args:
        project_id (str): ID of the Project
        schema (cirro_api_client.v1.models.FormSchema): Metadata schema
    """
    # Fire-and-forget: no value is returned to the caller
    update_project_schema.sync_detailed(
        body=schema,
        project_id=project_id,
        client=self._api_client,
    )
Update project metadata schema
Arguments:
- project_id (str): ID of the Project
- schema (cirro_api_client.v1.models.FormSchema): Metadata schema
def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
    """
    Updates metadata information for sample

    Args:
        project_id (str): ID of the Project
        sample_id (str): ID of the sample
        sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
    """
    # Returns the sample as persisted by the service
    result = update_sample.sync(
        client=self._api_client,
        body=sample,
        project_id=project_id,
        sample_id=sample_id,
    )
    return result
Updates metadata information for sample
Arguments:
- project_id (str): ID of the Project
- sample_id (str): ID of the sample
- sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
class MetricsService(BaseService):
    """
    Service for interacting with the Metrics endpoints
    """

    def get_for_project(self, project_id: str) -> ProjectMetrics:
        """
        Retrieves the cost and storage metrics for a project.

        Args:
            project_id (str): ID of the Project
        """
        return get_project_metrics.sync(client=self._api_client, project_id=project_id)

    def get_all_metrics(self) -> List[ProjectMetrics]:
        """
        Retrieves all available metrics
        """
        # The name here refers to the module-level import (resolved from
        # globals at call time), not to this method itself.
        return get_all_metrics.sync(client=self._api_client)
Service for interacting with the Metrics endpoints
def get_for_project(self, project_id: str) -> ProjectMetrics:
    """
    Retrieves the cost and storage metrics for a project.

    Args:
        project_id (str): ID of the Project
    """
    metrics = get_project_metrics.sync(client=self._api_client, project_id=project_id)
    return metrics
Retrieves the cost and storage metrics for a project.
Arguments:
- project_id (str): ID of the Project
class ProcessService(BaseService):
    """
    Service for interacting with the Process endpoints
    """
    def list(self, process_type: Optional[Executor] = None, include_archived: bool = False) -> List[Process]:
        """
        Retrieves a list of available processes

        Args:
            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
            include_archived (bool): Whether to include archived processes in the list (default False)
        """
        processes = get_processes.sync(client=self._api_client, include_archived=include_archived)
        # When no process_type is given, the filter passes everything through
        return [p for p in processes if not process_type or process_type == p.executor]

    def get(self, process_id: str) -> ProcessDetail:
        """
        Retrieves detailed information on a process

        Args:
            process_id (str): Process ID
        """
        return get_process.sync(process_id=process_id, client=self._api_client)

    def archive(self, process_id: str):
        """
        Removes a custom process from the list of available processes.

        Error will be raised if the requested process does not exist. No value
        is returned, and no error raised if process exists and request is satisfied.

        Args:
            process_id (str): Process ID
        """
        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)

    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
        """
        Get a process by its display name

        Args:
            name (str): Process name

        Returns:
            The detailed process record, or None when no process matches the name.
        """
        # First match wins; names are assumed unique — TODO confirm with the API contract
        matched_process = next((p for p in self.list() if p.name == name), None)
        if not matched_process:
            return None

        return self.get(matched_process.id)

    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process

        Args:
            process_id (str): Process ID
        """
        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
        return ParameterSpecification(form_spec)

    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
        """
        Checks if the file mapping rules for a process are met by the list of files

        Error will be raised if the file mapping rules for the process are not met.
        No value is returned and no error is raised if the rules are satisfied.

        Args:
            process_id (str): ID for the process containing the file mapping rules
            directory (str): path to directory containing files
            files (List[str]): File names to check
        """
        # Parse sample sheet file if present
        sample_sheet = None
        sample_sheet_file = Path(directory, 'samplesheet.csv')
        if sample_sheet_file.exists():
            sample_sheet = sample_sheet_file.read_text()

        request = ValidateFileRequirementsRequest(
            file_names=files,
            sample_sheet=sample_sheet
        )
        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)

        # These will be sample sheet errors or no files errors
        if error_msg := requirements.error_msg:
            raise ValueError(error_msg)

        # Collect one human-readable error per data type whose rules were violated
        errors = [
            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
            '\n\t- '.join([
                e.example_name
                for e in entry.allowed_patterns
            ])
            for entry in requirements.allowed_data_types
            if entry.error_msg is not None
        ]

        files_provided = ', '.join(files)

        if len(errors) != 0:
            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
                             "They do not meet the dataset requirements. "
                             "The required file types are: \n" +
                             "\n".join(errors))

    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
        """
        Creates a custom process (pipeline or data type) in the system.

        Whether it is a pipeline or data type is determined by the executor field.
        For pipelines, you must specify the pipeline code and configuration repository.

        This process will be available to all users in the system if `is_tenant_wide` is set to True.
        If `is_tenant_wide` is set to False, the process will only be available to the projects
        specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.

        If the repository is private, you must complete the authorization flow on the UI.

        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

        Args:
            process (cirro_api_client.v1.models.CustomProcessInput): Process to create

        Example:
            ```python
            from cirro_api_client.v1.models import CustomProcessInput, Executor, \
                PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
            from cirro.cirro_client import CirroApi

            cirro = CirroApi()

            # New pipeline
            new_pipeline = CustomProcessInput(
                id="my_pipeline",
                name="My Pipeline",
                description="This is a test pipeline",
                executor=Executor.CROMWELL,
                category="DNA Sequencing",
                child_process_ids=[],
                parent_process_ids=["rnaseq"],
                documentation_url="https://example.com/docs",
                pipeline_code=PipelineCode(
                    repository_path="CirroBio/test-pipeline",
                    version="v1.0.0",
                    entry_point="main.nf",
                    repository_type=RepositoryType.GITHUB_PUBLIC
                ),
                linked_project_ids=[],
                is_tenant_wide=True,
                allow_multiple_sources=True,
                uses_sample_sheet=True,
                # This can be the same or different from the pipeline_code
                custom_settings=CustomPipelineSettings(
                    repository="CirroBio/test-pipeline",
                    branch="v1.0.0",
                    repository_type=RepositoryType.GITHUB_PUBLIC,
                ),
                file_mapping_rules=[
                    FileMappingRule(
                        description="Filtered Feature Matrix",
                        file_name_patterns=[
                            FileNamePattern(
                                example_name="filtered_feature_bc_matrix.h5",
                                description="Matrix",
                                sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
                            )
                        ]
                    )
                ]
            )

            cirro.processes.create_custom_process(new_pipeline)

            # New data type
            new_data_type = CustomProcessInput(
                id="images_jpg",
                name="JPG Images",
                description="Used for generic JPG images",
                executor=Executor.INGEST,
                child_process_ids=[],
                parent_process_ids=[],
                documentation_url="https://example.com/docs",
                linked_project_ids=["project_id_1", "project_id_2"],
                file_mapping_rules=[
                    FileMappingRule(
                        description="Images",
                        min_=1,
                        file_name_patterns=[
                            FileNamePattern(
                                example_name="image.jpg",
                                description="JPG Image",
                                sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
                            )
                        ]
                    )
                ]
            )
            cirro.processes.create_custom_process(new_data_type)
            ```
        """
        return create_custom_process.sync(client=self._api_client, body=process)

    def update_custom_process(self, process_id: str, process: CustomProcessInput):
        """
        Updates the custom process (pipeline or data type) in the system.

        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.

        Args:
            process_id (str): ID of the process to update
            process (CustomProcessInput): Process to update
        """
        update_custom_process.sync_detailed(client=self._api_client,
                                            process_id=process_id,
                                            body=process)

    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
        """
        Syncs a custom pipeline in the system.

        This updates the pipeline configurations in Cirro (form configuration,
        input mapping, preprocess etc.) to match what is in the configured repository.
        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

        Args:
            process_id (str): ID of the process to sync
        """
        return sync_custom_process.sync(client=self._api_client, process_id=process_id)
Service for interacting with the Process endpoints
def list(self, process_type: Optional[Executor] = None, include_archived: bool = False) -> List[Process]:
    """
    Retrieves a list of available processes

    Args:
        process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
        include_archived (bool): Whether to include archived processes in the list (default False)
    """
    processes = get_processes.sync(client=self._api_client, include_archived=include_archived)
    # Explicit None test: the previous truthiness check (`not process_type`)
    # would also treat any falsy enum member as "no filter".
    if process_type is None:
        return list(processes)
    return [p for p in processes if p.executor == process_type]
Retrieves a list of available processes
Arguments:
- process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
- include_archived: Whether to include archived processes in the list (default False)
def get(self, process_id: str) -> ProcessDetail:
    """
    Retrieves detailed information on a process

    Args:
        process_id (str): Process ID
    """
    return get_process.sync(
        client=self._api_client,
        process_id=process_id,
    )
Retrieves detailed information on a process
Arguments:
- process_id (str): Process ID
def archive(self, process_id: str):
    """
    Removes a custom process from the list of available processes.

    Error will be raised if the requested process does not exist. No value
    is returned, and no error raised if process exists and request is satisfied.

    Args:
        process_id (str): Process ID
    """
    # sync_detailed surfaces HTTP errors; the response body is discarded
    archive_custom_process.sync_detailed(
        client=self._api_client,
        process_id=process_id,
    )
Removes a custom process from the list of available processes.
Error will be raised if the requested process does not exist. No value is returned, and no error raised if process exists and request is satisfied.
Arguments:
- process_id (str): Process ID
def find_by_name(self, name: str) -> Optional[ProcessDetail]:
    """
    Get a process by its display name

    Args:
        name (str): Process name

    Returns:
        Detailed process record for the first match, or None when not found.
    """
    for process in self.list():
        if process.name == name:
            # Fetch the full detail record for the first matching process
            return self.get(process.id)
    return None
Get a process by its display name
Arguments:
- name (str): Process name
def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
    """
    Gets a specification used to describe the parameters used in the process

    Args:
        process_id (str): Process ID
    """
    raw_spec = get_process_parameters.sync(client=self._api_client, process_id=process_id)
    # Wrap the raw form spec in the richer ParameterSpecification helper
    return ParameterSpecification(raw_spec)
Gets a specification used to describe the parameters used in the process
Arguments:
- process_id (str): Process ID
def check_dataset_files(self, files: List[str], process_id: str, directory: str):
    """
    Checks if the file mapping rules for a process are met by the list of files

    Error will be raised if the file mapping rules for the process are not met.
    No value is returned and no error is raised if the rules are satisfied.

    Args:
        process_id (str): ID for the process containing the file mapping rules
        directory: path to directory containing files
        files (List[str]): File names to check
    """
    # Include the sample sheet contents when one exists alongside the files
    sheet_path = Path(directory, 'samplesheet.csv')
    sample_sheet = sheet_path.read_text() if sheet_path.exists() else None

    requirements = validate_file_requirements.sync(
        process_id=process_id,
        body=ValidateFileRequirementsRequest(file_names=files, sample_sheet=sample_sheet),
        client=self._api_client,
    )

    # Sample-sheet-level or "no files" errors abort immediately
    error_msg = requirements.error_msg
    if error_msg:
        raise ValueError(error_msg)

    # One human-readable message per violated data-type rule
    errors = []
    for entry in requirements.allowed_data_types:
        if entry.error_msg is None:
            continue
        accepted = '\n\t- '.join(e.example_name for e in entry.allowed_patterns)
        errors.append(
            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- '
            + accepted
        )

    files_provided = ', '.join(files)

    if errors:
        raise ValueError(f"The files you have provided are: {files_provided} \n\n"
                         "They do not meet the dataset requirements. "
                         "The required file types are: \n" +
                         "\n".join(errors))
Checks if the file mapping rules for a process are met by the list of files
Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.
Arguments:
- process_id (str): ID for the process containing the file mapping rules
- directory: path to directory containing files
- files (List[str]): File names to check
def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
    """
    Creates a custom process (pipeline or data type) in the system.

    Whether it is a pipeline or data type is determined by the executor field.
    For pipelines, you must specify the pipeline code and configuration repository.

    This process will be available to all users in the system if `is_tenant_wide` is set to True.
    If `is_tenant_wide` is set to False, the process will only be available to the projects
    specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.

    If the repository is private, you must complete the authorization flow on the UI.

    See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

    Args:
        process (cirro_api_client.v1.models.CustomProcessInput): Process to create

    Example:
        ```python
        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
            PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # New pipeline
        new_pipeline = CustomProcessInput(
            id="my_pipeline",
            name="My Pipeline",
            description="This is a test pipeline",
            executor=Executor.CROMWELL,
            category="DNA Sequencing",
            child_process_ids=[],
            parent_process_ids=["rnaseq"],
            documentation_url="https://example.com/docs",
            pipeline_code=PipelineCode(
                repository_path="CirroBio/test-pipeline",
                version="v1.0.0",
                entry_point="main.nf",
                repository_type=RepositoryType.GITHUB_PUBLIC
            ),
            linked_project_ids=[],
            is_tenant_wide=True,
            allow_multiple_sources=True,
            uses_sample_sheet=True,
            # This can be the same or different from the pipeline_code
            custom_settings=CustomPipelineSettings(
                repository="CirroBio/test-pipeline",
                branch="v1.0.0",
                repository_type=RepositoryType.GITHUB_PUBLIC,
            ),
            file_mapping_rules=[
                FileMappingRule(
                    description="Filtered Feature Matrix",
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="filtered_feature_bc_matrix.h5",
                            description="Matrix",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
                        )
                    ]
                )
            ]
        )

        cirro.processes.create_custom_process(new_pipeline)

        # New data type
        new_data_type = CustomProcessInput(
            id="images_jpg",
            name="JPG Images",
            description="Used for generic JPG images",
            executor=Executor.INGEST,
            child_process_ids=[],
            parent_process_ids=[],
            documentation_url="https://example.com/docs",
            linked_project_ids=["project_id_1", "project_id_2"],
            file_mapping_rules=[
                FileMappingRule(
                    description="Images",
                    min_=1,
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="image.jpg",
                            description="JPG Image",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
                        )
                    ]
                )
            ]
        )
        cirro.processes.create_custom_process(new_data_type)
        ```
    """
    # Returns the service's CreateResponse (new process ID and status)
    return create_custom_process.sync(client=self._api_client, body=process)
Creates a custom process (pipeline or data type) in the system.
Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.
This process will be available to all users in the system if is_tenant_wide is set to True.
If is_tenant_wide is set to False, the process will only be available the projects
specified in linked_projects_ids. Making it available tenant wide requires tenant admin privileges.
If the repository is private, you must complete the authorization flow on the UI.
See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
Arguments:
- process (Process): Process to create
Example:
from cirro_api_client.v1.models import CustomProcessInput, Executor, PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# New pipeline
new_pipeline = CustomProcessInput(
id="my_pipeline",
name="My Pipeline",
description="This is a test pipeline",
executor=Executor.CROMWELL,
category="DNA Sequencing",
child_process_ids=[],
parent_process_ids=["rnaseq"],
documentation_url="https://example.com/docs",
pipeline_code=PipelineCode(
repository_path="CirroBio/test-pipeline",
version="v1.0.0",
entry_point="main.nf",
repository_type=RepositoryType.GITHUB_PUBLIC
),
linked_project_ids=[],
is_tenant_wide=True,
allow_multiple_sources=True,
uses_sample_sheet=True,
# This can be the same or different from the pipeline_code
custom_settings=CustomPipelineSettings(
repository="CirroBio/test-pipeline",
branch="v1.0.0",
repository_type=RepositoryType.GITHUB_PUBLIC,
),
file_mapping_rules=[
FileMappingRule(
description="Filtered Feature Matrix",
file_name_patterns=[
FileNamePattern(
example_name="filtered_feature_bc_matrix.h5",
description="Matrix",
sample_matching_pattern="(?P<sampleName>[\S ]*)\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_pipeline)
# New data type
new_data_type = CustomProcessInput(
id="images_jpg",
name="JPG Images",
description="Used for generic JPG images",
executor=Executor.INGEST,
child_process_ids=[],
parent_process_ids=[],
documentation_url="https://example.com/docs",
linked_project_ids=["project_id_1", "project_id_2"],
file_mapping_rules=[
FileMappingRule(
description="Images",
min_=1,
file_name_patterns=[
FileNamePattern(
example_name="image.jpg",
description="JPG Image",
sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_data_type)
def update_custom_process(self, process_id: str, process: CustomProcessInput):
    """
    Updates the custom process (pipeline or data type) in the system.

    Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.

    Args:
        process_id (str): ID of the process to update
        process (CustomProcessInput): Process to update
    """
    update_custom_process.sync_detailed(
        body=process,
        process_id=process_id,
        client=self._api_client,
    )
Updates the custom process (pipeline or data type) in the system.
Please run sync_custom_process after updating to ensure the pipeline configuration is up to date.
Arguments:
- process_id (str): ID of the process to update
- process (CustomProcessInput): Process to update
def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
    """
    Syncs a custom pipeline in the system.

    This updates the pipeline configurations in Cirro (form configuration,
    input mapping, preprocess etc.) to match what is in the configured repository.
    See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

    Args:
        process_id (str): ID of the process to sync
    """
    settings = sync_custom_process.sync(process_id=process_id, client=self._api_client)
    return settings
Syncs a custom pipeline in the system.
This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
Arguments:
- process_id (str): ID of the process to sync
class ProjectService(BaseService):
    """
    Service for interacting with the Project endpoints
    """
    def list(self):
        """
        Retrieve a list of projects
        """
        return get_projects.sync(client=self._api_client)

    def get(self, project_id: str) -> ProjectDetail:
        """
        Get detailed project information

        Args:
            project_id (str): Project ID
        """
        return get_project.sync(project_id=project_id, client=self._api_client)

    def create(self, request: ProjectInput) -> CreateResponse:
        """
        Create a project

        Args:
            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create

        Examples:
            ```python
            from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
                CloudAccountType
            from cirro.cirro_client import CirroApi

            cirro = CirroApi()

            # Bring your own account projects are hosted by the user
            # You must provide the account details and VPC information
            # Please view the ProjectSettings model for more information on the attributes required
            byoa_project = ProjectInput(
                name="New Project Name",
                description="Description of new project",
                billing_account_id="billing-account-id",
                settings=ProjectSettings(
                    budget_period=BudgetPeriod.MONTHLY,
                    budget_amount=1000,
                    max_spot_vcpu=300,
                    service_connections=[],
                    retention_policy_days=7,
                    vpc_id="vpc-000000000000",
                    batch_subnets=["subnet-000000000000", "subnet-000000000001"],
                    sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
                    kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
                ),
                account=CloudAccount(
                    account_id="<AWS_ACCOUNT_ID>",
                    region_name="us-west-2",
                    account_name="Cirro Lab Project",
                    account_type=CloudAccountType.BYOA
                ),
                contacts=[
                    Contact(
                        name="Contact Name",
                        organization="Contact Organization",
                        email="contact@email.com",
                        phone="987-654-3210"
                    )
                ]
            )

            # Hosted projects are managed by Cirro
            hosted_project = ProjectInput(
                name="New Project Name",
                description="Description of new project",
                billing_account_id="billing-account-id",
                settings=ProjectSettings(
                    budget_period=BudgetPeriod.MONTHLY,
                    budget_amount=1000,
                    max_spot_vcpu=300,
                    service_connections=[],
                    retention_policy_days=7
                ),
                contacts=[
                    Contact(
                        name="Contact Name",
                        organization="Contact Organization",
                        email="contact@email.com",
                        phone="987-654-3210"
                    )
                ]
            )

            cirro.projects.create(byoa_project)
            ```
        """
        return create_project.sync(client=self._api_client, body=request)

    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
        """
        Updates a project

        Args:
            project_id (str): ID of project to update
            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
        """
        return update_project.sync(project_id=project_id, body=request, client=self._api_client)

    def update_tags(self, project_id: str, tags: List[Tag]):
        """
        Sets tags on a project

        Args:
            project_id (str): Project ID
            tags (List[Tag]): List of tags to apply
        """
        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)

    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
        """
        Gets users who have access to the project

        Args:
            project_id (str): Project ID
        """
        return get_project_users.sync(project_id=project_id, client=self._api_client)

    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification: bool = False):
        """
        Sets a user's role within a project.

        Set to `ProjectRole.NONE` if removing the user from a project.

        Args:
            project_id (str): Project ID
            username (str): Username
            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
            suppress_notification (bool): Suppress email notification
        """
        request_body = SetUserProjectRoleRequest(
            username=username,
            role=role,
            suppress_notification=suppress_notification
        )
        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)

    def archive(self, project_id: str):
        """
        Sets the project status to archived (hidden from active projects)

        Args:
            project_id (str): Project ID
        """
        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

    def unarchive(self, project_id: str):
        """
        Sets the project status to active

        Args:
            project_id (str): Project ID
        """
        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Service for interacting with the Project endpoints
def list(self):
    """
    Retrieve a list of projects
    """
    # Thin pass-through to the generated API client
    return get_projects.sync(client=self._api_client)
Retrieve a list of projects
def get(self, project_id: str) -> ProjectDetail:
    """
    Get detailed project information

    Args:
        project_id (str): Project ID
    """
    return get_project.sync(
        client=self._api_client,
        project_id=project_id,
    )
Get detailed project information
Arguments:
- project_id (str): Project ID
def create(self, request: ProjectInput) -> CreateResponse:
    """
    Create a project

    Args:
        request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create

    Examples:
        ```python
        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
            CloudAccountType
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # Bring your own account projects are hosted by the user
        # You must provide the account details and VPC information
        # Please view the ProjectSettings model for more information on the attributes required
        byoa_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7,
                vpc_id="vpc-000000000000",
                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
            ),
            account=CloudAccount(
                account_id="<AWS_ACCOUNT_ID>",
                region_name="us-west-2",
                account_name="Cirro Lab Project",
                account_type=CloudAccountType.BYOA
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        # Hosted projects are managed by Cirro
        hosted_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        cirro.projects.create(byoa_project)
        ```
    """
    # Returns the service's CreateResponse (new project ID and status)
    return create_project.sync(client=self._api_client, body=request)
Create a project
Arguments:
- request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
Examples:
from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, CloudAccountType
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7,
vpc_id="vpc-000000000000",
batch_subnets=["subnet-000000000000", "subnet-000000000001"],
sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
),
account=CloudAccount(
account_id="<AWS_ACCOUNT_ID>",
region_name="us-west-2",
account_name="Cirro Lab Project",
account_type=CloudAccountType.BYOA
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
cirro.projects.create(byoa_project)
def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
    """
    Updates a project

    Args:
        project_id (str): ID of project to update
        request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
    """
    # Returns the project record as persisted by the service
    return update_project.sync(
        body=request,
        project_id=project_id,
        client=self._api_client,
    )
Updates a project
Arguments:
- project_id (str): ID of project to update
- request (cirro_api_client.v1.models.ProjectInput): New details for the project
127 def get_users(self, project_id: str) -> Optional[List[ProjectUser]]: 128 """ 129 Gets users who have access to the project 130 131 Args: 132 project_id (str): Project ID 133 """ 134 return get_project_users.sync(project_id=project_id, client=self._api_client)
Gets users who have access to the project
Arguments:
- project_id (str): Project ID
136 def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False): 137 """ 138 Sets a user's role within a project. 139 140 Set to `ProjectRole.NONE` if removing the user from a project. 141 142 Args: 143 project_id (str): Project ID 144 username (str): Username 145 role (`cirro_api_client.v1.models.ProjectRole`): New role to apply 146 suppress_notification (bool): Suppress email notification 147 """ 148 request_body = SetUserProjectRoleRequest( 149 username=username, 150 role=role, 151 suppress_notification=suppress_notification 152 ) 153 set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
Sets a user's role within a project.
Set to ProjectRole.NONE if removing the user from a project.
Arguments:
- project_id (str): Project ID
- username (str): Username
- role (cirro_api_client.v1.models.ProjectRole): New role to apply
- suppress_notification (bool): Suppress email notification
155 def archive(self, project_id: str): 156 """ 157 Sets the project status to archived (hidden from active projects) 158 159 Args: 160 project_id (str): Project ID 161 """ 162 archive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to archived (hidden from active projects)
Arguments:
- project_id (str): Project ID
164 def unarchive(self, project_id: str): 165 """ 166 Sets the project status to active 167 168 Args: 169 project_id (str): Project ID 170 """ 171 unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to active
Arguments:
- project_id (str): Project ID
 14class ReferenceService(FileEnabledService): 15 """ 16 Service for interacting with the References endpoints 17 """ 18 def get_types(self) -> Optional[List[ReferenceType]]: 19 """ 20 List available reference types 21 """ 22 return get_reference_types.sync(client=self._api_client) 23 24 def get_type(self, reference_type_name: str) -> Optional[ReferenceType]: 25 """ 26 Get a specific reference type by name 27 28 Args: 29 reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)") 30 """ 31 types = self.get_types() 32 return next((t for t in types if t.name == reference_type_name), None) 33 34 def get_for_project(self, project_id: str) -> Optional[List[Reference]]: 35 """ 36 List available references for a given project 37 """ 38 return get_references_for_project.sync(project_id=project_id, client=self._api_client) 39 40 def upload_reference(self, 41 name: str, 42 reference_files: list[PathLike], 43 project_id: str, 44 ref_type: ReferenceType): 45 """ 46 Upload a reference to a project 47 48 Args: 49 name (str): Name of the reference (e.g. "GRCh38") 50 no spaces are allowed in the name 51 reference_files (list[PathLike]): Path to the reference file to upload 52 project_id (str): ID of the project to upload the reference to 53 ref_type (ReferenceType): Type of the reference 54 55 ```python 56 from pathlib import Path 57 58 from cirro import CirroApi 59 60 cirro = CirroApi() 61 62 crispr_library = cirro.references.get_type("CRISPR sgRNA Library") 63 files = [ 64 Path("~/crispr_library.csv").expanduser() 65 ] 66 67 cirro.references.upload_reference( 68 name="Library Name", 69 project_id="project-id", 70 ref_type=crispr_library, 71 reference_files=files, 72 ) 73 ``` 74 """ 75 # Validate name 76 if ' ' in name: 77 raise ValueError("Reference name cannot contain spaces") 78 79 access_context = FileAccessContext.upload_reference( 80 project_id=project_id, 81 base_url=f's3://project-{project_id}/resources/data/references', 82 ) 83 84 file_path_map = generate_reference_file_path_map( 85 files=reference_files, 86 name=name, 87 ref_type=ref_type 88 ) 89 90 self._file_service.upload_files( 91 access_context=access_context, 92 directory=Path('~').expanduser(), # Full path expected in reference_files 93 files=reference_files, 94 file_path_map=file_path_map 95 ) 96 97 refresh_project_references.sync_detailed(project_id=project_id, 98 client=self._api_client)
Service for interacting with the References endpoints
18 def get_types(self) -> Optional[List[ReferenceType]]: 19 """ 20 List available reference types 21 """ 22 return get_reference_types.sync(client=self._api_client)
List available reference types
24 def get_type(self, reference_type_name: str) -> Optional[ReferenceType]: 25 """ 26 Get a specific reference type by name 27 28 Args: 29 reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)") 30 """ 31 types = self.get_types() 32 return next((t for t in types if t.name == reference_type_name), None)
Get a specific reference type by name
Arguments:
- reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
34 def get_for_project(self, project_id: str) -> Optional[List[Reference]]: 35 """ 36 List available references for a given project 37 """ 38 return get_references_for_project.sync(project_id=project_id, client=self._api_client)
List available references for a given project
40 def upload_reference(self, 41 name: str, 42 reference_files: list[PathLike], 43 project_id: str, 44 ref_type: ReferenceType): 45 """ 46 Upload a reference to a project 47 48 Args: 49 name (str): Name of the reference (e.g. "GRCh38") 50 no spaces are allowed in the name 51 reference_files (list[PathLike]): Path to the reference file to upload 52 project_id (str): ID of the project to upload the reference to 53 ref_type (ReferenceType): Type of the reference 54 55 ```python 56 from pathlib import Path 57 58 from cirro import CirroApi 59 60 cirro = CirroApi() 61 62 crispr_library = cirro.references.get_type("CRISPR sgRNA Library") 63 files = [ 64 Path("~/crispr_library.csv").expanduser() 65 ] 66 67 cirro.references.upload_reference( 68 name="Library Name", 69 project_id="project-id", 70 ref_type=crispr_library, 71 reference_files=files, 72 ) 73 ``` 74 """ 75 # Validate name 76 if ' ' in name: 77 raise ValueError("Reference name cannot contain spaces") 78 79 access_context = FileAccessContext.upload_reference( 80 project_id=project_id, 81 base_url=f's3://project-{project_id}/resources/data/references', 82 ) 83 84 file_path_map = generate_reference_file_path_map( 85 files=reference_files, 86 name=name, 87 ref_type=ref_type 88 ) 89 90 self._file_service.upload_files( 91 access_context=access_context, 92 directory=Path('~').expanduser(), # Full path expected in reference_files 93 files=reference_files, 94 file_path_map=file_path_map 95 ) 96 97 refresh_project_references.sync_detailed(project_id=project_id, 98 client=self._api_client)
Upload a reference to a project
Arguments:
- name (str): Name of the reference (e.g. "GRCh38") no spaces are allowed in the name
- reference_files (list[PathLike]): Path to the reference file to upload
- project_id (str): ID of the project to upload the reference to
- ref_type (ReferenceType): Type of the reference
from pathlib import Path
from cirro import CirroApi
cirro = CirroApi()
crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
files = [
Path("~/crispr_library.csv").expanduser()
]
cirro.references.upload_reference(
name="Library Name",
project_id="project-id",
ref_type=crispr_library,
reference_files=files,
)
10class UserService(BaseService): 11 def list(self, max_items: int = 10000) -> List[User]: 12 """ 13 List users in the system 14 """ 15 return get_all_records( 16 records_getter=lambda page_args: list_users.sync( 17 client=self._api_client, 18 next_token=page_args.next_token, 19 limit=page_args.limit 20 ), 21 max_items=max_items 22 ) 23 24 def get(self, username: str) -> UserDetail: 25 """ 26 Get user details by username, including what projects they are assigned to 27 """ 28 return get_user.sync(username=username, client=self._api_client) 29 30 def invite_user(self, name: str, organization: str, email: str): 31 """ 32 Invite a user to the system. 33 If the user already exists, it will return a message that the user already exists. 34 35 Args: 36 name (str): Name of the user 37 organization (str): Organization of the user 38 email (str): Email (username) of the user 39 """ 40 request = InviteUserRequest( 41 name=name, 42 organization=organization, 43 email=email, 44 ) 45 response = invite_user.sync(client=self._api_client, body=request) 46 return response.message
Not to be instantiated directly
11 def list(self, max_items: int = 10000) -> List[User]: 12 """ 13 List users in the system 14 """ 15 return get_all_records( 16 records_getter=lambda page_args: list_users.sync( 17 client=self._api_client, 18 next_token=page_args.next_token, 19 limit=page_args.limit 20 ), 21 max_items=max_items 22 )
List users in the system
24 def get(self, username: str) -> UserDetail: 25 """ 26 Get user details by username, including what projects they are assigned to 27 """ 28 return get_user.sync(username=username, client=self._api_client)
Get user details by username, including what projects they are assigned to
30 def invite_user(self, name: str, organization: str, email: str): 31 """ 32 Invite a user to the system. 33 If the user already exists, it will return a message that the user already exists. 34 35 Args: 36 name (str): Name of the user 37 organization (str): Organization of the user 38 email (str): Email (username) of the user 39 """ 40 request = InviteUserRequest( 41 name=name, 42 organization=organization, 43 email=email, 44 ) 45 response = invite_user.sync(client=self._api_client, body=request) 46 return response.message
Invite a user to the system. If the user already exists, it will return a message that the user already exists.
Arguments:
- name (str): Name of the user
- organization (str): Organization of the user
- email (str): Email (username) of the user
 9class WorkspaceService(BaseService): 10 """ 11 Service for interacting with the Workspace endpoints 12 """ 13 def list_environments(self) -> list[WorkspaceEnvironment]: 14 """ 15 List available workspace environments 16 """ 17 return get_workspace_environments.sync(client=self._api_client) 18 19 def list(self, project_id: str) -> list[Workspace]: 20 """ 21 Retrieves a list of workspaces that the user has access to 22 23 Args: 24 project_id (str): ID of the Project 25 """ 26 return get_workspaces.sync(project_id, client=self._api_client) 27 28 def get(self, project_id: str, workspace_id: str) -> Workspace: 29 """ 30 Get details of a workspace 31 32 Args: 33 project_id (str): ID of the Project 34 workspace_id (str): ID of the Workspace 35 """ 36 return get_workspace.sync(project_id=project_id, workspace_id=workspace_id, client=self._api_client) 37 38 def create(self, project_id: str, workspace: WorkspaceInput) -> CreateResponse: 39 """ 40 Create a new workspace in the given project 41 42 Args: 43 project_id (str): ID of the Project 44 workspace (WorkspaceInput): Workspace object to create 45 """ 46 return create_workspace.sync(project_id=project_id, client=self._api_client, body=workspace) 47 48 def delete(self, project_id: str, workspace_id: str) -> None: 49 """ 50 Delete a workspace in the given project 51 52 Args: 53 project_id (str): ID of the Project 54 workspace_id (str): ID of the Workspace 55 """ 56 delete_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client) 57 58 def start(self, project_id: str, workspace_id: str) -> None: 59 """ 60 Start a workspace environment 61 62 Args: 63 project_id (str): ID of the Project 64 workspace_id (str): ID of the Workspace 65 """ 66 start_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client) 67 68 def stop(self, project_id: str, workspace_id: str) -> None: 69 """ 70 Stop a workspace environment 71 72 Args: 73 project_id (str): ID of the Project 74 workspace_id (str): ID of the Workspace 75 """ 76 stop_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
Service for interacting with the Workspace endpoints
13 def list_environments(self) -> list[WorkspaceEnvironment]: 14 """ 15 List available workspace environments 16 """ 17 return get_workspace_environments.sync(client=self._api_client)
List available workspace environments
19 def list(self, project_id: str) -> list[Workspace]: 20 """ 21 Retrieves a list of workspaces that the user has access to 22 23 Args: 24 project_id (str): ID of the Project 25 """ 26 return get_workspaces.sync(project_id, client=self._api_client)
Retrieves a list of workspaces that the user has access to
Arguments:
- project_id (str): ID of the Project
28 def get(self, project_id: str, workspace_id: str) -> Workspace: 29 """ 30 Get details of a workspace 31 32 Args: 33 project_id (str): ID of the Project 34 workspace_id (str): ID of the Workspace 35 """ 36 return get_workspace.sync(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
Get details of a workspace
Arguments:
- project_id (str): ID of the Project
- workspace_id (str): ID of the Workspace
38 def create(self, project_id: str, workspace: WorkspaceInput) -> CreateResponse: 39 """ 40 Create a new workspace in the given project 41 42 Args: 43 project_id (str): ID of the Project 44 workspace (WorkspaceInput): Workspace object to create 45 """ 46 return create_workspace.sync(project_id=project_id, client=self._api_client, body=workspace)
Create a new workspace in the given project
Arguments:
- project_id (str): ID of the Project
- workspace (WorkspaceInput): Workspace object to create
48 def delete(self, project_id: str, workspace_id: str) -> None: 49 """ 50 Delete a workspace in the given project 51 52 Args: 53 project_id (str): ID of the Project 54 workspace_id (str): ID of the Workspace 55 """ 56 delete_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
Delete a workspace in the given project
Arguments:
- project_id (str): ID of the Project
- workspace_id (str): ID of the Workspace
58 def start(self, project_id: str, workspace_id: str) -> None: 59 """ 60 Start a workspace environment 61 62 Args: 63 project_id (str): ID of the Project 64 workspace_id (str): ID of the Workspace 65 """ 66 start_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
Start a workspace environment
Arguments:
- project_id (str): ID of the Project
- workspace_id (str): ID of the Workspace
68 def stop(self, project_id: str, workspace_id: str) -> None: 69 """ 70 Stop a workspace environment 71 72 Args: 73 project_id (str): ID of the Project 74 workspace_id (str): ID of the Workspace 75 """ 76 stop_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
Stop a workspace environment
Arguments:
- project_id (str): ID of the Project
- workspace_id (str): ID of the Workspace