cirro.services
from .billing import BillingService
from .compute_environment import ComputeEnvironmentService
from .dataset import DatasetService
from .execution import ExecutionService
from .file import FileService
from .metadata import MetadataService
from .metrics import MetricsService
from .process import ProcessService
from .projects import ProjectService
from .references import ReferenceService
from .share import ShareService
from .user import UserService

# Public API of the cirro.services package.
__all__ = [
    'BillingService',
    'DatasetService',
    'ExecutionService',
    'ComputeEnvironmentService',
    'FileService',
    'MetadataService',
    'MetricsService',
    'ProcessService',
    'ProjectService',
    'ReferenceService',
    'ShareService',
    'UserService'
]
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """

    def list(self) -> List[BillingAccount]:
        """
        Gets a list of billing accounts the current user has access to
        """
        return get_billing_accounts.sync(client=self._api_client)

    def update(self, billing_account_id: str, request: BillingAccountRequest):
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest):

        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        # sync_detailed is used because the endpoint returns no payload of interest
        update_billing_account.sync_detailed(
            billing_account_id=billing_account_id,
            body=request,
            client=self._api_client
        )
Service for interacting with the Billing endpoints
14 def list(self) -> List[BillingAccount]: 15 """ 16 Gets a list of billing accounts the current user has access to 17 """ 18 return get_billing_accounts.sync(client=self._api_client)
Gets a list of billing accounts the current user has access to
20 def update(self, billing_account_id: str, request: BillingAccountRequest): 21 """ 22 Updates a billing account 23 24 Args: 25 billing_account_id (str): Billing account ID 26 request (cirro_api_client.v1.models.BillingAccountRequest): 27 28 ```python 29 from cirro_api_client.v1.models import BillingAccountRequest 30 from cirro.cirro_client import CirroApi 31 32 cirro = CirroApi() 33 request = BillingAccountRequest( 34 name="New billing account name", 35 primary_budget_number="new-budget-number", 36 owner="New Owner" 37 ) 38 cirro.billing.update("billing-account-id", request) 39 ``` 40 """ 41 update_billing_account.sync_detailed( 42 billing_account_id=billing_account_id, 43 body=request, 44 client=self._api_client 45 )
Updates a billing account
Arguments:
- billing_account_id (str): Billing account ID
- request (cirro_api_client.v1.models.BillingAccountRequest):
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = BillingAccountRequest(
name="New billing account name",
primary_budget_number="new-budget-number",
owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
class DatasetService(FileEnabledService):
    """
    Service for interacting with the Dataset endpoints
    """

    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
        """List datasets

        Retrieves a list of datasets for a given project

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return get_all_records(
            records_getter=lambda page_args: get_datasets.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
        """
        Retrieves a list of shared datasets for a given project and share

        Args:
            project_id (str): ID of the Project
            share_id (str): ID of the Share
            max_items (int): Maximum number of records to get (default 10,000)

        Example:
        ```python
        from cirro_api_client.v1.models import ShareType
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # List shares that are subscribed to
        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
        ```
        """
        return get_all_records(
            records_getter=lambda page_args: get_shared_datasets.sync(
                project_id=project_id,
                share_id=share_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
        """
        Download data from public repositories

        Args:
            project_id (str): ID of the Project
            import_request (cirro_api_client.v1.models.ImportDataRequest):

        Returns:
            ID of the created dataset

        ```python
        from cirro_api_client.v1.models import ImportDataRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = ImportDataRequest(
            name="Imported dataset",
            description="Description of the dataset",
            public_ids=["SRR123456", "SRR123457"],
            tags=[Tag(value="tag1")]
        )
        cirro.datasets.import_public("project-id", request)
        ```
        """
        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)

    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
        """
        Registers a dataset in Cirro, which can subsequently have files uploaded to it

        Args:
            project_id (str): ID of the Project
            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):

        Returns:
            ID of the created dataset and the path to upload files

        ```python
        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UploadDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
            description="Description of the dataset",
            tags=[Tag(value="tag1"), Tag(value="tag2")]
        )
        cirro.datasets.create("project-id", request)
        ```
        """
        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)

    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
        """
        Gets detailed information about a dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset

        Returns:
            The dataset, if found
        """
        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
        """
        Update info on a dataset (name, description, and/or tags)

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            request (cirro_api_client.v1.models.UpdateDatasetRequest):

        Returns:
            The updated dataset

        ```python
        from cirro_api_client.v1.models import UpdateDatasetRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UpdateDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            description="Description of the dataset"
        )
        cirro.datasets.update("project-id", "dataset-id", request)
        ```
        """
        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)

    def delete(self, project_id: str, dataset_id: str) -> None:
        """
        Delete a dataset

        After a dataset has been deleted, the files associated with that
        dataset are saved according to the project's retention time.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """
        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
        """
        Gets a listing of files, charts, and other assets available for the dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            file_limit (int): Maximum number of files to get (default 100,000)

        Raises:
            ValueError: if `file_limit` is less than 1
        """
        # Validate before making any API round-trips
        if file_limit < 1:
            raise ValueError("file_limit must be greater than 0")
        dataset = self.get(project_id, dataset_id)
        all_files = []
        file_offset = 0
        domain = None
        artifacts = None

        while len(all_files) < file_limit:
            manifest = get_dataset_manifest.sync(
                project_id=project_id,
                dataset_id=dataset_id,
                file_offset=file_offset,
                client=self._api_client
            )
            all_files.extend(manifest.files)
            file_offset += len(manifest.files)

            # Artifacts are not paged; capture them from the first page that has any
            if not artifacts:
                artifacts = manifest.artifacts

            domain = manifest.domain
            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
                break

        # The last page may overshoot the requested limit; honor the documented cap
        all_files = all_files[:file_limit]

        files = [
            File.from_file_entry(
                f,
                project_id=project_id,
                dataset=dataset,
                domain=domain
            )
            for f in all_files
        ]
        # Guard against artifacts never being populated (no page carried any);
        # iterating None would raise TypeError here
        artifacts = [
            Artifact(
                artifact_type=a.type,
                file=File.from_file_entry(
                    FileEntry(a.path),
                    project_id=project_id,
                    dataset=dataset,
                    domain=domain
                )
            )
            for a in (artifacts or [])
        ]
        return DatasetAssets(files=files, artifacts=artifacts)

    def upload_files(self,
                     project_id: str,
                     dataset_id: str,
                     directory: PathLike,
                     files: List[PathLike] = None,
                     file_path_map: Dict[PathLike, str] = None) -> None:
        """
        Uploads files to a given dataset from the specified directory.

        All files must be relative to the specified directory.
        If files need to be flattened, or you are sourcing files from multiple directories,
        please include `file_path_map` or call this method multiple times.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory,
                must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
                from source path to destination path, used to "re-write" paths within the dataset.

        ```python
        from cirro.cirro_client import CirroApi
        from cirro.file_utils import generate_flattened_file_map

        cirro = CirroApi()

        directory = "~/Downloads"
        # Re-write file paths
        file_map = {
            "data1/file1.fastq.gz": "file1.fastq.gz",
            "data2/file2.fastq.gz": "file2.fastq.gz",
            "file3.fastq.gz": "new_file3.txt"
        }

        # Or you could automate the flattening
        files = ["data1/file1.fastq.gz"]
        file_map = generate_flattened_file_map(files)

        cirro.datasets.upload_files(
            project_id="project-id",
            dataset_id="dataset-id",
            directory=directory,
            files=list(file_map.keys()),
            file_path_map=file_map
        )
        ```
        """
        # Avoid a mutable default argument; normalize to an empty mapping
        if file_path_map is None:
            file_path_map = {}

        dataset = self.get(project_id, dataset_id)

        access_context = FileAccessContext.upload_dataset(
            project_id=project_id,
            dataset_id=dataset_id,
            base_url=dataset.s3
        )

        self._file_service.upload_files(
            access_context=access_context,
            directory=directory,
            files=files,
            file_path_map=file_path_map
        )

    def download_files(
            self,
            project_id: str,
            dataset_id: str,
            download_location: str,
            files: Union[List[File], List[str]] = None
    ) -> None:
        """
        Downloads files from a dataset

        The `files` argument is used to optionally specify a subset of files
        to be downloaded. By default, all files are downloaded.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            download_location (str): Local destination for downloaded files
            files (typing.List[str]): Optional list of files to download
        """
        if files is None:
            files = self.get_assets_listing(project_id, dataset_id).files

        if len(files) == 0:
            return

        first_file = files[0]
        if isinstance(first_file, File):
            # File objects carry their own access context; reduce to relative paths
            files = [file.relative_path for file in files]
            access_context = first_file.access_context
        else:
            dataset = self.get(project_id, dataset_id)
            if dataset.share:
                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
                                                                           dataset_id=dataset_id,
                                                                           base_url=dataset.s3)
            else:
                access_context = FileAccessContext.download(project_id=project_id,
                                                            base_url=dataset.s3)

        self._file_service.download_files(access_context, download_location, files)
Service for interacting with the Dataset endpoints
21 def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]: 22 """List datasets 23 24 Retrieves a list of datasets for a given project 25 26 Args: 27 project_id (str): ID of the Project 28 max_items (int): Maximum number of records to get (default 10,000) 29 """ 30 return get_all_records( 31 records_getter=lambda page_args: get_datasets.sync( 32 project_id=project_id, 33 client=self._api_client, 34 next_token=page_args.next_token, 35 limit=page_args.limit 36 ), 37 max_items=max_items 38 )
List datasets
Retrieves a list of datasets for a given project
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
72 def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse: 73 """ 74 Download data from public repositories 75 76 Args: 77 project_id (str): ID of the Project 78 import_request (cirro_api_client.v1.models.ImportDataRequest): 79 80 Returns: 81 ID of the created dataset 82 83 ```python 84 from cirro_api_client.v1.models import ImportDataRequest, Tag 85 from cirro.cirro_client import CirroApi 86 87 cirro = CirroApi() 88 request = ImportDataRequest( 89 name="Imported dataset", 90 description="Description of the dataset", 91 public_ids=["SRR123456", "SRR123457"], 92 tags=[Tag(value="tag1")] 93 ) 94 cirro.datasets.import_public("project-id", request) 95 ``` 96 """ 97 return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)
Download data from public repositories
Arguments:
- project_id (str): ID of the Project
- import_request (cirro_api_client.v1.models.ImportDataRequest):
Returns:
ID of the created dataset
from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = ImportDataRequest(
name="Imported dataset",
description="Description of the dataset",
public_ids=["SRR123456", "SRR123457"],
tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
99 def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse: 100 """ 101 Registers a dataset in Cirro, which can subsequently have files uploaded to it 102 103 Args: 104 project_id (str): ID of the Project 105 upload_request (cirro_api_client.v1.models.UploadDatasetRequest): 106 107 Returns: 108 ID of the created dataset and the path to upload files 109 110 ```python 111 from cirro_api_client.v1.models import UploadDatasetRequest, Tag 112 from cirro.cirro_client import CirroApi 113 114 cirro = CirroApi() 115 request = UploadDatasetRequest( 116 name="Name of new dataset", 117 process_id="paired_dnaseq", 118 expected_files=["read_1.fastq.gz", "read_2.fastq.gz"], 119 description="Description of the dataset", 120 tags=[Tag(value="tag1"), Tag(value="tag2")] 121 ) 122 cirro.datasets.create("project-id", request) 123 ``` 124 """ 125 return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)
Registers a dataset in Cirro, which can subsequently have files uploaded to it
Arguments:
- project_id (str): ID of the Project
- upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
Returns:
ID of the created dataset and the path to upload files
from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UploadDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
description="Description of the dataset",
tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
127 def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]: 128 """ 129 Gets detailed information about a dataset 130 131 Args: 132 project_id (str): ID of the Project 133 dataset_id (str): ID of the Dataset 134 135 Returns: 136 The dataset, if found 137 """ 138 return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Gets detailed information about a dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
Returns:
The dataset, if found
140 def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail: 141 """ 142 Update info on a dataset (name, description, and/or tags) 143 144 Args: 145 project_id (str): ID of the Project 146 dataset_id (str): ID of the Dataset 147 request (cirro_api_client.v1.models.UpdateDatasetRequest): 148 149 Returns: 150 The updated dataset 151 152 ```python 153 from cirro_api_client.v1.models import UpdateDatasetRequest 154 from cirro.cirro_client import CirroApi 155 156 cirro = CirroApi() 157 request = UpdateDatasetRequest( 158 name="Name of new dataset", 159 process_id="paired_dnaseq", 160 description="Description of the dataset" 161 ) 162 cirro.datasets.update("project-id", "dataset-id", request) 163 ``` 164 """ 165 return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)
Update info on a dataset (name, description, and/or tags)
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- request (cirro_api_client.v1.models.UpdateDatasetRequest):
Returns:
The updated dataset
from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi
cirro = CirroApi()
request = UpdateDatasetRequest(
name="Name of new dataset",
process_id="paired_dnaseq",
description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
167 def delete(self, project_id: str, dataset_id: str) -> None: 168 """ 169 Delete a dataset 170 171 After a dataset has been deleted, the files associated with that 172 dataset are saved according to the project's retention time. 173 174 Args: 175 project_id (str): ID of the Project 176 dataset_id (str): ID of the Dataset 177 """ 178 delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
Delete a dataset
After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
180 def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets: 181 """ 182 Gets a listing of files, charts, and other assets available for the dataset 183 184 Args: 185 project_id (str): ID of the Project 186 dataset_id (str): ID of the Dataset 187 file_limit (int): Maximum number of files to get (default 100,000) 188 """ 189 dataset = self.get(project_id, dataset_id) 190 if file_limit < 1: 191 raise ValueError("file_limit must be greater than 0") 192 all_files = [] 193 file_offset = 0 194 domain = None 195 artifacts = None 196 197 while len(all_files) < file_limit: 198 manifest = get_dataset_manifest.sync( 199 project_id=project_id, 200 dataset_id=dataset_id, 201 file_offset=file_offset, 202 client=self._api_client 203 ) 204 all_files.extend(manifest.files) 205 file_offset += len(manifest.files) 206 207 if not artifacts: 208 artifacts = manifest.artifacts 209 210 domain = manifest.domain 211 if len(all_files) >= manifest.total_files or len(manifest.files) == 0: 212 break 213 214 files = [ 215 File.from_file_entry( 216 f, 217 project_id=project_id, 218 dataset=dataset, 219 domain=domain 220 ) 221 for f in all_files 222 ] 223 artifacts = [ 224 Artifact( 225 artifact_type=a.type, 226 file=File.from_file_entry( 227 FileEntry(a.path), 228 project_id=project_id, 229 dataset=dataset, 230 domain=domain 231 ) 232 ) 233 for a in artifacts 234 ] 235 return DatasetAssets(files=files, artifacts=artifacts)
Gets a listing of files, charts, and other assets available for the dataset
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- file_limit (int): Maximum number of files to get (default 100,000)
237 def upload_files(self, 238 project_id: str, 239 dataset_id: str, 240 directory: PathLike, 241 files: List[PathLike] = None, 242 file_path_map: Dict[PathLike, str] = None) -> None: 243 """ 244 Uploads files to a given dataset from the specified directory. 245 246 All files must be relative to the specified directory. 247 If files need to be flattened, or you are sourcing files from multiple directories, 248 please include `file_path_map` or call this method multiple times. 249 250 Args: 251 project_id (str): ID of the Project 252 dataset_id (str): ID of the Dataset 253 directory (str|Path): Path to directory 254 files (typing.List[str|Path]): List of paths to files within the directory, 255 must be the same type as directory. 256 file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload 257 from source path to destination path, used to "re-write" paths within the dataset. 258 ```python 259 from cirro.cirro_client import CirroApi 260 from cirro.file_utils import generate_flattened_file_map 261 262 cirro = CirroApi() 263 264 directory = "~/Downloads" 265 # Re-write file paths 266 file_map = { 267 "data1/file1.fastq.gz": "file1.fastq.gz", 268 "data2/file2.fastq.gz": "file2.fastq.gz", 269 "file3.fastq.gz": "new_file3.txt" 270 } 271 272 # Or you could automate the flattening 273 files = ["data1/file1.fastq.gz"] 274 file_map = generate_flattened_file_map(files) 275 276 cirro.datasets.upload_files( 277 project_id="project-id", 278 dataset_id="dataset-id", 279 directory=directory, 280 files=list(file_map.keys()), 281 file_path_map=file_map 282 ) 283 ``` 284 """ 285 if file_path_map is None: 286 file_path_map = {} 287 288 dataset = self.get(project_id, dataset_id) 289 290 access_context = FileAccessContext.upload_dataset( 291 project_id=project_id, 292 dataset_id=dataset_id, 293 base_url=dataset.s3 294 ) 295 296 self._file_service.upload_files( 297 access_context=access_context, 298 directory=directory, 299 files=files, 300 
file_path_map=file_path_map 301 )
Uploads files to a given dataset from the specified directory.
All files must be relative to the specified directory.
If files need to be flattened, or you are sourcing files from multiple directories,
please include `file_path_map` or call this method multiple times.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map
cirro = CirroApi()
directory = "~/Downloads"
# Re-write file paths
file_map = {
"data1/file1.fastq.gz": "file1.fastq.gz",
"data2/file2.fastq.gz": "file2.fastq.gz",
"file3.fastq.gz": "new_file3.txt"
}
# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)
cirro.datasets.upload_files(
project_id="project-id",
dataset_id="dataset-id",
directory=directory,
files=list(file_map.keys()),
file_path_map=file_map
)
303 def download_files( 304 self, 305 project_id: str, 306 dataset_id: str, 307 download_location: str, 308 files: Union[List[File], List[str]] = None 309 ) -> None: 310 """ 311 Downloads files from a dataset 312 313 The `files` argument is used to optionally specify a subset of files 314 to be downloaded. By default, all files are downloaded. 315 316 Args: 317 project_id (str): ID of the Project 318 dataset_id (str): ID of the Dataset 319 download_location (str): Local destination for downloaded files 320 files (typing.List[str]): Optional list of files to download 321 """ 322 if files is None: 323 files = self.get_assets_listing(project_id, dataset_id).files 324 325 if len(files) == 0: 326 return 327 328 first_file = files[0] 329 if isinstance(first_file, File): 330 files = [file.relative_path for file in files] 331 access_context = first_file.access_context 332 else: 333 dataset = self.get(project_id, dataset_id) 334 if dataset.share: 335 access_context = FileAccessContext.download_shared_dataset(project_id=project_id, 336 dataset_id=dataset_id, 337 base_url=dataset.s3) 338 else: 339 access_context = FileAccessContext.download(project_id=project_id, 340 base_url=dataset.s3) 341 342 self._file_service.download_files(access_context, download_location, files)
Downloads files from a dataset
The `files` argument is used to optionally specify a subset of files to be downloaded. By default, all files are downloaded.
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- download_location (str): Local destination for downloaded files
- files (typing.List[str]): Optional list of files to download
class ExecutionService(BaseService):
    """
    Service for interacting with the Execution endpoints
    """

    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
        """
        Launch an analysis job running a process on a set of inputs

        Args:
            project_id (str): ID of the Project
            request (cirro_api_client.v1.models.RunAnalysisRequest):

        Returns:
            The ID of the created dataset

        ```python
        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
        from cirro.cirro_client import CirroApi

        # Example:
        # Run the "process-nf-core-rnaseq-3_8" process using input data
        # from a dataset with the id "source-dataset-id"

        # Optional analysis parameters
        params = RunAnalysisRequestParams.from_dict({
            "param_a": "val_a",
            "param_b": "val_b"
        })

        cirro = CirroApi()
        request = RunAnalysisRequest(
            name="Name of the newly created dataset",
            description="Longer description of newly created dataset",
            process_id="process-nf-core-rnaseq-3_8",
            source_dataset_ids=["source-dataset-id"],
            params=params
        )
        cirro.execution.run_analysis("project-id", request)
        ```
        """
        # Fetch the parameter form for the target process so the request
        # can be validated client-side before submission
        form_spec = get_process_parameters.sync(
            process_id=request.process_id,
            client=self._api_client
        )

        supplied_params = request.params.to_dict() if request.params else {}
        specification = ParameterSpecification(form_spec)
        specification.validate_params(supplied_params)

        return run_analysis.sync(
            project_id=project_id,
            body=request,
            client=self._api_client
        )

    def stop_analysis(self, project_id: str, dataset_id: str):
        """
        Terminates all jobs related to a running analysis

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """
        return stop_analysis.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            client=self._api_client
        )

    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
        """
        Gets an overview of the executions currently running in the project, by job queue

        Args:
            project_id (str): ID of the Project

        Returns:
            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
        """
        summary = get_project_summary.sync(
            project_id=project_id,
            client=self._api_client
        )
        return summary.additional_properties

    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
        """
        Gets live logs from main execution task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed
        """
        response = get_execution_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

        # Flatten the log events into a single newline-delimited string
        return '\n'.join(event.message for event in response.events)

    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
        """
        Gets the tasks submitted by the workflow execution

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will try to get the list of jobs
                from the executor (i.e., AWS Batch), rather than the workflow report
        """
        return get_tasks_for_execution.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
        """
        Gets the log output from an individual task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            task_id (str): ID of the task
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed
        """
        response = get_task_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            task_id=task_id,
            force_live=force_live,
            client=self._api_client
        )

        # Flatten the log events into a single newline-delimited string
        return '\n'.join(event.message for event in response.events)
Service for interacting with the Execution endpoints
17 def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse: 18 """ 19 Launch an analysis job running a process on a set of inputs 20 21 Args: 22 project_id (str): ID of the Project 23 request (cirro_api_client.v1.models.RunAnalysisRequest): 24 25 Returns: 26 The ID of the created dataset 27 28 ```python 29 from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams 30 from cirro.cirro_client import CirroApi 31 32 # Example: 33 # Run the "process-nf-core-rnaseq-3_8" process using input data 34 # from a dataset with the id "source-dataset-id" 35 36 # Optional analysis parameters 37 params = RunAnalysisRequestParams.from_dict({ 38 "param_a": "val_a", 39 "param_b": "val_b" 40 }) 41 42 cirro = CirroApi() 43 request = RunAnalysisRequest( 44 name="Name of the newly created dataset", 45 description="Longer description of newly created dataset", 46 process_id="process-nf-core-rnaseq-3_8", 47 source_dataset_ids=["source-dataset-id"], 48 params=params 49 ) 50 cirro.execution.run_analysis("project-id", request) 51 ``` 52 """ 53 54 form_spec = get_process_parameters.sync( 55 process_id=request.process_id, 56 client=self._api_client 57 ) 58 59 ParameterSpecification( 60 form_spec 61 ).validate_params( 62 request.params.to_dict() if request.params else {} 63 ) 64 65 return run_analysis.sync( 66 project_id=project_id, 67 body=request, 68 client=self._api_client 69 )
Launch an analysis job running a process on a set of inputs
Arguments:
- project_id (str): ID of the Project
- request (cirro_api_client.v1.models.RunAnalysisRequest):
Returns:
The ID of the created dataset
from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi
# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"
# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
"param_a": "val_a",
"param_b": "val_b"
})
cirro = CirroApi()
request = RunAnalysisRequest(
name="Name of the newly created dataset",
description="Longer description of newly created dataset",
process_id="process-nf-core-rnaseq-3_8",
source_dataset_ids=["source-dataset-id"],
params=params
)
cirro.execution.run_analysis("project-id", request)
def stop_analysis(self, project_id: str, dataset_id: str):
    """
    Terminates all jobs related to a running analysis

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
    """
    return stop_analysis.sync(
        dataset_id=dataset_id,
        project_id=project_id,
        client=self._api_client
    )
Terminates all jobs related to a running analysis
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
    """
    Gets an overview of the executions currently running in the project, by job queue

    Args:
        project_id (str): ID of the Project

    Returns:
        `cirro_api_client.v1.models.GetProjectSummaryResponse200`
    """
    summary = get_project_summary.sync(
        project_id=project_id,
        client=self._api_client
    )
    # The generated client models the per-queue task lists as
    # additional (dynamic) properties on the response object.
    return summary.additional_properties
Gets an overview of the executions currently running in the project, by job queue
Arguments:
- project_id (str): ID of the Project
Returns:
`cirro_api_client.v1.models.GetProjectSummaryResponse200`
def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
    """
    Gets live logs from main execution task

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will fetch logs from CloudWatch,
         even if the execution is already completed
    """
    response = get_execution_logs.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        force_live=force_live,
        client=self._api_client
    )
    # Flatten the log events into one newline-delimited string
    return '\n'.join(event.message for event in response.events)
Gets live logs from main execution task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
    """
    Gets the tasks submitted by the workflow execution

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will try to get the list of jobs
         from the executor (i.e., AWS Batch), rather than the workflow report
    """
    return get_tasks_for_execution.sync(
        dataset_id=dataset_id,
        project_id=project_id,
        force_live=force_live,
        client=self._api_client
    )
Gets the tasks submitted by the workflow execution
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
    """
    Gets the log output from an individual task

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        task_id (str): ID of the task
        force_live (bool): If True, it will fetch logs from CloudWatch,
         even if the execution is already completed
    """
    response = get_task_logs.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        task_id=task_id,
        force_live=force_live,
        client=self._api_client
    )
    # Flatten the log events into one newline-delimited string
    return '\n'.join(event.message for event in response.events)
Gets the log output from an individual task
Arguments:
- project_id (str): ID of the Project
- dataset_id (str): ID of the Dataset
- task_id (str): ID of the task
- force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
class ComputeEnvironmentService(BaseService):
    """
    Service for interacting with the Compute Environment endpoints
    """

    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
        """
        List of custom compute environments for a project (i.e., an agent)

        Args:
            project_id (str): Project ID

        Returns:
            List of compute environments that are available for the project
        """
        return get_compute_environments.sync(
            project_id=project_id,
            client=self._api_client
        )
Service for interacting with the Compute Environment endpoints
def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
    """
    List of custom compute environments for a project (i.e., an agent)

    Args:
        project_id (str): Project ID

    Returns:
        List of compute environments that are available for the project
    """
    return get_compute_environments.sync(
        client=self._api_client,
        project_id=project_id
    )
List of custom compute environments for a project (i.e., an agent)
Arguments:
- project_id (str): Project ID
Returns:
List of compute environments that are available for the project
class FileService(BaseService):
    """
    Service for interacting with files
    """
    enable_additional_checksum: bool
    transfer_retries: int
    # Class-level lock and cache: read tokens are shared across instances
    _get_token_lock = threading.Lock()
    _read_token_cache: Dict[str, AWSCredentials] = {}

    def __init__(self, api_client, enable_additional_checksum, transfer_retries):
        """
        Instantiates the file service class
        """
        self._api_client = api_client
        self.enable_additional_checksum = enable_additional_checksum
        self.transfer_retries = transfer_retries

    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
        """
        Retrieves credentials to access files

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        """
        request = access_context.file_access_request

        # Project-download (read) tokens are cacheable; anything else is
        # requested fresh from the API on each call.
        if request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
            return self._get_project_read_credentials(access_context)

        return generate_project_file_access_token.sync(
            client=self._api_client,
            project_id=access_context.project_id,
            body=request
        )

    def _get_project_read_credentials(self, access_context: FileAccessContext):
        """
        Retrieves credentials to read project data, this can be cached
        """
        project_id = access_context.project_id
        access_request = access_context.file_access_request

        with self._get_token_lock:
            token = self._read_token_cache.get(project_id)
            # Refresh when there is no cached token or it has expired
            if not token or datetime.now(tz=timezone.utc) > token.expiration:
                self._read_token_cache[project_id] = generate_project_file_access_token.sync(
                    client=self._api_client,
                    project_id=project_id,
                    body=access_request
                )

            return self._read_token_cache[project_id]

    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
        """
        Gets the underlying AWS S3 client to perform operations on files

        This is seeded with refreshable credentials from the access_context parameter

        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
        """
        return self._generate_s3_client(access_context).get_aws_client()

    def get_file(self, file: File) -> bytes:
        """
        Gets the contents of a file

        Args:
            file (cirro.models.file.File):

        Returns:
            The raw bytes of the file
        """
        return self.get_file_from_path(file.access_context, file.relative_path)

    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
        """
        Gets the contents of a file by providing the path, used internally

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            file_path (str): Relative path to file within dataset

        Returns:
            The raw bytes of the file
        """
        # Join the context prefix with the relative path, dropping any
        # leading slashes so the resulting S3 key is well-formed
        key = f'{access_context.prefix}/{file_path}'.lstrip('/')
        client = self._generate_s3_client(access_context)
        return client.get_file(access_context.bucket, key)

    def create_file(self, access_context: FileAccessContext, key: str,
                    contents: str, content_type: str) -> None:
        """
        Creates a file at the specified path

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            key (str): Key of object to create
            contents (str): Content of object
            content_type (str):
        """
        self._generate_s3_client(access_context).create_object(
            bucket=access_context.bucket,
            key=key,
            contents=contents,
            content_type=content_type
        )

    def upload_files(self,
                     access_context: FileAccessContext,
                     directory: PathLike,
                     files: List[PathLike],
                     file_path_map: Dict[PathLike, str]) -> None:
        """
        Uploads a list of files from the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory
             must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
             from source path to destination path, used to "re-write" paths within the dataset.
        """
        upload_directory(
            directory=directory,
            files=files,
            file_path_map=file_path_map,
            s3_client=self._generate_s3_client(access_context),
            bucket=access_context.bucket,
            prefix=access_context.prefix,
            max_retries=self.transfer_retries
        )

    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
        """
        Download a list of files to the specified directory

        Args:
            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
            directory (str): download location
            files (List[str]): relative path of files to download
        """
        client = self._generate_s3_client(access_context)
        download_directory(
            directory,
            files,
            client,
            access_context.bucket,
            access_context.prefix
        )

    def _generate_s3_client(self, access_context: FileAccessContext):
        """
        Generates the Cirro-S3 client to perform operations on files
        """
        # The S3 client refreshes its own credentials via this callback
        return S3Client(
            partial(self.get_access_credentials, access_context),
            self.enable_additional_checksum
        )
Service for interacting with files
def __init__(self, api_client, enable_additional_checksum, transfer_retries):
    """
    Instantiates the file service class
    """
    self._api_client = api_client
    self.enable_additional_checksum = enable_additional_checksum
    self.transfer_retries = transfer_retries
Instantiates the file service class
def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
    """
    Retrieves credentials to access files

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
    """
    request = access_context.file_access_request

    # Only project-download (read) credentials go through the cache
    if request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
        return self._get_project_read_credentials(access_context)

    return generate_project_file_access_token.sync(
        client=self._api_client,
        project_id=access_context.project_id,
        body=request
    )
Retrieves credentials to access files
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
    """
    Gets the underlying AWS S3 client to perform operations on files

    This is seeded with refreshable credentials from the access_context parameter

    This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
    """
    return self._generate_s3_client(access_context).get_aws_client()
Gets the underlying AWS S3 client to perform operations on files
This is seeded with refreshable credentials from the access_context parameter
This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
def get_file(self, file: File) -> bytes:
    """
    Gets the contents of a file

    Args:
        file (cirro.models.file.File):

    Returns:
        The raw bytes of the file
    """
    # Delegate to the path-based variant using the file's own context
    return self.get_file_from_path(file.access_context, file.relative_path)
Gets the contents of a file
Arguments:
- file (cirro.models.file.File): File whose contents should be fetched (carries its own access context and relative path)
Returns:
The raw bytes of the file
def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
    """
    Gets the contents of a file by providing the path, used internally

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        file_path (str): Relative path to file within dataset

    Returns:
        The raw bytes of the file
    """
    client = self._generate_s3_client(access_context)
    # Build the object key from the context prefix, with no leading slash
    key = f'{access_context.prefix}/{file_path}'.lstrip('/')
    return client.get_file(access_context.bucket, key)
Gets the contents of a file by providing the path, used internally
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- file_path (str): Relative path to file within dataset
Returns:
The raw bytes of the file
def create_file(self, access_context: FileAccessContext, key: str,
                contents: str, content_type: str) -> None:
    """
    Creates a file at the specified path

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        key (str): Key of object to create
        contents (str): Content of object
        content_type (str):
    """
    self._generate_s3_client(access_context).create_object(
        bucket=access_context.bucket,
        key=key,
        contents=contents,
        content_type=content_type
    )
Creates a file at the specified path
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- key (str): Key of object to create
- contents (str): Content of object
- content_type (str): Content type to set on the created object (presumably a MIME type such as `text/csv` — confirm against the S3 client)
def upload_files(self,
                 access_context: FileAccessContext,
                 directory: PathLike,
                 files: List[PathLike],
                 file_path_map: Dict[PathLike, str]) -> None:
    """
    Uploads a list of files from the specified directory

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        directory (str|Path): Path to directory
        files (typing.List[str|Path]): List of paths to files within the directory
         must be the same type as directory.
        file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
         from source path to destination path, used to "re-write" paths within the dataset.
    """
    client = self._generate_s3_client(access_context)
    upload_directory(
        directory=directory,
        files=files,
        file_path_map=file_path_map,
        s3_client=client,
        bucket=access_context.bucket,
        prefix=access_context.prefix,
        max_retries=self.transfer_retries
    )
Uploads a list of files from the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str|Path): Path to directory
- files (typing.List[str|Path]): List of paths to files within the directory must be the same type as directory.
- file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
    """
    Download a list of files to the specified directory

    Args:
        access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
        directory (str): download location
        files (List[str]): relative path of files to download
    """
    download_directory(
        directory,
        files,
        self._generate_s3_client(access_context),
        access_context.bucket,
        access_context.prefix
    )
Download a list of files to the specified directory
Arguments:
- access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
- directory (str): download location
- files (List[str]): relative path of files to download
class MetadataService(BaseService):
    """
    Service for interacting with the Metadata endpoints
    """

    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with a project along with their metadata

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        def fetch_page(page_args):
            # Fetch one page of samples for the pagination helper
            return get_project_samples.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            )

        return get_all_records(
            records_getter=fetch_page,
            max_items=max_items
        )

    def get_project_schema(self, project_id: str) -> FormSchema:
        """
        Get project metadata schema

        Args:
            project_id (str): ID of the Project
        """
        return get_project_schema.sync(client=self._api_client, project_id=project_id)

    def update_project_schema(self, project_id: str, schema: FormSchema):
        """
        Update project metadata schema

        Args:
            project_id (str): ID of the Project
            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
        """
        update_project_schema.sync_detailed(
            project_id=project_id,
            body=schema,
            client=self._api_client
        )

    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
        """
        Updates metadata information for sample

        Args:
            project_id (str): ID of the Project
            sample_id (str): ID of the sample
            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
        """
        return update_sample.sync(
            sample_id=sample_id,
            project_id=project_id,
            body=sample,
            client=self._api_client
        )
Service for interacting with the Metadata endpoints
def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
    """
    Retrieves a list of samples associated with a project along with their metadata

    Args:
        project_id (str): ID of the Project
        max_items (int): Maximum number of records to get (default 10,000)
    """
    def fetch_page(page_args):
        # One page of samples, driven by the shared pagination helper
        return get_project_samples.sync(
            project_id=project_id,
            client=self._api_client,
            next_token=page_args.next_token,
            limit=page_args.limit
        )

    return get_all_records(records_getter=fetch_page, max_items=max_items)
Retrieves a list of samples associated with a project along with their metadata
Arguments:
- project_id (str): ID of the Project
- max_items (int): Maximum number of records to get (default 10,000)
def get_project_schema(self, project_id: str) -> FormSchema:
    """
    Get project metadata schema

    Args:
        project_id (str): ID of the Project
    """
    return get_project_schema.sync(client=self._api_client, project_id=project_id)
Get project metadata schema
Arguments:
- project_id (str): ID of the Project
def update_project_schema(self, project_id: str, schema: FormSchema):
    """
    Update project metadata schema

    Args:
        project_id (str): ID of the Project
        schema (cirro_api_client.v1.models.FormSchema): Metadata schema
    """
    update_project_schema.sync_detailed(
        project_id=project_id,
        body=schema,
        client=self._api_client
    )
Update project metadata schema
Arguments:
- project_id (str): ID of the Project
- schema (cirro_api_client.v1.models.FormSchema): Metadata schema
def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
    """
    Updates metadata information for sample

    Args:
        project_id (str): ID of the Project
        sample_id (str): ID of the sample
        sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
    """
    return update_sample.sync(
        sample_id=sample_id,
        project_id=project_id,
        body=sample,
        client=self._api_client
    )
Updates metadata information for sample
Arguments:
- project_id (str): ID of the Project
- sample_id (str): ID of the sample
- sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
class MetricsService(BaseService):
    """
    Service for interacting with the Metrics endpoints
    """

    def get_for_project(self, project_id: str) -> ProjectMetrics:
        """
        Retrieves the cost and storage metrics for a project.

        Args:
            project_id (str): ID of the Project
        """
        return get_project_metrics.sync(
            client=self._api_client,
            project_id=project_id
        )

    def get_all_metrics(self) -> List[ProjectMetrics]:
        """
        Retrieves all available metrics
        """
        return get_all_metrics.sync(client=self._api_client)
Service for interacting with the Metrics endpoints
def get_for_project(self, project_id: str) -> ProjectMetrics:
    """
    Retrieves the cost and storage metrics for a project.

    Args:
        project_id (str): ID of the Project
    """
    return get_project_metrics.sync(client=self._api_client, project_id=project_id)
Retrieves the cost and storage metrics for a project.
Arguments:
- project_id (str): ID of the Project
class ProcessService(BaseService):
    """
    Service for interacting with the Process endpoints
    """

    def list(self, process_type: Executor = None) -> List[Process]:
        """
        Retrieves a list of available processes

        Args:
            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
        """
        all_processes = get_processes.sync(client=self._api_client)
        # No filter supplied -> return everything
        return [p for p in all_processes
                if not process_type or p.executor == process_type]

    def get(self, process_id: str) -> ProcessDetail:
        """
        Retrieves detailed information on a process

        Args:
            process_id (str): Process ID
        """
        return get_process.sync(client=self._api_client, process_id=process_id)

    def archive(self, process_id: str):
        """
        Removes a custom process from the list of available processes.

        Error will be raised if the requested process does not exist. No value
        is returned, and no error raised if process exists and request is satisfied.

        Args:
            process_id (str): Process ID
        """
        archive_custom_process.sync_detailed(
            process_id=process_id,
            client=self._api_client
        )

    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
        """
        Get a process by its display name

        Args:
            name (str): Process name
        """
        for process in self.list():
            if process.name == name:
                return self.get(process.id)
        return None

    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process

        Args:
            process_id (str): Process ID
        """
        spec = get_process_parameters.sync(
            process_id=process_id,
            client=self._api_client
        )
        return ParameterSpecification(spec)

    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
        """
        Checks if the file mapping rules for a process are met by the list of files

        Error will be raised if the file mapping rules for the process are not met.
        No value is returned and no error is raised if the rules are satisfied.

        Args:
            process_id (str): ID for the process containing the file mapping rules
            directory: path to directory containing files
            files (List[str]): File names to check
        """
        # Parse sample sheet file if present
        sample_sheet_path = Path(directory, 'samplesheet.csv')
        sample_sheet = sample_sheet_path.read_text() if sample_sheet_path.exists() else None

        requirements = validate_file_requirements.sync(
            process_id=process_id,
            body=ValidateFileRequirementsRequest(
                file_names=files,
                sample_sheet=sample_sheet
            ),
            client=self._api_client
        )

        # These will be sample sheet errors or no files errors
        if error_msg := requirements.error_msg:
            raise ValueError(error_msg)

        errors = []
        for entry in requirements.allowed_data_types:
            if entry.error_msg is None:
                continue
            example_names = '\n\t- '.join(
                pattern.example_name for pattern in entry.allowed_patterns
            )
            errors.append(
                f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
                example_names
            )

        if errors:
            files_provided = ', '.join(files)
            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
                             "They do not meet the dataset requirements. "
                             "The required file types are: \n" +
                             "\n".join(errors))

    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
        """
        Creates a custom process (pipeline or data type) in the system.

        Whether it is a pipeline or data type is determined by the executor field.
        For pipelines, you must specify the pipeline code and configuration repository.

        This process will be available to all users in the system if `is_tenant_wide` is set to True.
        If `is_tenant_wide` is set to False, the process will only be available the projects
        specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.

        If the repository is private, you must complete the authorization flow on the UI.

        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info,
        including worked examples of building a `CustomProcessInput` for both a
        pipeline (Executor.CROMWELL/NEXTFLOW) and a data type (Executor.INGEST).

        Args:
            process (Process): Process to create
        """
        return create_custom_process.sync(body=process, client=self._api_client)

    def update_custom_process(self, process_id: str, process: CustomProcessInput):
        """
        Updates the custom process (pipeline or data type) in the system.

        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.

        Args:
            process_id (str): ID of the process to update
            process (CustomProcessInput): Process to update
        """
        update_custom_process.sync_detailed(
            process_id=process_id,
            body=process,
            client=self._api_client
        )

    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
        """
        Syncs a custom pipeline in the system.

        This updates the pipeline configurations in Cirro (form configuration,
        input mapping, preprocess etc.) to match what is in the configured repository.
        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

        Args:
            process_id (str): ID of the process to sync
        """
        return sync_custom_process.sync(process_id=process_id, client=self._api_client)
Service for interacting with the Process endpoints
def list(self, process_type: Executor = None) -> List[Process]:
    """
    Retrieves a list of available processes

    Args:
        process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
    """
    all_processes = get_processes.sync(client=self._api_client)
    # A falsy process_type means "no filter"
    return [p for p in all_processes if not process_type or p.executor == process_type]
Retrieves a list of available processes
Arguments:
- process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
def get(self, process_id: str) -> ProcessDetail:
    """
    Retrieves detailed information on a process

    Args:
        process_id (str): Process ID
    """
    return get_process.sync(client=self._api_client, process_id=process_id)
Retrieves detailed information on a process
Arguments:
- process_id (str): Process ID
def archive(self, process_id: str):
    """
    Removes a custom process from the list of available processes.

    Error will be raised if the requested process does not exist. No value
    is returned, and no error raised if process exists and request is satisfied.

    Args:
        process_id (str): Process ID
    """
    archive_custom_process.sync_detailed(
        client=self._api_client,
        process_id=process_id
    )
Removes a custom process from the list of available processes.
Error will be raised if the requested process does not exist. No value is returned, and no error raised if process exists and request is satisfied.
Arguments:
- process_id (str): Process ID
def find_by_name(self, name: str) -> Optional[ProcessDetail]:
    """
    Get a process by its display name

    Args:
        name (str): Process name
    """
    # Scan the full process list for the first name match,
    # then fetch its detailed record
    for process in self.list():
        if process.name == name:
            return self.get(process.id)
    return None
Get a process by its display name
Arguments:
- name (str): Process name
def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
    """
    Gets a specification used to describe the parameters used in the process

    Args:
        process_id (str): Process ID
    """
    return ParameterSpecification(
        get_process_parameters.sync(process_id=process_id, client=self._api_client)
    )
Gets a specification used to describe the parameters used in the process
Arguments:
- process_id (str): Process ID
def check_dataset_files(self, files: List[str], process_id: str, directory: str):
    """
    Checks that a list of files satisfies the file mapping rules of a process.

    Raises a ValueError when the rules are not met; returns nothing (and
    raises nothing) when they are satisfied.

    Args:
        process_id (str): ID for the process containing the file mapping rules
        directory: path to directory containing files
        files (List[str]): File names to check
    """
    # Include the sample sheet contents, if one exists alongside the files
    sample_sheet_file = Path(directory, 'samplesheet.csv')
    sample_sheet = sample_sheet_file.read_text() if sample_sheet_file.exists() else None

    requirements = validate_file_requirements.sync(
        process_id=process_id,
        body=ValidateFileRequirementsRequest(
            file_names=files,
            sample_sheet=sample_sheet
        ),
        client=self._api_client
    )

    # Top-level failures: sample sheet errors or no files provided
    if error_msg := requirements.error_msg:
        raise ValueError(error_msg)

    # Collect one message per data type whose mapping rules were violated
    errors = []
    for entry in requirements.allowed_data_types:
        if entry.error_msg is None:
            continue
        examples = '\n\t- '.join(e.example_name for e in entry.allowed_patterns)
        errors.append(
            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
            examples
        )

    files_provided = ', '.join(files)

    if len(errors) != 0:
        raise ValueError(f"The files you have provided are: {files_provided} \n\n"
                         "They do not meet the dataset requirements. "
                         "The required file types are: \n" +
                         "\n".join(errors))
Checks if the file mapping rules for a process are met by the list of files
Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.
Arguments:
- process_id (str): ID for the process containing the file mapping rules
- directory: path to directory containing files
- files (List[str]): File names to check
def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
    """
    Creates a custom process (pipeline or data type) in the system.

    Whether it is a pipeline or data type is determined by the executor field.
    For pipelines, you must specify the pipeline code and configuration repository.

    This process will be available to all users in the system if `is_tenant_wide` is set to True.
    If `is_tenant_wide` is set to False, the process will only be available to the projects
    specified in `linked_project_ids`. Making it available tenant wide requires tenant admin privileges.

    If the repository is private, you must complete the authorization flow on the UI.

    See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

    Args:
        process (`cirro_api_client.v1.models.CustomProcessInput`): Process to create

    Example:
        ```python
        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
            PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # New pipeline
        new_pipeline = CustomProcessInput(
            id="my_pipeline",
            name="My Pipeline",
            description="This is a test pipeline",
            executor=Executor.CROMWELL,
            category="DNA Sequencing",
            child_process_ids=[],
            parent_process_ids=["rnaseq"],
            documentation_url="https://example.com/docs",
            pipeline_code=PipelineCode(
                repository_path="CirroBio/test-pipeline",
                version="v1.0.0",
                entry_point="main.nf",
                repository_type=RepositoryType.GITHUB_PUBLIC
            ),
            linked_project_ids=[],
            is_tenant_wide=True,
            allow_multiple_sources=True,
            uses_sample_sheet=True,
            # This can be the same or different from the pipeline_code
            custom_settings=CustomPipelineSettings(
                repository="CirroBio/test-pipeline",
                branch="v1.0.0",
                repository_type=RepositoryType.GITHUB_PUBLIC,
            ),
            file_mapping_rules=[
                FileMappingRule(
                    description="Filtered Feature Matrix",
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="filtered_feature_bc_matrix.h5",
                            description="Matrix",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
                        )
                    ]
                )
            ]
        )

        cirro.processes.create_custom_process(new_pipeline)

        # New data type
        new_data_type = CustomProcessInput(
            id="images_jpg",
            name="JPG Images",
            description="Used for generic JPG images",
            executor=Executor.INGEST,
            child_process_ids=[],
            parent_process_ids=[],
            documentation_url="https://example.com/docs",
            linked_project_ids=["project_id_1", "project_id_2"],
            file_mapping_rules=[
                FileMappingRule(
                    description="Images",
                    min_=1,
                    file_name_patterns=[
                        FileNamePattern(
                            example_name="image.jpg",
                            description="JPG Image",
                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
                        )
                    ]
                )
            ]
        )
        cirro.processes.create_custom_process(new_data_type)
        ```
    """
    return create_custom_process.sync(client=self._api_client, body=process)
Creates a custom process (pipeline or data type) in the system.
Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.
This process will be available to all users in the system if `is_tenant_wide` is set to True.
If `is_tenant_wide` is set to False, the process will only be available to the projects specified in `linked_project_ids`. Making it available tenant wide requires tenant admin privileges.
If the repository is private, you must complete the authorization flow on the UI.
See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
Arguments:
- process (Process): Process to create
Example:
from cirro_api_client.v1.models import CustomProcessInput, Executor, PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# New pipeline
new_pipeline = CustomProcessInput(
id="my_pipeline",
name="My Pipeline",
description="This is a test pipeline",
executor=Executor.CROMWELL,
category="DNA Sequencing",
child_process_ids=[],
parent_process_ids=["rnaseq"],
documentation_url="https://example.com/docs",
pipeline_code=PipelineCode(
repository_path="CirroBio/test-pipeline",
version="v1.0.0",
entry_point="main.nf",
repository_type=RepositoryType.GITHUB_PUBLIC
),
linked_project_ids=[],
is_tenant_wide=True,
allow_multiple_sources=True,
uses_sample_sheet=True,
# This can be the same or different from the pipeline_code
custom_settings=CustomPipelineSettings(
repository="CirroBio/test-pipeline",
branch="v1.0.0",
repository_type=RepositoryType.GITHUB_PUBLIC,
),
file_mapping_rules=[
FileMappingRule(
description="Filtered Feature Matrix",
file_name_patterns=[
FileNamePattern(
example_name="filtered_feature_bc_matrix.h5",
description="Matrix",
sample_matching_pattern="(?P<sampleName>[\S ]*)\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_pipeline)
# New data type
new_data_type = CustomProcessInput(
id="images_jpg",
name="JPG Images",
description="Used for generic JPG images",
executor=Executor.INGEST,
child_process_ids=[],
parent_process_ids=[],
documentation_url="https://example.com/docs",
linked_project_ids=["project_id_1", "project_id_2"],
file_mapping_rules=[
FileMappingRule(
description="Images",
min_=1,
file_name_patterns=[
FileNamePattern(
example_name="image.jpg",
description="JPG Image",
sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
)
]
)
]
)
cirro.processes.create_custom_process(new_data_type)
def update_custom_process(self, process_id: str, process: CustomProcessInput):
    """
    Updates the custom process (pipeline or data type) in the system.

    Please run `sync_custom_process` after updating to ensure the pipeline
    configuration is up to date.

    Args:
        process_id (str): ID of the process to update
        process (CustomProcessInput): Process to update
    """
    update_custom_process.sync_detailed(
        process_id=process_id,
        body=process,
        client=self._api_client,
    )
Updates the custom process (pipeline or data type) in the system.
Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
Arguments:
- process_id (str): ID of the process to update
- process (CustomProcessInput): Process to update
def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
    """
    Syncs a custom pipeline in the system.

    Brings the pipeline configuration in Cirro (form configuration, input
    mapping, preprocess, etc.) up to date with what is in the configured
    repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/
    for more details.

    Args:
        process_id (str): ID of the process to sync
    """
    return sync_custom_process.sync(process_id=process_id, client=self._api_client)
Syncs a custom pipeline in the system.
This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
Arguments:
- process_id (str): ID of the process to sync
class ProjectService(BaseService):
    """
    Service for interacting with the Project endpoints
    """

    def list(self):
        """
        Retrieve the list of projects visible to the current user
        """
        return get_projects.sync(client=self._api_client)

    def get(self, project_id: str) -> ProjectDetail:
        """
        Get detailed information about a single project

        Args:
            project_id (str): Project ID
        """
        return get_project.sync(
            project_id=project_id,
            client=self._api_client
        )

    def create(self, request: ProjectInput) -> CreateResponse:
        """
        Create a project

        Args:
            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create

        Examples:
        ```python
        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
            CloudAccountType
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # "Bring your own account" projects are hosted by the user, so the
        # cloud account details and VPC information must be provided.
        # See the ProjectSettings model for more information on the required attributes
        byoa_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7,
                vpc_id="vpc-000000000000",
                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
            ),
            account=CloudAccount(
                account_id="<AWS_ACCOUNT_ID>",
                region_name="us-west-2",
                account_name="Cirro Lab Project",
                account_type=CloudAccountType.BYOA
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        # Hosted projects are managed by Cirro, so no account details are needed
        hosted_project = ProjectInput(
            name="New Project Name",
            description="Description of new project",
            billing_account_id="billing-account-id",
            settings=ProjectSettings(
                budget_period=BudgetPeriod.MONTHLY,
                budget_amount=1000,
                max_spot_vcpu=300,
                service_connections=[],
                retention_policy_days=7
            ),
            contacts=[
                Contact(
                    name="Contact Name",
                    organization="Contact Organization",
                    email="contact@email.com",
                    phone="987-654-3210"
                )
            ]
        )

        cirro.projects.create(byoa_project)
        ```
        """
        return create_project.sync(client=self._api_client, body=request)

    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
        """
        Update the details of a project

        Args:
            project_id (str): ID of project to update
            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
        """
        return update_project.sync(
            project_id=project_id,
            body=request,
            client=self._api_client
        )

    def update_tags(self, project_id: str, tags: List[Tag]):
        """
        Set the tags on a project

        Args:
            project_id (str): Project ID
            tags (List[Tag]): List of tags to apply
        """
        update_project_tags.sync_detailed(
            project_id=project_id,
            body=tags,
            client=self._api_client
        )

    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
        """
        Get the users who have access to the project

        Args:
            project_id (str): Project ID
        """
        return get_project_users.sync(
            project_id=project_id,
            client=self._api_client
        )

    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
        """
        Set a user's role within a project.

        Use `ProjectRole.NONE` to remove the user from a project.

        Args:
            project_id (str): Project ID
            username (str): Username
            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
            suppress_notification (bool): Suppress email notification
        """
        set_user_project_role.sync_detailed(
            project_id=project_id,
            body=SetUserProjectRoleRequest(
                username=username,
                role=role,
                suppress_notification=suppress_notification
            ),
            client=self._api_client
        )

    def archive(self, project_id: str):
        """
        Set the project status to archived (hidden from active projects)

        Args:
            project_id (str): Project ID
        """
        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

    def unarchive(self, project_id: str):
        """
        Set the project status back to active

        Args:
            project_id (str): Project ID
        """
        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Service for interacting with the Project endpoints
16 def list(self): 17 """ 18 Retrieve a list of projects 19 """ 20 return get_projects.sync(client=self._api_client)
Retrieve a list of projects
22 def get(self, project_id: str) -> ProjectDetail: 23 """ 24 Get detailed project information 25 26 Args: 27 project_id (str): Project ID 28 """ 29 return get_project.sync(project_id=project_id, client=self._api_client)
Get detailed project information
Arguments:
- project_id (str): Project ID
31 def create(self, request: ProjectInput) -> CreateResponse: 32 """ 33 Create a project 34 35 Args: 36 request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create 37 38 Examples: 39 ```python 40 from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \ 41 CloudAccountType 42 from cirro.cirro_client import CirroApi 43 44 cirro = CirroApi() 45 46 # Bring your own account projects are hosted by the user 47 # You must provide the account details and VPC information 48 # Please view the ProjectSettings model for more information on the attributes required 49 byoa_project = ProjectInput( 50 name="New Project Name", 51 description="Description of new project", 52 billing_account_id="billing-account-id", 53 settings=ProjectSettings( 54 budget_period=BudgetPeriod.MONTHLY, 55 budget_amount=1000, 56 max_spot_vcpu=300, 57 service_connections=[], 58 retention_policy_days=7, 59 vpc_id="vpc-000000000000", 60 batch_subnets=["subnet-000000000000", "subnet-000000000001"], 61 sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"], 62 kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000" 63 ), 64 account=CloudAccount( 65 account_id="<AWS_ACCOUNT_ID>", 66 region_name="us-west-2", 67 account_name="Cirro Lab Project", 68 account_type=CloudAccountType.BYOA 69 ), 70 contacts=[ 71 Contact( 72 name="Contact Name", 73 organization="Contact Organization", 74 email="contact@email.com", 75 phone="987-654-3210" 76 ) 77 ] 78 ) 79 80 # Hosted projects are managed by Cirro 81 hosted_project = ProjectInput( 82 name="New Project Name", 83 description="Description of new project", 84 billing_account_id="billing-account-id", 85 settings=ProjectSettings( 86 budget_period=BudgetPeriod.MONTHLY, 87 budget_amount=1000, 88 max_spot_vcpu=300, 89 service_connections=[], 90 retention_policy_days=7 91 ), 92 contacts=[ 93 Contact( 94 name="Contact Name", 95 organization="Contact 
Organization", 96 email="contact@email.com", 97 phone="987-654-3210" 98 ) 99 ] 100 ) 101 102 cirro.projects.create(byoa_project) 103 ``` 104 """ 105 return create_project.sync(client=self._api_client, body=request)
Create a project
Arguments:
- request (cirro_api_client.v1.models.ProjectInput): Detailed information about the project to create
Examples:
from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, CloudAccountType
from cirro.cirro_client import CirroApi
cirro = CirroApi()
# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7,
vpc_id="vpc-000000000000",
batch_subnets=["subnet-000000000000", "subnet-000000000001"],
sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
),
account=CloudAccount(
account_id="<AWS_ACCOUNT_ID>",
region_name="us-west-2",
account_name="Cirro Lab Project",
account_type=CloudAccountType.BYOA
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
name="New Project Name",
description="Description of new project",
billing_account_id="billing-account-id",
settings=ProjectSettings(
budget_period=BudgetPeriod.MONTHLY,
budget_amount=1000,
max_spot_vcpu=300,
service_connections=[],
retention_policy_days=7
),
contacts=[
Contact(
name="Contact Name",
organization="Contact Organization",
email="contact@email.com",
phone="987-654-3210"
)
]
)
cirro.projects.create(byoa_project)
107 def update(self, project_id: str, request: ProjectInput) -> ProjectDetail: 108 """ 109 Updates a project 110 111 Args: 112 project_id (str): ID of project to update 113 request (`cirro_api_client.v1.models.ProjectInput`): New details for the project 114 """ 115 return update_project.sync(project_id=project_id, body=request, client=self._api_client)
Updates a project
Arguments:
- project_id (str): ID of project to update
- request (cirro_api_client.v1.models.ProjectInput): New details for the project
127 def get_users(self, project_id: str) -> Optional[List[ProjectUser]]: 128 """ 129 Gets users who have access to the project 130 131 Args: 132 project_id (str): Project ID 133 """ 134 return get_project_users.sync(project_id=project_id, client=self._api_client)
Gets users who have access to the project
Arguments:
- project_id (str): Project ID
136 def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False): 137 """ 138 Sets a user's role within a project. 139 140 Set to `ProjectRole.NONE` if removing the user from a project. 141 142 Args: 143 project_id (str): Project ID 144 username (str): Username 145 role (`cirro_api_client.v1.models.ProjectRole`): New role to apply 146 suppress_notification (bool): Suppress email notification 147 """ 148 request_body = SetUserProjectRoleRequest( 149 username=username, 150 role=role, 151 suppress_notification=suppress_notification 152 ) 153 set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
Sets a user's role within a project.
Set to `ProjectRole.NONE` if removing the user from a project.
Arguments:
- project_id (str): Project ID
- username (str): Username
- role (cirro_api_client.v1.models.ProjectRole): New role to apply
- suppress_notification (bool): Suppress email notification
155 def archive(self, project_id: str): 156 """ 157 Sets the project status to archived (hidden from active projects) 158 159 Args: 160 project_id (str): Project ID 161 """ 162 archive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to archived (hidden from active projects)
Arguments:
- project_id (str): Project ID
164 def unarchive(self, project_id: str): 165 """ 166 Sets the project status to active 167 168 Args: 169 project_id (str): Project ID 170 """ 171 unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)
Sets the project status to active
Arguments:
- project_id (str): Project ID
class ReferenceService(BaseService):
    """
    Service for interacting with the References endpoints
    """

    def get_types(self) -> Optional[List[ReferenceType]]:
        """
        List the reference types that are available
        """
        return get_reference_types.sync(client=self._api_client)

    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
        """
        List the references available within a given project

        Args:
            project_id (str): Project ID
        """
        return get_references_for_project.sync(
            project_id=project_id,
            client=self._api_client
        )
Service for interacting with the References endpoints
14 def get_types(self) -> Optional[List[ReferenceType]]: 15 """ 16 List available reference types 17 """ 18 return get_reference_types.sync(client=self._api_client)
List available reference types
20 def get_for_project(self, project_id: str) -> Optional[List[Reference]]: 21 """ 22 List available references for a given project 23 """ 24 return get_references_for_project.sync(project_id=project_id, client=self._api_client)
List available references for a given project
class UserService(BaseService):
    def list(self, max_items: int = 10000) -> List[User]:
        """
        List users in the system

        Args:
            max_items (int): Maximum number of user records to return
        """
        # Page through the endpoint until max_items records are collected
        def _fetch_page(page_args):
            return list_users.sync(
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            )

        return get_all_records(records_getter=_fetch_page, max_items=max_items)

    def get(self, username: str) -> UserDetail:
        """
        Get user details by username, including what projects they are assigned to

        Args:
            username (str): Username
        """
        return get_user.sync(username=username, client=self._api_client)

    def invite_user(self, name: str, organization: str, email: str):
        """
        Invite a user to the system.

        If the user already exists, the returned message states that
        the user already exists.

        Args:
            name (str): Name of the user
            organization (str): Organization of the user
            email (str): Email (username) of the user
        """
        request = InviteUserRequest(
            name=name,
            organization=organization,
            email=email,
        )
        return invite_user.sync(client=self._api_client, body=request).message
Not to be instantiated directly
11 def list(self, max_items: int = 10000) -> List[User]: 12 """ 13 List users in the system 14 """ 15 return get_all_records( 16 records_getter=lambda page_args: list_users.sync( 17 client=self._api_client, 18 next_token=page_args.next_token, 19 limit=page_args.limit 20 ), 21 max_items=max_items 22 )
List users in the system
24 def get(self, username: str) -> UserDetail: 25 """ 26 Get user details by username, including what projects they are assigned to 27 """ 28 return get_user.sync(username=username, client=self._api_client)
Get user details by username, including what projects they are assigned to
30 def invite_user(self, name: str, organization: str, email: str): 31 """ 32 Invite a user to the system. 33 If the user already exists, it will return a message that the user already exists. 34 35 Args: 36 name (str): Name of the user 37 organization (str): Organization of the user 38 email (str): Email (username) of the user 39 """ 40 request = InviteUserRequest( 41 name=name, 42 organization=organization, 43 email=email, 44 ) 45 response = invite_user.sync(client=self._api_client, body=request) 46 return response.message
Invite a user to the system. If the user already exists, it will return a message that the user already exists.
Arguments:
- name (str): Name of the user
- organization (str): Organization of the user
- email (str): Email (username) of the user