cirro.services

 1from .billing import BillingService
 2from .compute_environment import ComputeEnvironmentService
 3from .dataset import DatasetService
 4from .execution import ExecutionService
 5from .file import FileService
 6from .metadata import MetadataService
 7from .metrics import MetricsService
 8from .process import ProcessService
 9from .projects import ProjectService
10from .references import ReferenceService
11from .share import ShareService
12from .user import UserService
13from .workspace import WorkspaceService
14
15
# Names exported by the `cirro.services` package
__all__ = [
    "BillingService",
    "DatasetService",
    "ExecutionService",
    "ComputeEnvironmentService",
    "FileService",
    "MetadataService",
    "MetricsService",
    "ProcessService",
    "ProjectService",
    "ReferenceService",
    "ShareService",
    "UserService",
    "WorkspaceService",
]
class BillingService(cirro.services.base.BaseService):
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """
    def list(self) -> List[BillingAccount]:
        """
        Gets a list of billing accounts the current user has access to
        """
        return get_billing_accounts.sync(client=self._api_client)

    def update(self, billing_account_id: str, request: BillingAccountRequest) -> None:
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest): Request body
                with the updated billing account details

        Example:
        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        # The response of the call is intentionally not returned
        update_billing_account.sync_detailed(
            billing_account_id=billing_account_id,
            body=request,
            client=self._api_client
        )

Service for interacting with the Billing endpoints

def list(self) -> List[cirro_api_client.v1.models.BillingAccount]:
14    def list(self) -> List[BillingAccount]:
15        """
16        Gets a list of billing accounts the current user has access to
17        """
18        return get_billing_accounts.sync(client=self._api_client)

Gets a list of billing accounts the current user has access to

def update( self, billing_account_id: str, request: cirro_api_client.v1.models.BillingAccountRequest):
20    def update(self, billing_account_id: str, request: BillingAccountRequest):
21        """
22        Updates a billing account
23
24        Args:
25            billing_account_id (str): Billing account ID
26            request (cirro_api_client.v1.models.BillingAccountRequest):
27
28        ```python
29        from cirro_api_client.v1.models import BillingAccountRequest
30        from cirro.cirro_client import CirroApi
31
32        cirro = CirroApi()
33        request = BillingAccountRequest(
34            name="New billing account name",
35            primary_budget_number="new-budget-number",
36            owner="New Owner"
37        )
38        cirro.billing.update("billing-account-id", request)
39        ```
40        """
41        update_billing_account.sync_detailed(
42            billing_account_id=billing_account_id,
43            body=request,
44            client=self._api_client
45        )

Updates a billing account

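Arguments:
  • billing_account_id (str): Billing account ID
  • request (cirro_api_client.v1.models.BillingAccountRequest): Request body with the updated billing account details

Example:
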
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = BillingAccountRequest(
    name="New billing account name",
    primary_budget_number="new-budget-number",
    owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
class DatasetService(cirro.services.file.FileEnabledService):
 21class DatasetService(FileEnabledService):
 22    """
 23    Service for interacting with the Dataset endpoints
 24    """
 25
 26    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
 27        """List datasets
 28
 29        Retrieves a list of datasets for a given project
 30
 31        Args:
 32            project_id (str): ID of the Project
 33            max_items (int): Maximum number of records to get (default 10,000)
 34        """
 35        return get_all_records(
 36            records_getter=lambda page_args: get_datasets.sync(
 37                project_id=project_id,
 38                client=self._api_client,
 39                next_token=page_args.next_token,
 40                limit=page_args.limit
 41            ),
 42            max_items=max_items
 43        )
 44
 45    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
 46        """
 47        Retrieves a list of shared datasets for a given project and share
 48
 49        Args:
 50            project_id (str): ID of the Project
 51            share_id (str): ID of the Share
 52            max_items (int): Maximum number of records to get (default 10,000)
 53
 54        Example:
 55        ```python
 56        from cirro_api_client.v1.models import ShareType
 57        from cirro.cirro_client import CirroApi
 58
 59        cirro = CirroApi()
 60
 61        # List shares that are subscribed to
 62        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
 63        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
 64        ```
 65        """
 66        return get_all_records(
 67            records_getter=lambda page_args: get_shared_datasets.sync(
 68                project_id=project_id,
 69                share_id=share_id,
 70                client=self._api_client,
 71                next_token=page_args.next_token,
 72                limit=page_args.limit
 73            ),
 74            max_items=max_items
 75        )
 76
 77    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
 78        """
 79        Download data from public repositories
 80
 81        Args:
 82            project_id (str): ID of the Project
 83            import_request (cirro_api_client.v1.models.ImportDataRequest):
 84
 85        Returns:
 86            ID of the created dataset
 87
 88        ```python
 89        from cirro_api_client.v1.models import ImportDataRequest, Tag
 90        from cirro.cirro_client import CirroApi
 91
 92        cirro = CirroApi()
 93        request = ImportDataRequest(
 94            name="Imported dataset",
 95            description="Description of the dataset",
 96            public_ids=["SRR123456", "SRR123457"],
 97            tags=[Tag(value="tag1")]
 98        )
 99        cirro.datasets.import_public("project-id", request)
100        ```
101        """
102        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)
103
104    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
105        """
106        Registers a dataset in Cirro, which can subsequently have files uploaded to it
107
108        Args:
109            project_id (str): ID of the Project
110            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
111
112        Returns:
113            ID of the created dataset and the path to upload files
114
115        ```python
116        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
117        from cirro.cirro_client import CirroApi
118
119        cirro = CirroApi()
120        request = UploadDatasetRequest(
121            name="Name of new dataset",
122            process_id="paired_dnaseq",
123            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
124            description="Description of the dataset",
125            tags=[Tag(value="tag1"), Tag(value="tag2")]
126        )
127        cirro.datasets.create("project-id", request)
128        ```
129        """
130        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)
131
132    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
133        """
134        Gets detailed information about a dataset
135
136        Args:
137            project_id (str): ID of the Project
138            dataset_id (str): ID of the Dataset
139
140        Returns:
141            The dataset, if found
142        """
143        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
144
145    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
146        """
147        Update info on a dataset (name, description, and/or tags)
148
149        Args:
150            project_id (str): ID of the Project
151            dataset_id (str): ID of the Dataset
152            request (cirro_api_client.v1.models.UpdateDatasetRequest):
153
154        Returns:
155            The updated dataset
156
157        ```python
158        from cirro_api_client.v1.models import UpdateDatasetRequest
159        from cirro.cirro_client import CirroApi
160
161        cirro = CirroApi()
162        request = UpdateDatasetRequest(
163            name="Name of new dataset",
164            process_id="paired_dnaseq",
165            description="Description of the dataset"
166        )
167        cirro.datasets.update("project-id", "dataset-id", request)
168        ```
169        """
170        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)
171
172    def delete(self, project_id: str, dataset_id: str) -> None:
173        """
174        Delete a dataset
175
176        After a dataset has been deleted, the files associated with that
177        dataset are saved according to the project's retention time.
178
179        Args:
180            project_id (str): ID of the Project
181            dataset_id (str): ID of the Dataset
182        """
183        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
184
185    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
186        """
187        Gets a listing of files, charts, and other assets available for the dataset
188
189        Args:
190            project_id (str): ID of the Project
191            dataset_id (str): ID of the Dataset
192            file_limit (int): Maximum number of files to get (default 100,000)
193        """
194        dataset = self.get(project_id, dataset_id)
195        if file_limit < 1:
196            raise ValueError("file_limit must be greater than 0")
197        all_files = []
198        file_offset = 0
199        domain = None
200        artifacts = None
201
202        while len(all_files) < file_limit:
203            manifest = get_dataset_manifest.sync(
204                project_id=project_id,
205                dataset_id=dataset_id,
206                file_offset=file_offset,
207                client=self._api_client
208            )
209            all_files.extend(manifest.files)
210            file_offset += len(manifest.files)
211
212            if not artifacts:
213                artifacts = manifest.artifacts
214
215            domain = manifest.domain
216            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
217                break
218
219        files = [
220            File.from_file_entry(
221                f,
222                project_id=project_id,
223                dataset=dataset,
224                domain=domain
225            )
226            for f in all_files
227        ]
228        artifacts = [
229            Artifact(
230                artifact_type=a.type_,
231                file=File.from_file_entry(
232                    FileEntry(a.path),
233                    project_id=project_id,
234                    dataset=dataset,
235                    domain=domain
236                )
237            )
238            for a in artifacts
239        ]
240        return DatasetAssets(files=files, artifacts=artifacts)
241
242    def upload_files(self,
243                     project_id: str,
244                     dataset_id: str,
245                     directory: PathLike,
246                     files: List[PathLike] = None,
247                     file_path_map: Dict[PathLike, str] = None) -> None:
248        """
249        Uploads files to a given dataset from the specified directory.
250
251        All files must be relative to the specified directory.
252        If files need to be flattened, or you are sourcing files from multiple directories,
253        please include `file_path_map` or call this method multiple times.
254
255        Args:
256            project_id (str): ID of the Project
257            dataset_id (str): ID of the Dataset
258            directory (str|Path): Path to directory
259            files (typing.List[str|Path]): List of paths to files within the directory,
260                must be the same type as directory.
261            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
262             from source path to destination path, used to "re-write" paths within the dataset.
263        ```python
264        from cirro.cirro_client import CirroApi
265        from cirro.file_utils import generate_flattened_file_map
266
267        cirro = CirroApi()
268
269        directory = "~/Downloads"
270        # Re-write file paths
271        file_map = {
272            "data1/file1.fastq.gz": "file1.fastq.gz",
273            "data2/file2.fastq.gz": "file2.fastq.gz",
274            "file3.fastq.gz": "new_file3.txt"
275        }
276
277        # Or you could automate the flattening
278        files = ["data1/file1.fastq.gz"]
279        file_map = generate_flattened_file_map(files)
280
281        cirro.datasets.upload_files(
282            project_id="project-id",
283            dataset_id="dataset-id",
284            directory=directory,
285            files=list(file_map.keys()),
286            file_path_map=file_map
287        )
288        ```
289        """
290        if file_path_map is None:
291            file_path_map = {}
292
293        dataset = self.get(project_id, dataset_id)
294
295        access_context = FileAccessContext.upload_dataset(
296            project_id=project_id,
297            dataset_id=dataset_id,
298            base_url=dataset.s3
299        )
300
301        self._file_service.upload_files(
302            access_context=access_context,
303            directory=directory,
304            files=files,
305            file_path_map=file_path_map
306        )
307
308    def validate_folder(
309        self,
310        project_id: str,
311        dataset_id: str,
312        local_folder: PathLike,
313        file_limit: int = 100000
314    ) -> DatasetValidationResponse:
315        """
316        Validates that the contents of a dataset match that of a local folder.
317        """
318        ds_files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files
319
320        local_folder = Path(local_folder)
321        if not local_folder.is_dir():
322            raise ValueError(f"{local_folder} is not a valid local folder")
323
324        # Keep track of files from the dataset which match by checksum, don't match, or are missing
325        ds_files_matching = []
326        ds_files_not_matching = []
327        ds_files_missing = []
328        ds_validate_failed = []
329        for ds_file in ds_files:
330            ds_file_path = ds_file.normalized_path
331            # Get the corresponding local file
332            local_file = local_folder / ds_file_path
333            if not local_file.exists():
334                ds_files_missing.append(ds_file_path)
335            else:
336                try:
337                    if self._file_service.is_valid_file(ds_file, local_file):
338                        ds_files_matching.append(ds_file_path)
339                    else:
340                        ds_files_not_matching.append(ds_file_path)
341                except RuntimeWarning as e:
342                    logger.warning(f"File validation failed: {e}")
343                    ds_validate_failed.append(ds_file_path)
344
345        # Find local files that are not in the dataset
346        local_file_paths = [
347            file.relative_to(local_folder).as_posix()
348            for file in local_folder.rglob("*")
349            if not file.is_dir() and not is_hidden_file(file)
350        ]
351        dataset_file_paths = [file.normalized_path for file in ds_files]
352        local_only_files = [
353            file
354            for file in local_file_paths
355            if file not in dataset_file_paths
356        ]
357
358        return DatasetValidationResponse(
359            files_matching=ds_files_matching,
360            files_not_matching=ds_files_not_matching,
361            files_missing=ds_files_missing,
362            local_only_files=local_only_files,
363            validate_errors=ds_validate_failed,
364        )
365
366    def download_files(
367        self,
368        project_id: str,
369        dataset_id: str,
370        download_location: str,
371        files: Union[List[File], List[str]] = None,
372        file_limit: int = 100000
373    ) -> None:
374        """
375        Downloads files from a dataset
376
377        The `files` argument is used to optionally specify a subset of files
378        to be downloaded. By default, all files are downloaded.
379
380        Args:
381            project_id (str): ID of the Project
382            dataset_id (str): ID of the Dataset
383            download_location (str): Local destination for downloaded files
384            files (typing.List[str]): Optional list of files to download
385            file_limit (int): Maximum number of files to get (default 100,000)
386        """
387        if files is None:
388            files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files
389
390        if len(files) == 0:
391            return
392
393        first_file = files[0]
394        if isinstance(first_file, File):
395            files = [file.relative_path for file in files]
396            access_context = first_file.access_context
397        else:
398            dataset = self.get(project_id, dataset_id)
399            if dataset.share:
400                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
401                                                                           dataset_id=dataset_id,
402                                                                           base_url=dataset.s3)
403            else:
404                access_context = FileAccessContext.download(project_id=project_id,
405                                                            base_url=dataset.s3)
406
407        self._file_service.download_files(access_context, download_location, files)
408
409    def update_samplesheet(
410        self,
411        project_id: str,
412        dataset_id: str,
413        samplesheet: str
414    ):
415        """
416        Updates a samplesheet on a dataset
417
418        Args:
419            project_id (str): ID of the Project
420            dataset_id (str): ID of the Dataset
421            samplesheet (str): Samplesheet contents to update (should be a CSV string)
422        """
423        dataset = self.get(project_id, dataset_id)
424        access_context = FileAccessContext.upload_sample_sheet(project_id=project_id,
425                                                               dataset_id=dataset_id,
426                                                               base_url=dataset.s3)
427
428        samplesheet_key = f'{access_context.prefix}/samplesheet.csv'
429        self._file_service.create_file(
430            access_context=access_context,
431            key=samplesheet_key,
432            contents=samplesheet,
433            content_type='text/csv'
434        )

Service for interacting with the Dataset endpoints

def list( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:
26    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
27        """List datasets
28
29        Retrieves a list of datasets for a given project
30
31        Args:
32            project_id (str): ID of the Project
33            max_items (int): Maximum number of records to get (default 10,000)
34        """
35        return get_all_records(
36            records_getter=lambda page_args: get_datasets.sync(
37                project_id=project_id,
38                client=self._api_client,
39                next_token=page_args.next_token,
40                limit=page_args.limit
41            ),
42            max_items=max_items
43        )

List datasets

Retrieves a list of datasets for a given project

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
def list_shared( self, project_id: str, share_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:
45    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
46        """
47        Retrieves a list of shared datasets for a given project and share
48
49        Args:
50            project_id (str): ID of the Project
51            share_id (str): ID of the Share
52            max_items (int): Maximum number of records to get (default 10,000)
53
54        Example:
55        ```python
56        from cirro_api_client.v1.models import ShareType
57        from cirro.cirro_client import CirroApi
58
59        cirro = CirroApi()
60
61        # List shares that are subscribed to
62        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
63        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
64        ```
65        """
66        return get_all_records(
67            records_getter=lambda page_args: get_shared_datasets.sync(
68                project_id=project_id,
69                share_id=share_id,
70                client=self._api_client,
71                next_token=page_args.next_token,
72                limit=page_args.limit
73            ),
74            max_items=max_items
75        )

Retrieves a list of shared datasets for a given project and share

Arguments:
  • project_id (str): ID of the Project
  • share_id (str): ID of the Share
  • max_items (int): Maximum number of records to get (default 10,000)

Example:

from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that are subscribed to
subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
def import_public( self, project_id: str, import_request: cirro_api_client.v1.models.ImportDataRequest) -> cirro_api_client.v1.models.CreateResponse:
 77    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
 78        """
 79        Download data from public repositories
 80
 81        Args:
 82            project_id (str): ID of the Project
 83            import_request (cirro_api_client.v1.models.ImportDataRequest):
 84
 85        Returns:
 86            ID of the created dataset
 87
 88        ```python
 89        from cirro_api_client.v1.models import ImportDataRequest, Tag
 90        from cirro.cirro_client import CirroApi
 91
 92        cirro = CirroApi()
 93        request = ImportDataRequest(
 94            name="Imported dataset",
 95            description="Description of the dataset",
 96            public_ids=["SRR123456", "SRR123457"],
 97            tags=[Tag(value="tag1")]
 98        )
 99        cirro.datasets.import_public("project-id", request)
100        ```
101        """
102        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)

Download data from public repositories

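Arguments:
  • project_id (str): ID of the Project
  • import_request (cirro_api_client.v1.models.ImportDataRequest): Request body describing the data to import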
Returns:

ID of the created dataset

from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = ImportDataRequest(
    name="Imported dataset",
    description="Description of the dataset",
    public_ids=["SRR123456", "SRR123457"],
    tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
def create( self, project_id: str, upload_request: cirro_api_client.v1.models.UploadDatasetRequest) -> cirro_api_client.v1.models.UploadDatasetCreateResponse:
104    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
105        """
106        Registers a dataset in Cirro, which can subsequently have files uploaded to it
107
108        Args:
109            project_id (str): ID of the Project
110            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
111
112        Returns:
113            ID of the created dataset and the path to upload files
114
115        ```python
116        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
117        from cirro.cirro_client import CirroApi
118
119        cirro = CirroApi()
120        request = UploadDatasetRequest(
121            name="Name of new dataset",
122            process_id="paired_dnaseq",
123            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
124            description="Description of the dataset",
125            tags=[Tag(value="tag1"), Tag(value="tag2")]
126        )
127        cirro.datasets.create("project-id", request)
128        ```
129        """
130        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)

Registers a dataset in Cirro, which can subsequently have files uploaded to it

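Arguments:
  • project_id (str): ID of the Project
  • upload_request (cirro_api_client.v1.models.UploadDatasetRequest): Request body describing the dataset to register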
Returns:

ID of the created dataset and the path to upload files

from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UploadDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
    description="Description of the dataset",
    tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
def get( self, project_id: str, dataset_id: str) -> Optional[cirro_api_client.v1.models.DatasetDetail]:
132    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
133        """
134        Gets detailed information about a dataset
135
136        Args:
137            project_id (str): ID of the Project
138            dataset_id (str): ID of the Dataset
139
140        Returns:
141            The dataset, if found
142        """
143        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

Gets detailed information about a dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
Returns:

The dataset, if found

def update( self, project_id: str, dataset_id: str, request: cirro_api_client.v1.models.UpdateDatasetRequest) -> cirro_api_client.v1.models.DatasetDetail:
145    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
146        """
147        Update info on a dataset (name, description, and/or tags)
148
149        Args:
150            project_id (str): ID of the Project
151            dataset_id (str): ID of the Dataset
152            request (cirro_api_client.v1.models.UpdateDatasetRequest):
153
154        Returns:
155            The updated dataset
156
157        ```python
158        from cirro_api_client.v1.models import UpdateDatasetRequest
159        from cirro.cirro_client import CirroApi
160
161        cirro = CirroApi()
162        request = UpdateDatasetRequest(
163            name="Name of new dataset",
164            process_id="paired_dnaseq",
165            description="Description of the dataset"
166        )
167        cirro.datasets.update("project-id", "dataset-id", request)
168        ```
169        """
170        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)

Update info on a dataset (name, description, and/or tags)

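Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • request (cirro_api_client.v1.models.UpdateDatasetRequest): Request body with the dataset fields to update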
Returns:

The updated dataset

from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UpdateDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
def delete(self, project_id: str, dataset_id: str) -> None:
172    def delete(self, project_id: str, dataset_id: str) -> None:
173        """
174        Delete a dataset
175
176        After a dataset has been deleted, the files associated with that
177        dataset are saved according to the project's retention time.
178
179        Args:
180            project_id (str): ID of the Project
181            dataset_id (str): ID of the Dataset
182        """
183        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

Delete a dataset

After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
def get_assets_listing( self, project_id: str, dataset_id: str, file_limit: int = 100000) -> cirro.models.assets.DatasetAssets:
185    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
186        """
187        Gets a listing of files, charts, and other assets available for the dataset
188
189        Args:
190            project_id (str): ID of the Project
191            dataset_id (str): ID of the Dataset
192            file_limit (int): Maximum number of files to get (default 100,000)
193        """
194        dataset = self.get(project_id, dataset_id)
195        if file_limit < 1:
196            raise ValueError("file_limit must be greater than 0")
197        all_files = []
198        file_offset = 0
199        domain = None
200        artifacts = None
201
202        while len(all_files) < file_limit:
203            manifest = get_dataset_manifest.sync(
204                project_id=project_id,
205                dataset_id=dataset_id,
206                file_offset=file_offset,
207                client=self._api_client
208            )
209            all_files.extend(manifest.files)
210            file_offset += len(manifest.files)
211
212            if not artifacts:
213                artifacts = manifest.artifacts
214
215            domain = manifest.domain
216            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
217                break
218
219        files = [
220            File.from_file_entry(
221                f,
222                project_id=project_id,
223                dataset=dataset,
224                domain=domain
225            )
226            for f in all_files
227        ]
228        artifacts = [
229            Artifact(
230                artifact_type=a.type_,
231                file=File.from_file_entry(
232                    FileEntry(a.path),
233                    project_id=project_id,
234                    dataset=dataset,
235                    domain=domain
236                )
237            )
238            for a in artifacts
239        ]
240        return DatasetAssets(files=files, artifacts=artifacts)

Gets a listing of files, charts, and other assets available for the dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • file_limit (int): Maximum number of files to get (default 100,000)
def upload_files( self, project_id: str, dataset_id: str, directory: ~PathLike, files: List[~PathLike] = None, file_path_map: Dict[~PathLike, str] = None) -> None:
242    def upload_files(self,
243                     project_id: str,
244                     dataset_id: str,
245                     directory: PathLike,
246                     files: List[PathLike] = None,
247                     file_path_map: Dict[PathLike, str] = None) -> None:
248        """
249        Uploads files to a given dataset from the specified directory.
250
251        All files must be relative to the specified directory.
252        If files need to be flattened, or you are sourcing files from multiple directories,
253        please include `file_path_map` or call this method multiple times.
254
255        Args:
256            project_id (str): ID of the Project
257            dataset_id (str): ID of the Dataset
258            directory (str|Path): Path to directory
259            files (typing.List[str|Path]): List of paths to files within the directory,
260                must be the same type as directory.
261            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
262             from source path to destination path, used to "re-write" paths within the dataset.
263        ```python
264        from cirro.cirro_client import CirroApi
265        from cirro.file_utils import generate_flattened_file_map
266
267        cirro = CirroApi()
268
269        directory = "~/Downloads"
270        # Re-write file paths
271        file_map = {
272            "data1/file1.fastq.gz": "file1.fastq.gz",
273            "data2/file2.fastq.gz": "file2.fastq.gz",
274            "file3.fastq.gz": "new_file3.txt"
275        }
276
277        # Or you could automate the flattening
278        files = ["data1/file1.fastq.gz"]
279        file_map = generate_flattened_file_map(files)
280
281        cirro.datasets.upload_files(
282            project_id="project-id",
283            dataset_id="dataset-id",
284            directory=directory,
285            files=list(file_map.keys()),
286            file_path_map=file_map
287        )
288        ```
289        """
290        if file_path_map is None:
291            file_path_map = {}
292
293        dataset = self.get(project_id, dataset_id)
294
295        access_context = FileAccessContext.upload_dataset(
296            project_id=project_id,
297            dataset_id=dataset_id,
298            base_url=dataset.s3
299        )
300
301        self._file_service.upload_files(
302            access_context=access_context,
303            directory=directory,
304            files=files,
305            file_path_map=file_path_map
306        )

Uploads files to a given dataset from the specified directory.

All files must be relative to the specified directory. If files need to be flattened, or you are sourcing files from multiple directories, please include file_path_map or call this method multiple times.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map

cirro = CirroApi()

directory = "~/Downloads"
# Re-write file paths
file_map = {
    "data1/file1.fastq.gz": "file1.fastq.gz",
    "data2/file2.fastq.gz": "file2.fastq.gz",
    "file3.fastq.gz": "new_file3.txt"
}

# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)

cirro.datasets.upload_files(
    project_id="project-id",
    dataset_id="dataset-id",
    directory=directory,
    files=list(file_map.keys()),
    file_path_map=file_map
)
def validate_folder( self, project_id: str, dataset_id: str, local_folder: ~PathLike, file_limit: int = 100000) -> cirro.models.dataset.DatasetValidationResponse:
308    def validate_folder(
309        self,
310        project_id: str,
311        dataset_id: str,
312        local_folder: PathLike,
313        file_limit: int = 100000
314    ) -> DatasetValidationResponse:
315        """
316        Validates that the contents of a dataset match that of a local folder.
317        """
318        ds_files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files
319
320        local_folder = Path(local_folder)
321        if not local_folder.is_dir():
322            raise ValueError(f"{local_folder} is not a valid local folder")
323
324        # Keep track of files from the dataset which match by checksum, don't match, or are missing
325        ds_files_matching = []
326        ds_files_not_matching = []
327        ds_files_missing = []
328        ds_validate_failed = []
329        for ds_file in ds_files:
330            ds_file_path = ds_file.normalized_path
331            # Get the corresponding local file
332            local_file = local_folder / ds_file_path
333            if not local_file.exists():
334                ds_files_missing.append(ds_file_path)
335            else:
336                try:
337                    if self._file_service.is_valid_file(ds_file, local_file):
338                        ds_files_matching.append(ds_file_path)
339                    else:
340                        ds_files_not_matching.append(ds_file_path)
341                except RuntimeWarning as e:
342                    logger.warning(f"File validation failed: {e}")
343                    ds_validate_failed.append(ds_file_path)
344
345        # Find local files that are not in the dataset
346        local_file_paths = [
347            file.relative_to(local_folder).as_posix()
348            for file in local_folder.rglob("*")
349            if not file.is_dir() and not is_hidden_file(file)
350        ]
351        dataset_file_paths = [file.normalized_path for file in ds_files]
352        local_only_files = [
353            file
354            for file in local_file_paths
355            if file not in dataset_file_paths
356        ]
357
358        return DatasetValidationResponse(
359            files_matching=ds_files_matching,
360            files_not_matching=ds_files_not_matching,
361            files_missing=ds_files_missing,
362            local_only_files=local_only_files,
363            validate_errors=ds_validate_failed,
364        )

Validates that the contents of a dataset match that of a local folder.

def download_files( self, project_id: str, dataset_id: str, download_location: str, files: Union[List[cirro.models.file.File], List[str]] = None, file_limit: int = 100000) -> None:
366    def download_files(
367        self,
368        project_id: str,
369        dataset_id: str,
370        download_location: str,
371        files: Union[List[File], List[str]] = None,
372        file_limit: int = 100000
373    ) -> None:
374        """
375        Downloads files from a dataset
376
377        The `files` argument is used to optionally specify a subset of files
378        to be downloaded. By default, all files are downloaded.
379
380        Args:
381            project_id (str): ID of the Project
382            dataset_id (str): ID of the Dataset
383            download_location (str): Local destination for downloaded files
384            files (typing.List[str]): Optional list of files to download
385            file_limit (int): Maximum number of files to get (default 100,000)
386        """
387        if files is None:
388            files = self.get_assets_listing(project_id, dataset_id, file_limit=file_limit).files
389
390        if len(files) == 0:
391            return
392
393        first_file = files[0]
394        if isinstance(first_file, File):
395            files = [file.relative_path for file in files]
396            access_context = first_file.access_context
397        else:
398            dataset = self.get(project_id, dataset_id)
399            if dataset.share:
400                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
401                                                                           dataset_id=dataset_id,
402                                                                           base_url=dataset.s3)
403            else:
404                access_context = FileAccessContext.download(project_id=project_id,
405                                                            base_url=dataset.s3)
406
407        self._file_service.download_files(access_context, download_location, files)

Downloads files from a dataset

The files argument is used to optionally specify a subset of files to be downloaded. By default, all files are downloaded.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • download_location (str): Local destination for downloaded files
  • files (typing.List[str] | typing.List[cirro.models.file.File]): Optional list of files to download, given either as relative paths or as File objects
  • file_limit (int): Maximum number of files to get (default 100,000)
def update_samplesheet(self, project_id: str, dataset_id: str, samplesheet: str):
409    def update_samplesheet(
410        self,
411        project_id: str,
412        dataset_id: str,
413        samplesheet: str
414    ):
415        """
416        Updates a samplesheet on a dataset
417
418        Args:
419            project_id (str): ID of the Project
420            dataset_id (str): ID of the Dataset
421            samplesheet (str): Samplesheet contents to update (should be a CSV string)
422        """
423        dataset = self.get(project_id, dataset_id)
424        access_context = FileAccessContext.upload_sample_sheet(project_id=project_id,
425                                                               dataset_id=dataset_id,
426                                                               base_url=dataset.s3)
427
428        samplesheet_key = f'{access_context.prefix}/samplesheet.csv'
429        self._file_service.create_file(
430            access_context=access_context,
431            key=samplesheet_key,
432            contents=samplesheet,
433            content_type='text/csv'
434        )

Updates a samplesheet on a dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • samplesheet (str): Samplesheet contents to update (should be a CSV string)
class ExecutionService(cirro.services.base.BaseService):
 13class ExecutionService(BaseService):
 14    """
 15    Service for interacting with the Execution endpoints
 16    """
 17    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
 18        """
 19        Launch an analysis job running a process on a set of inputs
 20
 21        Args:
 22            project_id (str): ID of the Project
 23            request (cirro_api_client.v1.models.RunAnalysisRequest):
 24
 25        Returns:
 26            The ID of the created dataset
 27
 28        ```python
 29        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
 30        from cirro.cirro_client import CirroApi
 31
 32        # Example:
 33        # Run the "process-nf-core-rnaseq-3_8" process using input data
 34        # from a dataset with the id "source-dataset-id"
 35
 36        # Optional analysis parameters
 37        params = RunAnalysisRequestParams.from_dict({
 38            "param_a": "val_a",
 39            "param_b": "val_b"
 40        })
 41
 42        cirro = CirroApi()
 43        request = RunAnalysisRequest(
 44            name="Name of the newly created dataset",
 45            description="Longer description of newly created dataset",
 46            process_id="process-nf-core-rnaseq-3_8",
 47            source_dataset_ids=["source-dataset-id"],
 48            params=params
 49        )
 50        cirro.execution.run_analysis("project-id", request)
 51        ```
 52        """
 53
 54        form_spec = get_process_parameters.sync(
 55            process_id=request.process_id,
 56            client=self._api_client
 57        )
 58
 59        ParameterSpecification(
 60            form_spec
 61        ).validate_params(
 62            request.params.to_dict() if request.params else {}
 63        )
 64
 65        return run_analysis.sync(
 66            project_id=project_id,
 67            body=request,
 68            client=self._api_client
 69        )
 70
 71    def stop_analysis(self, project_id: str, dataset_id: str):
 72        """
 73        Terminates all jobs related to a running analysis
 74
 75        Args:
 76            project_id (str): ID of the Project
 77            dataset_id (str): ID of the Dataset
 78        """
 79
 80        return stop_analysis.sync(
 81            project_id=project_id,
 82            dataset_id=dataset_id,
 83            client=self._api_client
 84        )
 85
 86    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
 87        """
 88        Gets an overview of the executions currently running in the project, by job queue
 89
 90        Args:
 91            project_id (str): ID of the Project
 92
 93        Returns:
 94            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
 95        """
 96
 97        return get_project_summary.sync(
 98            project_id=project_id,
 99            client=self._api_client
100        ).additional_properties
101
102    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
103        """
104        Gets live logs from main execution task
105
106        Args:
107            project_id (str): ID of the Project
108            dataset_id (str): ID of the Dataset
109            force_live (bool): If True, it will fetch logs from CloudWatch,
110                even if the execution is already completed
111
112        """
113
114        resp = get_execution_logs.sync(
115            project_id=project_id,
116            dataset_id=dataset_id,
117            force_live=force_live,
118            client=self._api_client
119        )
120
121        return '\n'.join(e.message for e in resp.events)
122
123    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
124        """
125        Gets the tasks submitted by the workflow execution
126
127        Args:
128            project_id (str): ID of the Project
129            dataset_id (str): ID of the Dataset
130            force_live (bool): If True, it will try to get the list of jobs
131                from the executor (i.e., AWS Batch), rather than the workflow report
132        """
133
134        return get_tasks_for_execution.sync(
135            project_id=project_id,
136            dataset_id=dataset_id,
137            force_live=force_live,
138            client=self._api_client
139        )
140
141    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
142        """
143        Gets the log output from an individual task
144
145        Args:
146            project_id (str): ID of the Project
147            dataset_id (str): ID of the Dataset
148            task_id (str): ID of the task
149            force_live (bool): If True, it will fetch logs from CloudWatch,
150                even if the execution is already completed
151
152        """
153
154        resp = get_task_logs.sync(
155            project_id=project_id,
156            dataset_id=dataset_id,
157            task_id=task_id,
158            force_live=force_live,
159            client=self._api_client
160        )
161
162        return '\n'.join(e.message for e in resp.events)

Service for interacting with the Execution endpoints

def run_analysis( self, project_id: str, request: cirro_api_client.v1.models.RunAnalysisRequest) -> cirro_api_client.v1.models.CreateResponse:
17    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
18        """
19        Launch an analysis job running a process on a set of inputs
20
21        Args:
22            project_id (str): ID of the Project
23            request (cirro_api_client.v1.models.RunAnalysisRequest):
24
25        Returns:
26            The ID of the created dataset
27
28        ```python
29        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
30        from cirro.cirro_client import CirroApi
31
32        # Example:
33        # Run the "process-nf-core-rnaseq-3_8" process using input data
34        # from a dataset with the id "source-dataset-id"
35
36        # Optional analysis parameters
37        params = RunAnalysisRequestParams.from_dict({
38            "param_a": "val_a",
39            "param_b": "val_b"
40        })
41
42        cirro = CirroApi()
43        request = RunAnalysisRequest(
44            name="Name of the newly created dataset",
45            description="Longer description of newly created dataset",
46            process_id="process-nf-core-rnaseq-3_8",
47            source_dataset_ids=["source-dataset-id"],
48            params=params
49        )
50        cirro.execution.run_analysis("project-id", request)
51        ```
52        """
53
54        form_spec = get_process_parameters.sync(
55            process_id=request.process_id,
56            client=self._api_client
57        )
58
59        ParameterSpecification(
60            form_spec
61        ).validate_params(
62            request.params.to_dict() if request.params else {}
63        )
64
65        return run_analysis.sync(
66            project_id=project_id,
67            body=request,
68            client=self._api_client
69        )

Launch an analysis job running a process on a set of inputs

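Arguments:
  • project_id (str): ID of the Project
  • request (cirro_api_client.v1.models.RunAnalysisRequest): Analysis request to submit
Returns:

A cirro_api_client.v1.models.CreateResponse identifying the created dataset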

from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi

# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"

# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
    "param_a": "val_a",
    "param_b": "val_b"
})

cirro = CirroApi()
request = RunAnalysisRequest(
    name="Name of the newly created dataset",
    description="Longer description of newly created dataset",
    process_id="process-nf-core-rnaseq-3_8",
    source_dataset_ids=["source-dataset-id"],
    params=params
)
cirro.execution.run_analysis("project-id", request)
def stop_analysis(self, project_id: str, dataset_id: str):
71    def stop_analysis(self, project_id: str, dataset_id: str):
72        """
73        Terminates all jobs related to a running analysis
74
75        Args:
76            project_id (str): ID of the Project
77            dataset_id (str): ID of the Dataset
78        """
79
80        return stop_analysis.sync(
81            project_id=project_id,
82            dataset_id=dataset_id,
83            client=self._api_client
84        )

Terminates all jobs related to a running analysis

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
def get_project_summary( self, project_id: str) -> Dict[str, List[cirro_api_client.v1.models.Task]]:
 86    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
 87        """
 88        Gets an overview of the executions currently running in the project, by job queue
 89
 90        Args:
 91            project_id (str): ID of the Project
 92
 93        Returns:
 94            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
 95        """
 96
 97        return get_project_summary.sync(
 98            project_id=project_id,
 99            client=self._api_client
100        ).additional_properties

Gets an overview of the executions currently running in the project, by job queue

Arguments:
  • project_id (str): ID of the Project
Returns:

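A dictionary mapping each key (job queue) to a list of cirro_api_client.v1.models.Task, taken from the additional_properties of cirro_api_client.v1.models.GetProjectSummaryResponse200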

def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
102    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
103        """
104        Gets live logs from main execution task
105
106        Args:
107            project_id (str): ID of the Project
108            dataset_id (str): ID of the Dataset
109            force_live (bool): If True, it will fetch logs from CloudWatch,
110                even if the execution is already completed
111
112        """
113
114        resp = get_execution_logs.sync(
115            project_id=project_id,
116            dataset_id=dataset_id,
117            force_live=force_live,
118            client=self._api_client
119        )
120
121        return '\n'.join(e.message for e in resp.events)

Gets live logs from main execution task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
def get_tasks_for_execution( self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[cirro_api_client.v1.models.Task]]:
123    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
124        """
125        Gets the tasks submitted by the workflow execution
126
127        Args:
128            project_id (str): ID of the Project
129            dataset_id (str): ID of the Dataset
130            force_live (bool): If True, it will try to get the list of jobs
131                from the executor (i.e., AWS Batch), rather than the workflow report
132        """
133
134        return get_tasks_for_execution.sync(
135            project_id=project_id,
136            dataset_id=dataset_id,
137            force_live=force_live,
138            client=self._api_client
139        )

Gets the tasks submitted by the workflow execution

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
def get_task_logs( self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
141    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
142        """
143        Gets the log output from an individual task
144
145        Args:
146            project_id (str): ID of the Project
147            dataset_id (str): ID of the Dataset
148            task_id (str): ID of the task
149            force_live (bool): If True, it will fetch logs from CloudWatch,
150                even if the execution is already completed
151
152        """
153
154        resp = get_task_logs.sync(
155            project_id=project_id,
156            dataset_id=dataset_id,
157            task_id=task_id,
158            force_live=force_live,
159            client=self._api_client
160        )
161
162        return '\n'.join(e.message for e in resp.events)

Gets the log output from an individual task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • task_id (str): ID of the task
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
class ComputeEnvironmentService(cirro.services.base.BaseService):
class ComputeEnvironmentService(BaseService):
    """
    Service for interacting with the Compute Environment endpoints
    """

    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
        """
        Lists the custom compute environments for a project (i.e., an agent)

        Args:
            project_id (str): Project ID

        Returns:
            List of compute environments that are available for the project
        """
        environments = get_compute_environments.sync(
            client=self._api_client,
            project_id=project_id
        )
        return environments

Service for interacting with the Compute Environment endpoints

def list_environments_for_project( self, project_id: str) -> List[cirro_api_client.v1.models.ComputeEnvironmentConfiguration]:
15    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
16        """
17        List of custom compute environments for a project (i.e., an agent)
18
19        Args:
20            project_id (str): Project ID
21
22        Returns:
23            List of compute environments that are available for the project
24        """
25        return get_compute_environments.sync(project_id=project_id,
26                                             client=self._api_client)

List of custom compute environments for a project (i.e., an agent)

Arguments:
  • project_id (str): Project ID
Returns:

List of compute environments that are available for the project

class FileService(cirro.services.base.BaseService):
 22class FileService(BaseService):
 23    """
 24    Service for interacting with files
 25    """
 26    checksum_method: str
 27    transfer_retries: int
 28    _get_token_lock = threading.Lock()
 29    _read_token_cache: Dict[str, AWSCredentials] = {}
 30
 31    def __init__(self, api_client, checksum_method, transfer_retries):
 32        """
 33        Instantiates the file service class
 34        """
 35        self._api_client = api_client
 36        self.checksum_method = checksum_method
 37        self.transfer_retries = transfer_retries
 38
 39    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
 40        """
 41        Retrieves credentials to access files
 42
 43        Args:
 44            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
 45        """
 46        access_request = access_context.file_access_request
 47
 48        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
 49            return self._get_project_read_credentials(access_context)
 50
 51        else:
 52            return generate_project_file_access_token.sync(
 53                client=self._api_client,
 54                project_id=access_context.project_id,
 55                body=access_context.file_access_request
 56            )
 57
 58    def _get_project_read_credentials(self, access_context: FileAccessContext):
 59        """
 60        Retrieves credentials to read project data, this can be cached
 61        """
 62        access_request = access_context.file_access_request
 63        project_id = access_context.project_id
 64        with self._get_token_lock:
 65            cached_token = self._read_token_cache.get(project_id)
 66
 67            if not cached_token or datetime.now(tz=timezone.utc) > cached_token.expiration:
 68                new_token = generate_project_file_access_token.sync(
 69                    client=self._api_client,
 70                    project_id=project_id,
 71                    body=access_request
 72                )
 73
 74                self._read_token_cache[project_id] = new_token
 75
 76        return self._read_token_cache[project_id]
 77
 78    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
 79        """
 80        Gets the underlying AWS S3 client to perform operations on files
 81
 82        This is seeded with refreshable credentials from the access_context parameter
 83
 84        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
 85        """
 86        s3_client = self._generate_s3_client(access_context)
 87        return s3_client.get_aws_client()
 88
 89    def get_file(self, file: File) -> bytes:
 90        """
 91        Gets the contents of a file
 92
 93        Args:
 94            file (cirro.models.file.File):
 95
 96        Returns:
 97            The raw bytes of the file
 98        """
 99        return self.get_file_from_path(file.access_context, file.relative_path)
100
101    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
102        """
103        Gets the contents of a file by providing the path, used internally
104
105        Args:
106            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
107            file_path (str): Relative path to file within dataset
108
109        Returns:
110            The raw bytes of the file
111        """
112        s3_client = self._generate_s3_client(access_context)
113
114        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
115
116        return s3_client.get_file(access_context.bucket, full_path)
117
118    def create_file(self, access_context: FileAccessContext, key: str,
119                    contents: str, content_type: str) -> None:
120        """
121        Creates a file at the specified path
122
123        Args:
124            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
125            key (str): Key of object to create
126            contents (str): Content of object
127            content_type (str):
128        """
129        s3_client = self._generate_s3_client(access_context)
130
131        s3_client.create_object(
132            key=key,
133            contents=contents,
134            content_type=content_type,
135            bucket=access_context.bucket
136        )
137
138    def upload_files(self,
139                     access_context: FileAccessContext,
140                     directory: PathLike,
141                     files: List[PathLike],
142                     file_path_map: Dict[PathLike, str]) -> None:
143        """
144        Uploads a list of files from the specified directory
145
146        Args:
147            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
148            directory (str|Path): Path to directory
149            files (typing.List[str|Path]): List of paths to files within the directory
150                must be the same type as directory.
151            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
152             from source path to destination path, used to "re-write" paths within the dataset.
153        """
154        s3_client = self._generate_s3_client(access_context)
155
156        upload_directory(
157            directory=directory,
158            files=files,
159            file_path_map=file_path_map,
160            s3_client=s3_client,
161            bucket=access_context.bucket,
162            prefix=access_context.prefix,
163            max_retries=self.transfer_retries
164        )
165
166    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> List[Path]:
167        """
168        Download a list of files to the specified directory
169
170        Args:
171            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
172            directory (str): download location
173            files (List[str]): relative path of files to download
174        Returns:
175            List of paths to downloaded files
176        """
177        s3_client = self._generate_s3_client(access_context)
178
179        return download_directory(
180            directory,
181            files,
182            s3_client,
183            access_context.bucket,
184            access_context.prefix
185        )
186
187    def is_valid_file(self, file: File, local_file: Path) -> bool:
188        """
189        Validates the checksum of a file against a local file
190        See ``validate_file`` method for details.
191
192        Args:
193            file (File): Cirro file to validate
194            local_file (PathLike): Local file path to compare against
195
196        Returns:
197            bool: True if file integrity matches, False otherwise
198
199        Raises:
200            RuntimeWarning: If the remote checksum is not available or not supported
201        """
202        try:
203            self.validate_file(file, local_file)
204            return True
205        except ValueError:
206            return False
207
208    def validate_file(self, file: File, local_file: PathLike):
209        """
210        Validates the checksum of a file against a local file
211        This is used to ensure file integrity after download or upload
212
213        Checksums might not be available if the file was uploaded without checksum support
214
215        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
216        Args:
217            file (File): Cirro file to validate
218            local_file (PathLike): Local file path to compare against
219
220        Raises:
221            ValueError: If checksums do not match
222            RuntimeWarning: If the remote checksum is not available or not supported
223        """
224        local_file = Path(local_file).expanduser() if isinstance(local_file, str) else local_file.expanduser()
225
226        stats = self.get_file_stats(file)
227
228        remote_checksum_key = next((prop for prop in stats.keys()
229                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)
230
231        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
232            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")
233
234        if remote_checksum_key is None:
235            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")
236
237        remote_checksum = stats[remote_checksum_key]
238        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
239        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")
240
241        local_checksum = get_checksum(local_file, remote_checksum_name)
242        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")
243
244        if local_checksum != remote_checksum:
245            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
246                             f"local {local_checksum}, remote {remote_checksum}")
247
248    def get_file_stats(self, file: File) -> dict:
249        """
250        Gets the file stats for a file, such as size, checksum, etc.
251        Equivalent to the `head_object` operation in S3
252        """
253        s3_client = self._generate_s3_client(file.access_context)
254
255        full_path = f'{file.access_context.prefix}/{file.relative_path}'
256
257        stats = s3_client.get_file_stats(
258            bucket=file.access_context.bucket,
259            key=full_path
260        )
261        logger.debug(f"File stats for file {file.relative_path} is {stats}")
262        return stats
263
264    def _generate_s3_client(self, access_context: FileAccessContext):
265        """
266        Generates the Cirro-S3 client to perform operations on files
267        """
268        return S3Client(
269            partial(self.get_access_credentials, access_context),
270            self.checksum_method
271        )

Service for interacting with files

FileService(api_client, checksum_method, transfer_retries)
31    def __init__(self, api_client, checksum_method, transfer_retries):
32        """
33        Instantiates the file service class
34        """
35        self._api_client = api_client
36        self.checksum_method = checksum_method
37        self.transfer_retries = transfer_retries

Instantiates the file service class

checksum_method: str
transfer_retries: int
def get_access_credentials( self, access_context: cirro.models.file.FileAccessContext) -> cirro_api_client.v1.models.AWSCredentials:
39    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
40        """
41        Retrieves credentials to access files
42
43        Args:
44            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
45        """
46        access_request = access_context.file_access_request
47
48        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
49            return self._get_project_read_credentials(access_context)
50
51        else:
52            return generate_project_file_access_token.sync(
53                client=self._api_client,
54                project_id=access_context.project_id,
55                body=access_context.file_access_request
56            )

Retrieves credentials to access files

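Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate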
def get_aws_s3_client( self, access_context: cirro.models.file.FileAccessContext) -> botocore.client.BaseClient:
78    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
79        """
80        Gets the underlying AWS S3 client to perform operations on files
81
82        This is seeded with refreshable credentials from the access_context parameter
83
84        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
85        """
86        s3_client = self._generate_s3_client(access_context)
87        return s3_client.get_aws_client()

Gets the underlying AWS S3 client to perform operations on files

This is seeded with refreshable credentials from the access_context parameter

This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.

def get_file(self, file: cirro.models.file.File) -> bytes:
89    def get_file(self, file: File) -> bytes:
90        """
91        Gets the contents of a file
92
93        Args:
94            file (cirro.models.file.File):
95
96        Returns:
97            The raw bytes of the file
98        """
99        return self.get_file_from_path(file.access_context, file.relative_path)

Gets the contents of a file

Arguments:
  • file (cirro.models.file.File): File whose contents are retrieved
Returns:

The raw bytes of the file

def get_file_from_path( self, access_context: cirro.models.file.FileAccessContext, file_path: str) -> bytes:
101    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
102        """
103        Gets the contents of a file by providing the path, used internally
104
105        Args:
106            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
107            file_path (str): Relative path to file within dataset
108
109        Returns:
110            The raw bytes of the file
111        """
112        s3_client = self._generate_s3_client(access_context)
113
114        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
115
116        return s3_client.get_file(access_context.bucket, full_path)

Gets the contents of a file by providing the path, used internally

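Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • file_path (str): Relative path to file within dataset
Returns: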

The raw bytes of the file

def create_file( self, access_context: cirro.models.file.FileAccessContext, key: str, contents: str, content_type: str) -> None:
118    def create_file(self, access_context: FileAccessContext, key: str,
119                    contents: str, content_type: str) -> None:
120        """
121        Creates a file at the specified path
122
123        Args:
124            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
125            key (str): Key of object to create
126            contents (str): Content of object
127            content_type (str):
128        """
129        s3_client = self._generate_s3_client(access_context)
130
131        s3_client.create_object(
132            key=key,
133            contents=contents,
134            content_type=content_type,
135            bucket=access_context.bucket
136        )

Creates a file at the specified path

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • key (str): Key of object to create
  • contents (str): Content of object
  • content_type (str):
def upload_files( self, access_context: cirro.models.file.FileAccessContext, directory: ~PathLike, files: List[~PathLike], file_path_map: Dict[~PathLike, str]) -> None:
138    def upload_files(self,
139                     access_context: FileAccessContext,
140                     directory: PathLike,
141                     files: List[PathLike],
142                     file_path_map: Dict[PathLike, str]) -> None:
143        """
144        Uploads a list of files from the specified directory
145
146        Args:
147            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
148            directory (str|Path): Path to directory
149            files (typing.List[str|Path]): List of paths to files within the directory
150                must be the same type as directory.
151            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
152             from source path to destination path, used to "re-write" paths within the dataset.
153        """
154        s3_client = self._generate_s3_client(access_context)
155
156        upload_directory(
157            directory=directory,
158            files=files,
159            file_path_map=file_path_map,
160            s3_client=s3_client,
161            bucket=access_context.bucket,
162            prefix=access_context.prefix,
163            max_retries=self.transfer_retries
164        )

Uploads a list of files from the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
def download_files( self, access_context: cirro.models.file.FileAccessContext, directory: str, files: List[str]) -> List[pathlib.Path]:
166    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> List[Path]:
167        """
168        Download a list of files to the specified directory
169
170        Args:
171            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
172            directory (str): download location
173            files (List[str]): relative path of files to download
174        Returns:
175            List of paths to downloaded files
176        """
177        s3_client = self._generate_s3_client(access_context)
178
179        return download_directory(
180            directory,
181            files,
182            s3_client,
183            access_context.bucket,
184            access_context.prefix
185        )

Download a list of files to the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str): download location
  • files (List[str]): relative path of files to download
Returns:

List of paths to downloaded files

def is_valid_file(self, file: cirro.models.file.File, local_file: pathlib.Path) -> bool:
187    def is_valid_file(self, file: File, local_file: Path) -> bool:
188        """
189        Validates the checksum of a file against a local file
190        See ``validate_file`` method for details.
191
192        Args:
193            file (File): Cirro file to validate
194            local_file (PathLike): Local file path to compare against
195
196        Returns:
197            bool: True if file integrity matches, False otherwise
198
199        Raises:
200            RuntimeWarning: If the remote checksum is not available or not supported
201        """
202        try:
203            self.validate_file(file, local_file)
204            return True
205        except ValueError:
206            return False

Validates the checksum of a file against a local file. See the validate_file method for details.

Arguments:
  • file (File): Cirro file to validate
  • local_file (PathLike): Local file path to compare against
Returns:

bool: True if file integrity matches, False otherwise

Raises:
  • RuntimeWarning: If the remote checksum is not available or not supported
def validate_file(self, file: cirro.models.file.File, local_file: ~PathLike):
208    def validate_file(self, file: File, local_file: PathLike):
209        """
210        Validates the checksum of a file against a local file
211        This is used to ensure file integrity after download or upload
212
213        Checksums might not be available if the file was uploaded without checksum support
214
215        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
216        Args:
217            file (File): Cirro file to validate
218            local_file (PathLike): Local file path to compare against
219
220        Raises:
221            ValueError: If checksums do not match
222            RuntimeWarning: If the remote checksum is not available or not supported
223        """
224        local_file = Path(local_file).expanduser() if isinstance(local_file, str) else local_file.expanduser()
225
226        stats = self.get_file_stats(file)
227
228        remote_checksum_key = next((prop for prop in stats.keys()
229                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)
230
231        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
232            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")
233
234        if remote_checksum_key is None:
235            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")
236
237        remote_checksum = stats[remote_checksum_key]
238        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
239        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")
240
241        local_checksum = get_checksum(local_file, remote_checksum_name)
242        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")
243
244        if local_checksum != remote_checksum:
245            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
246                             f"local {local_checksum}, remote {remote_checksum}")

Validates the checksum of a file against a local file. This is used to ensure file integrity after download or upload.

Checksums might not be available if the file was uploaded without checksum support

https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html

Arguments:
  • file (File): Cirro file to validate
  • local_file (PathLike): Local file path to compare against
Raises:
  • ValueError: If checksums do not match
  • RuntimeWarning: If the remote checksum is not available or not supported
def get_file_stats(self, file: cirro.models.file.File) -> dict:
248    def get_file_stats(self, file: File) -> dict:
249        """
250        Gets the file stats for a file, such as size, checksum, etc.
251        Equivalent to the `head_object` operation in S3
252        """
253        s3_client = self._generate_s3_client(file.access_context)
254
255        full_path = f'{file.access_context.prefix}/{file.relative_path}'
256
257        stats = s3_client.get_file_stats(
258            bucket=file.access_context.bucket,
259            key=full_path
260        )
261        logger.debug(f"File stats for file {file.relative_path} is {stats}")
262        return stats

Gets the file stats for a file, such as size, checksum, etc. Equivalent to the head_object operation in S3

class MetadataService(cirro.services.base.BaseService):
class MetadataService(BaseService):
    """
    Service for interacting with the Metadata endpoints
    """
    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
        """
        Lists the samples that belong to a project, including their metadata

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        def fetch_page(page_args):
            # page_args carries the next_token and limit for a single request
            return get_project_samples.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            )

        return get_all_records(records_getter=fetch_page, max_items=max_items)

    def get_project_schema(self, project_id: str) -> FormSchema:
        """
        Fetches the metadata schema of a project

        Args:
            project_id (str): ID of the Project
        """
        schema = get_project_schema.sync(client=self._api_client, project_id=project_id)
        return schema

    def update_project_schema(self, project_id: str, schema: FormSchema):
        """
        Replaces the metadata schema of a project

        Args:
            project_id (str): ID of the Project
            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
        """
        update_project_schema.sync_detailed(
            client=self._api_client,
            project_id=project_id,
            body=schema
        )

    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
        """
        Updates the metadata information stored for a sample

        Args:
            project_id (str): ID of the Project
            sample_id (str): ID of the sample
            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
        """
        updated = update_sample.sync(
            client=self._api_client,
            project_id=project_id,
            sample_id=sample_id,
            body=sample
        )
        return updated

Service for interacting with the Metadata endpoints

def get_project_samples( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Sample]:
15    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
16        """
17        Retrieves a list of samples associated with a project along with their metadata
18
19        Args:
20            project_id (str): ID of the Project
21            max_items (int): Maximum number of records to get (default 10,000)
22        """
23        return get_all_records(
24            records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
25                                                                      client=self._api_client,
26                                                                      next_token=page_args.next_token,
27                                                                      limit=page_args.limit),
28            max_items=max_items
29        )

Retrieves a list of samples associated with a project along with their metadata

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
def get_project_schema( self, project_id: str) -> cirro_api_client.v1.models.FormSchema:
31    def get_project_schema(self, project_id: str) -> FormSchema:
32        """
33        Get project metadata schema
34
35        Args:
36            project_id (str): ID of the Project
37        """
38        return get_project_schema.sync(project_id=project_id, client=self._api_client)

Get project metadata schema

Arguments:
  • project_id (str): ID of the Project
def update_project_schema( self, project_id: str, schema: cirro_api_client.v1.models.FormSchema):
40    def update_project_schema(self, project_id: str, schema: FormSchema):
41        """
42        Update project metadata schema
43
44        Args:
45            project_id (str): ID of the Project
46            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
47        """
48        update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)

Update project metadata schema

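Arguments:
  • project_id (str): ID of the Project
  • schema (cirro_api_client.v1.models.FormSchema): Metadata schema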
def update_sample( self, project_id: str, sample_id: str, sample: cirro_api_client.v1.models.SampleRequest) -> cirro_api_client.v1.models.Sample:
50    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
51        """
52        Updates metadata information for sample
53
54        Args:
55            project_id (str): ID of the Project
56            sample_id (str): ID of the sample
57            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
58        """
59        return update_sample.sync(
60            project_id=project_id,
61            sample_id=sample_id,
62            body=sample,
63            client=self._api_client
64        )

Updates metadata information for sample

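Arguments:
  • project_id (str): ID of the Project
  • sample_id (str): ID of the sample
  • sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample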
class MetricsService(cirro.services.base.BaseService):
class MetricsService(BaseService):
    """
    Service for interacting with the Metrics endpoints
    """
    def get_for_project(self, project_id: str) -> ProjectMetrics:
        """
        Fetches the cost and storage metrics of a single project.

        Args:
            project_id (str): ID of the Project
        """
        metrics = get_project_metrics.sync(client=self._api_client, project_id=project_id)
        return metrics

    def get_all_metrics(self) -> List[ProjectMetrics]:
        """
        Fetches all available metrics
        """
        all_metrics = get_all_metrics.sync(client=self._api_client)
        return all_metrics

Service for interacting with the Metrics endpoints

def get_for_project( self, project_id: str) -> cirro_api_client.v1.models.ProjectMetrics:
14    def get_for_project(self, project_id: str) -> ProjectMetrics:
15        """
16        Retrieves the cost and storage metrics for a project.
17
18        Args:
19            project_id (str): ID of the Project
20        """
21        return get_project_metrics.sync(
22            project_id=project_id,
23            client=self._api_client
24        )

Retrieves the cost and storage metrics for a project.

Arguments:
  • project_id (str): ID of the Project
def get_all_metrics(self) -> List[cirro_api_client.v1.models.ProjectMetrics]:
26    def get_all_metrics(self) -> List[ProjectMetrics]:
27        """
28        Retrieves all available metrics
29        """
30        return get_all_metrics.sync(client=self._api_client)

Retrieves all available metrics

class ProcessService(cirro.services.base.BaseService):
 15class ProcessService(BaseService):
 16    """
 17    Service for interacting with the Process endpoints
 18    """
 19    def list(self, process_type: Executor = None, include_archived=False) -> List[Process]:
 20        """
 21        Retrieves a list of available processes
 22
 23        Args:
 24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
 25            include_archived: Whether to include archived processes in the list (default False)
 26        """
 27        processes = get_processes.sync(client=self._api_client, include_archived=include_archived)
 28        return [p for p in processes if not process_type or process_type == p.executor]
 29
 30    def get(self, process_id: str) -> ProcessDetail:
 31        """
 32        Retrieves detailed information on a process
 33
 34        Args:
 35            process_id (str): Process ID
 36        """
 37        return get_process.sync(process_id=process_id, client=self._api_client)
 38
 39    def archive(self, process_id: str):
 40        """
 41        Removes a custom process from the list of available processes.
 42
 43        Error will be raised if the requested process does not exist. No value
 44        is returned, and no error raised if process exists and request is satisfied.
 45
 46        Args:
 47            process_id (str): Process ID
 48        """
 49        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)
 50
 51    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
 52        """
 53        Get a process by its display name
 54
 55        Args:
 56            name (str): Process name
 57        """
 58        matched_process = next((p for p in self.list() if p.name == name), None)
 59        if not matched_process:
 60            return None
 61
 62        return self.get(matched_process.id)
 63
 64    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
 65        """
 66        Gets a specification used to describe the parameters used in the process
 67
 68        Args:
 69            process_id (str): Process ID
 70        """
 71        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
 72        return ParameterSpecification(form_spec)
 73
 74    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 75        """
 76        Checks if the file mapping rules for a process are met by the list of files
 77
 78        Error will be raised if the file mapping rules for the process are not met.
 79        No value is returned and no error is raised if the rules are satisfied.
 80
 81        Args:
 82            process_id (str): ID for the process containing the file mapping rules
 83            directory: path to directory containing files
 84            files (List[str]): File names to check
 85        """
 86        # Parse sample sheet file if present
 87        sample_sheet = None
 88        sample_sheet_file = Path(directory, 'samplesheet.csv')
 89        if sample_sheet_file.exists():
 90            sample_sheet = sample_sheet_file.read_text()
 91
 92        request = ValidateFileRequirementsRequest(
 93            file_names=files,
 94            sample_sheet=sample_sheet
 95        )
 96        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 97
 98        # These will be sample sheet errors or no files errors
 99        if error_msg := requirements.error_msg:
100            raise ValueError(error_msg)
101
102        errors = [
103            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
104            '\n\t- '.join([
105                e.example_name
106                for e in entry.allowed_patterns
107            ])
108            for entry in requirements.allowed_data_types
109            if entry.error_msg is not None
110        ]
111
112        files_provided = ', '.join(files)
113
114        if len(errors) != 0:
115            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
116                             "They do not meet the dataset requirements. "
117                             "The required file types are: \n" +
118                             "\n".join(errors))
119
120    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
121        """
122        Creates a custom process (pipeline or data type) in the system.
123
124        Whether it is a pipeline or data type is determined by the executor field.
125        For pipelines, you must specify the pipeline code and configuration repository.
126
127        This process will be available to all users in the system if `is_tenant_wide` is set to True.
128        If `is_tenant_wide` is set to False, the process will only be available the projects
129         specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.
130
131        If the repository is private, you must complete the authorization flow on the UI.
132
133        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
134
135        Args:
136            process (Process): Process to create
137
138        Example:
139        ```python
140        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
141         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
142        from cirro.cirro_client import CirroApi
143
144        cirro = CirroApi()
145
146        # New pipeline
147        new_pipeline = CustomProcessInput(
148            id="my_pipeline",
149            name="My Pipeline",
150            description="This is a test pipeline",
151            executor=Executor.CROMWELL,
152            category="DNA Sequencing",
153            child_process_ids=[],
154            parent_process_ids=["rnaseq"],
155            documentation_url="https://example.com/docs",
156            pipeline_code=PipelineCode(
157                repository_path="CirroBio/test-pipeline",
158                version="v1.0.0",
159                entry_point="main.nf",
160                repository_type=RepositoryType.GITHUB_PUBLIC
161            ),
162            linked_project_ids=[],
163            is_tenant_wide=True,
164            allow_multiple_sources=True,
165            uses_sample_sheet=True,
166            # This can be the same or different from the pipeline_code
167            custom_settings=CustomPipelineSettings(
168                repository="CirroBio/test-pipeline",
169                branch="v1.0.0",
170                repository_type=RepositoryType.GITHUB_PUBLIC,
171            ),
172            file_mapping_rules=[
173                FileMappingRule(
174                    description="Filtered Feature Matrix",
175                    file_name_patterns=[
176                        FileNamePattern(
177                            example_name="filtered_feature_bc_matrix.h5",
178                            description="Matrix",
179                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
180                        )
181                    ]
182                )
183            ]
184        )
185
186        cirro.processes.create_custom_process(new_pipeline)
187
188        # New data type
189        new_data_type = CustomProcessInput(
190            id="images_jpg",
191            name="JPG Images",
192            description="Used for generic JPG images",
193            executor=Executor.INGEST,
194            child_process_ids=[],
195            parent_process_ids=[],
196            documentation_url="https://example.com/docs",
197            linked_project_ids=["project_id_1", "project_id_2"],
198            file_mapping_rules=[
199                FileMappingRule(
200                    description="Images",
201                    min_=1,
202                    file_name_patterns=[
203                        FileNamePattern(
204                            example_name="image.jpg",
205                            description="JPG Image",
206                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
207                        )
208                    ]
209                )
210            ]
211        )
212        cirro.processes.create_custom_process(new_data_type)
213        ```
214        """
215        return create_custom_process.sync(client=self._api_client, body=process)
216
217    def update_custom_process(self, process_id: str, process: CustomProcessInput):
218        """
219        Updates the custom process (pipeline or data type) in the system.
220
221        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
222
223        Args:
224            process_id (str): ID of the process to update
225            process (CustomProcessInput): Process to update
226        """
227        update_custom_process.sync_detailed(client=self._api_client,
228                                            process_id=process_id,
229                                            body=process)
230
231    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
232        """
233        Syncs a custom pipeline in the system.
234
235        This updates the pipeline configurations in Cirro (form configuration,
236         input mapping, preprocess etc.) to match what is in the configured repository.
237        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
238
239        Args:
240            process_id (str): ID of the process to sync
241        """
242        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Service for interacting with the Process endpoints

def list( self, process_type: cirro_api_client.v1.models.Executor = None, include_archived=False) -> List[cirro_api_client.v1.models.Process]:
19    def list(self, process_type: Executor = None, include_archived=False) -> List[Process]:
20        """
21        Retrieves a list of available processes
22
23        Args:
24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
25            include_archived: Whether to include archived processes in the list (default False)
26        """
27        processes = get_processes.sync(client=self._api_client, include_archived=include_archived)
28        return [p for p in processes if not process_type or process_type == p.executor]

Retrieves a list of available processes

Arguments:
  • process_type (cirro_api_client.v1.models.Executor): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
  • include_archived: Whether to include archived processes in the list (default False)
def get( self, process_id: str) -> cirro_api_client.v1.models.ProcessDetail:
30    def get(self, process_id: str) -> ProcessDetail:
31        """
32        Retrieves detailed information on a process
33
34        Args:
35            process_id (str): Process ID
36        """
37        return get_process.sync(process_id=process_id, client=self._api_client)

Retrieves detailed information on a process

Arguments:
  • process_id (str): Process ID
def archive(self, process_id: str):
39    def archive(self, process_id: str):
40        """
41        Removes a custom process from the list of available processes.
42
43        Error will be raised if the requested process does not exist. No value
44        is returned, and no error raised if process exists and request is satisfied.
45
46        Args:
47            process_id (str): Process ID
48        """
49        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)

Removes a custom process from the list of available processes.

Error will be raised if the requested process does not exist. No value is returned, and no error raised if process exists and request is satisfied.

Arguments:
  • process_id (str): Process ID
def find_by_name( self, name: str) -> Optional[cirro_api_client.v1.models.ProcessDetail]:
51    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
52        """
53        Get a process by its display name
54
55        Args:
56            name (str): Process name
57        """
58        matched_process = next((p for p in self.list() if p.name == name), None)
59        if not matched_process:
60            return None
61
62        return self.get(matched_process.id)

Get a process by its display name

Arguments:
  • name (str): Process name
def get_parameter_spec( self, process_id: str) -> cirro.models.form_specification.ParameterSpecification:
64    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
65        """
66        Gets a specification used to describe the parameters used in the process
67
68        Args:
69            process_id (str): Process ID
70        """
71        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
72        return ParameterSpecification(form_spec)

Gets a specification used to describe the parameters used in the process

Arguments:
  • process_id (str): Process ID
def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 74    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 75        """
 76        Checks if the file mapping rules for a process are met by the list of files
 77
 78        Error will be raised if the file mapping rules for the process are not met.
 79        No value is returned and no error is raised if the rules are satisfied.
 80
 81        Args:
 82            process_id (str): ID for the process containing the file mapping rules
 83            directory: path to directory containing files
 84            files (List[str]): File names to check
 85        """
 86        # Parse sample sheet file if present
 87        sample_sheet = None
 88        sample_sheet_file = Path(directory, 'samplesheet.csv')
 89        if sample_sheet_file.exists():
 90            sample_sheet = sample_sheet_file.read_text()
 91
 92        request = ValidateFileRequirementsRequest(
 93            file_names=files,
 94            sample_sheet=sample_sheet
 95        )
 96        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 97
 98        # These will be sample sheet errors or no files errors
 99        if error_msg := requirements.error_msg:
100            raise ValueError(error_msg)
101
102        errors = [
103            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
104            '\n\t- '.join([
105                e.example_name
106                for e in entry.allowed_patterns
107            ])
108            for entry in requirements.allowed_data_types
109            if entry.error_msg is not None
110        ]
111
112        files_provided = ', '.join(files)
113
114        if len(errors) != 0:
115            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
116                             "They do not meet the dataset requirements. "
117                             "The required file types are: \n" +
118                             "\n".join(errors))

Checks if the file mapping rules for a process are met by the list of files

Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.

Arguments:
  • process_id (str): ID for the process containing the file mapping rules
  • directory: path to directory containing files
  • files (List[str]): File names to check
def create_custom_process( self, process: cirro_api_client.v1.models.CustomProcessInput) -> cirro_api_client.v1.models.CreateResponse:
120    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
121        """
122        Creates a custom process (pipeline or data type) in the system.
123
124        Whether it is a pipeline or data type is determined by the executor field.
125        For pipelines, you must specify the pipeline code and configuration repository.
126
127        This process will be available to all users in the system if `is_tenant_wide` is set to True.
128        If `is_tenant_wide` is set to False, the process will only be available the projects
129         specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.
130
131        If the repository is private, you must complete the authorization flow on the UI.
132
133        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
134
135        Args:
136            process (Process): Process to create
137
138        Example:
139        ```python
140        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
141         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
142        from cirro.cirro_client import CirroApi
143
144        cirro = CirroApi()
145
146        # New pipeline
147        new_pipeline = CustomProcessInput(
148            id="my_pipeline",
149            name="My Pipeline",
150            description="This is a test pipeline",
151            executor=Executor.CROMWELL,
152            category="DNA Sequencing",
153            child_process_ids=[],
154            parent_process_ids=["rnaseq"],
155            documentation_url="https://example.com/docs",
156            pipeline_code=PipelineCode(
157                repository_path="CirroBio/test-pipeline",
158                version="v1.0.0",
159                entry_point="main.nf",
160                repository_type=RepositoryType.GITHUB_PUBLIC
161            ),
162            linked_project_ids=[],
163            is_tenant_wide=True,
164            allow_multiple_sources=True,
165            uses_sample_sheet=True,
166            # This can be the same or different from the pipeline_code
167            custom_settings=CustomPipelineSettings(
168                repository="CirroBio/test-pipeline",
169                branch="v1.0.0",
170                repository_type=RepositoryType.GITHUB_PUBLIC,
171            ),
172            file_mapping_rules=[
173                FileMappingRule(
174                    description="Filtered Feature Matrix",
175                    file_name_patterns=[
176                        FileNamePattern(
177                            example_name="filtered_feature_bc_matrix.h5",
178                            description="Matrix",
179                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
180                        )
181                    ]
182                )
183            ]
184        )
185
186        cirro.processes.create_custom_process(new_pipeline)
187
188        # New data type
189        new_data_type = CustomProcessInput(
190            id="images_jpg",
191            name="JPG Images",
192            description="Used for generic JPG images",
193            executor=Executor.INGEST,
194            child_process_ids=[],
195            parent_process_ids=[],
196            documentation_url="https://example.com/docs",
197            linked_project_ids=["project_id_1", "project_id_2"],
198            file_mapping_rules=[
199                FileMappingRule(
200                    description="Images",
201                    min_=1,
202                    file_name_patterns=[
203                        FileNamePattern(
204                            example_name="image.jpg",
205                            description="JPG Image",
206                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
207                        )
208                    ]
209                )
210            ]
211        )
212        cirro.processes.create_custom_process(new_data_type)
213        ```
214        """
215        return create_custom_process.sync(client=self._api_client, body=process)

Creates a custom process (pipeline or data type) in the system.

Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.

This process will be available to all users in the system if is_tenant_wide is set to True. If is_tenant_wide is set to False, the process will only be available to the projects specified in linked_project_ids. Making it available tenant wide requires tenant admin privileges.

If the repository is private, you must complete the authorization flow on the UI.

See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

Arguments:
  • process (CustomProcessInput): Process to create

Example:

from cirro_api_client.v1.models import (
    CustomProcessInput, Executor, PipelineCode, FileMappingRule,
    FileNamePattern, RepositoryType, CustomPipelineSettings
)
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# New pipeline
new_pipeline = CustomProcessInput(
    id="my_pipeline",
    name="My Pipeline",
    description="This is a test pipeline",
    executor=Executor.NEXTFLOW,
    category="DNA Sequencing",
    child_process_ids=[],
    parent_process_ids=["rnaseq"],
    documentation_url="https://example.com/docs",
    pipeline_code=PipelineCode(
        repository_path="CirroBio/test-pipeline",
        version="v1.0.0",
        entry_point="main.nf",
        repository_type=RepositoryType.GITHUB_PUBLIC
    ),
    linked_project_ids=[],
    is_tenant_wide=True,
    allow_multiple_sources=True,
    uses_sample_sheet=True,
    # This can be the same or different from the pipeline_code
    custom_settings=CustomPipelineSettings(
        repository="CirroBio/test-pipeline",
        branch="v1.0.0",
        repository_type=RepositoryType.GITHUB_PUBLIC,
    ),
    file_mapping_rules=[
        FileMappingRule(
            description="Filtered Feature Matrix",
            file_name_patterns=[
                FileNamePattern(
                    example_name="filtered_feature_bc_matrix.h5",
                    description="Matrix",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)\.h5"
                )
            ]
        )
    ]
)

cirro.processes.create_custom_process(new_pipeline)

# New data type
new_data_type = CustomProcessInput(
    id="images_jpg",
    name="JPG Images",
    description="Used for generic JPG images",
    executor=Executor.INGEST,
    child_process_ids=[],
    parent_process_ids=[],
    documentation_url="https://example.com/docs",
    linked_project_ids=["project_id_1", "project_id_2"],
    file_mapping_rules=[
        FileMappingRule(
            description="Images",
            min_=1,
            file_name_patterns=[
                FileNamePattern(
                    example_name="image.jpg",
                    description="JPG Image",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
                )
            ]
        )
    ]
)
cirro.processes.create_custom_process(new_data_type)
def update_custom_process( self, process_id: str, process: cirro_api_client.v1.models.CustomProcessInput):
217    def update_custom_process(self, process_id: str, process: CustomProcessInput):
218        """
219        Updates the custom process (pipeline or data type) in the system.
220
221        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
222
223        Args:
224            process_id (str): ID of the process to update
225            process (CustomProcessInput): Process to update
226        """
227        update_custom_process.sync_detailed(client=self._api_client,
228                                            process_id=process_id,
229                                            body=process)

Updates the custom process (pipeline or data type) in the system.

Please run sync_custom_process after updating to ensure the pipeline configuration is up to date.

Arguments:
  • process_id (str): ID of the process to update
  • process (CustomProcessInput): Process to update
def sync_custom_process( self, process_id: str) -> cirro_api_client.v1.models.CustomPipelineSettings:
231    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
232        """
233        Syncs a custom pipeline in the system.
234
235        This updates the pipeline configurations in Cirro (form configuration,
236         input mapping, preprocess etc.) to match what is in the configured repository.
237        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
238
239        Args:
240            process_id (str): ID of the process to sync
241        """
242        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Syncs a custom pipeline in the system.

This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

Arguments:
  • process_id (str): ID of the process to sync
class ProjectService(cirro.services.base.BaseService):
 12class ProjectService(BaseService):
 13    """
 14    Service for interacting with the Project endpoints
 15    """
 16    def list(self):
 17        """
 18        Retrieve a list of projects
 19        """
 20        return get_projects.sync(client=self._api_client)
 21
 22    def get(self, project_id: str) -> ProjectDetail:
 23        """
 24        Get detailed project information
 25
 26        Args:
 27            project_id (str): Project ID
 28        """
 29        return get_project.sync(project_id=project_id, client=self._api_client)
 30
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)
106
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)
116
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)
126
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)
135
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
154
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)
163
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Service for interacting with the Project endpoints

def list(self):
16    def list(self):
17        """
18        Retrieve a list of projects
19        """
20        return get_projects.sync(client=self._api_client)

Retrieve a list of projects

def get( self, project_id: str) -> cirro_api_client.v1.models.ProjectDetail:
22    def get(self, project_id: str) -> ProjectDetail:
23        """
24        Get detailed project information
25
26        Args:
27            project_id (str): Project ID
28        """
29        return get_project.sync(project_id=project_id, client=self._api_client)

Get detailed project information

Arguments:
  • project_id (str): Project ID
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)

Create a project

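Arguments:
  • request (cirro_api_client.v1.models.ProjectInput): Detailed information about the project to create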

Examples:

from cirro_api_client.v1.models import (
    ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount,
    CloudAccountType
)
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7,
        vpc_id="vpc-000000000000",
        batch_subnets=["subnet-000000000000", "subnet-000000000001"],
        sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
        kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
    ),
    account=CloudAccount(
        account_id="<AWS_ACCOUNT_ID>",
        region_name="us-west-2",
        account_name="Cirro Lab Project",
        account_type=CloudAccountType.BYOA
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

cirro.projects.create(byoa_project)
def update( self, project_id: str, request: cirro_api_client.v1.models.ProjectInput) -> cirro_api_client.v1.models.ProjectDetail:
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)

Updates a project

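Arguments:
  • project_id (str): ID of project to update
  • request (cirro_api_client.v1.models.ProjectInput): New details for the project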
def update_tags( self, project_id: str, tags: List[cirro_api_client.v1.models.Tag]):
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)

Sets tags on a project

Arguments:
  • project_id (str): Project ID
  • tags (List[Tag]): List of tags to apply
def get_users( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.ProjectUser]]:
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)

Gets users who have access to the project

Arguments:
  • project_id (str): Project ID
def set_user_role( self, project_id: str, username: str, role: cirro_api_client.v1.models.ProjectRole, suppress_notification=False):
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)

Sets a user's role within a project.

Set to ProjectRole.NONE if removing the user from a project.

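Arguments:
  • project_id (str): Project ID
  • username (str): Username
  • role (cirro_api_client.v1.models.ProjectRole): New role to apply
  • suppress_notification (bool): Suppress email notification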
def archive(self, project_id: str):
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to archived (hidden from active projects)

Arguments:
  • project_id (str): Project ID
def unarchive(self, project_id: str):
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to active

Arguments:
  • project_id (str): Project ID
class ReferenceService(cirro.services.file.FileEnabledService):
14class ReferenceService(FileEnabledService):
15    """
16    Service for interacting with the References endpoints
17    """
18    def get_types(self) -> Optional[List[ReferenceType]]:
19        """
20        List available reference types
21        """
22        return get_reference_types.sync(client=self._api_client)
23
24    def get_type(self, reference_type_name: str) -> Optional[ReferenceType]:
25        """
26        Get a specific reference type by name
27
28        Args:
29            reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
30        """
31        types = self.get_types()
32        return next((t for t in types if t.name == reference_type_name), None)
33
34    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
35        """
36        List available references for a given project
37        """
38        return get_references_for_project.sync(project_id=project_id, client=self._api_client)
39
40    def upload_reference(self,
41                         name: str,
42                         reference_files: list[PathLike],
43                         project_id: str,
44                         ref_type: ReferenceType):
45        """
46        Upload a reference to a project
47
48        Args:
49            name (str): Name of the reference (e.g. "GRCh38")
50                no spaces are allowed in the name
51            reference_files (list[PathLike]): Path to the reference file to upload
52            project_id (str): ID of the project to upload the reference to
53            ref_type (ReferenceType): Type of the reference
54
55        ```python
56        from pathlib import Path
57
58        from cirro import CirroApi
59
60        cirro = CirroApi()
61
62        crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
63        files = [
64            Path("~/crispr_library.csv").expanduser()
65        ]
66
67        cirro.references.upload_reference(
68            name="Library_Name",
69            project_id="project-id",
70            ref_type=crispr_library,
71            reference_files=files,
72        )
73        ```
74        """
75        # Validate name
76        if ' ' in name:
77            raise ValueError("Reference name cannot contain spaces")
78
79        access_context = FileAccessContext.upload_reference(
80            project_id=project_id,
81            base_url=f's3://project-{project_id}/resources/data/references',
82        )
83
84        file_path_map = generate_reference_file_path_map(
85            files=reference_files,
86            name=name,
87            ref_type=ref_type
88        )
89
90        self._file_service.upload_files(
91            access_context=access_context,
92            directory=Path('~').expanduser(),  # Full path expected in reference_files
93            files=reference_files,
94            file_path_map=file_path_map
95        )
96
97        refresh_project_references.sync_detailed(project_id=project_id,
98                                                 client=self._api_client)

Service for interacting with the References endpoints

def get_types( self) -> Optional[List[cirro_api_client.v1.models.ReferenceType]]:
18    def get_types(self) -> Optional[List[ReferenceType]]:
19        """
20        List available reference types
21        """
22        return get_reference_types.sync(client=self._api_client)

List available reference types

def get_type( self, reference_type_name: str) -> Optional[cirro_api_client.v1.models.ReferenceType]:
24    def get_type(self, reference_type_name: str) -> Optional[ReferenceType]:
25        """
26        Get a specific reference type by name
27
28        Args:
29            reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
30        """
31        types = self.get_types()
32        return next((t for t in types if t.name == reference_type_name), None)

Get a specific reference type by name

Arguments:
  • reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
def get_for_project( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.Reference]]:
34    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
35        """
36        List available references for a given project
37        """
38        return get_references_for_project.sync(project_id=project_id, client=self._api_client)

List available references for a given project

def upload_reference( self, name: str, reference_files: list[~PathLike], project_id: str, ref_type: cirro_api_client.v1.models.ReferenceType):
40    def upload_reference(self,
41                         name: str,
42                         reference_files: list[PathLike],
43                         project_id: str,
44                         ref_type: ReferenceType):
45        """
46        Upload a reference to a project
47
48        Args:
49            name (str): Name of the reference (e.g. "GRCh38")
50                no spaces are allowed in the name
51            reference_files (list[PathLike]): Path to the reference file to upload
52            project_id (str): ID of the project to upload the reference to
53            ref_type (ReferenceType): Type of the reference
54
55        ```python
56        from pathlib import Path
57
58        from cirro import CirroApi
59
60        cirro = CirroApi()
61
62        crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
63        files = [
64            Path("~/crispr_library.csv").expanduser()
65        ]
66
67        cirro.references.upload_reference(
68            name="Library_Name",
69            project_id="project-id",
70            ref_type=crispr_library,
71            reference_files=files,
72        )
73        ```
74        """
75        # Validate name
76        if ' ' in name:
77            raise ValueError("Reference name cannot contain spaces")
78
79        access_context = FileAccessContext.upload_reference(
80            project_id=project_id,
81            base_url=f's3://project-{project_id}/resources/data/references',
82        )
83
84        file_path_map = generate_reference_file_path_map(
85            files=reference_files,
86            name=name,
87            ref_type=ref_type
88        )
89
90        self._file_service.upload_files(
91            access_context=access_context,
92            directory=Path('~').expanduser(),  # Full path expected in reference_files
93            files=reference_files,
94            file_path_map=file_path_map
95        )
96
97        refresh_project_references.sync_detailed(project_id=project_id,
98                                                 client=self._api_client)

Upload a reference to a project

Arguments:
  • name (str): Name of the reference (e.g. "GRCh38"); no spaces are allowed in the name
  • reference_files (list[PathLike]): Paths to the reference files to upload
  • project_id (str): ID of the project to upload the reference to
  • ref_type (ReferenceType): Type of the reference
from pathlib import Path

from cirro import CirroApi

cirro = CirroApi()

crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
files = [
    Path("~/crispr_library.csv").expanduser()
]

cirro.references.upload_reference(
    name="Library_Name",
    project_id="project-id",
    ref_type=crispr_library,
    reference_files=files,
)
class ShareService(cirro.services.base.BaseService):
11class ShareService(BaseService):
12    """
13    Service for interacting with the Share endpoints
14    """
15
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares
40
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)
46
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)
54
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update a share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)
63
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)
71
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)
79
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Service for interacting with the Share endpoints

def list( self, project_id: str, share_type: cirro_api_client.v1.models.ShareType = None) -> List[cirro_api_client.v1.models.Share]:
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares

Retrieve a list of shares for a given project, optionally filtered by share type

Arguments:
  • project_id (str): ID of the Project
  • share_type (ShareType): Type of share to filter by
from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that we've published
cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
def get( self, project_id: str, share_id: str) -> cirro_api_client.v1.models.ShareDetail:
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)

Get details of a share

def create( self, project_id: str, share: cirro_api_client.v1.models.ShareInput) -> cirro_api_client.v1.models.CreateResponse:
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)

Publish a share for a given project

def update( self, project_id: str, share_id: str, share: cirro_api_client.v1.models.ShareInput):
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update a share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)

Update a share for a given project

def delete(self, project_id: str, share_id: str):
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)

Delete a share

def subscribe(self, project_id: str, share_id: str):
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)

Subscribe to a share for a given project

def unsubscribe(self, project_id: str, share_id: str):
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Unsubscribe from a share for a given project

class UserService(cirro.services.base.BaseService):
10class UserService(BaseService):
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )
23
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)
29
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Service for interacting with the User endpoints

Not to be instantiated directly

def list( self, max_items: int = 10000) -> List[cirro_api_client.v1.models.User]:
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )

List users in the system

def get(self, username: str) -> cirro_api_client.v1.models.UserDetail:
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)

Get user details by username, including what projects they are assigned to

def invite_user(self, name: str, organization: str, email: str):
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Invite a user to the system. If the user already exists, it will return a message that the user already exists.

Arguments:
  • name (str): Name of the user
  • organization (str): Organization of the user
  • email (str): Email (username) of the user
class WorkspaceService(cirro.services.base.BaseService):
 9class WorkspaceService(BaseService):
10    """
11    Service for interacting with the Workspace endpoints
12    """
13    def list_environments(self) -> list[WorkspaceEnvironment]:
14        """
15        List available workspace environments
16        """
17        return get_workspace_environments.sync(client=self._api_client)
18
19    def list(self, project_id: str) -> list[Workspace]:
20        """
21        Retrieves a list of workspaces that the user has access to
22
23        Args:
24            project_id (str): ID of the Project
25        """
26        return get_workspaces.sync(project_id, client=self._api_client)
27
28    def get(self, project_id: str, workspace_id: str) -> Workspace:
29        """
30        Get details of a workspace
31
32        Args:
33            project_id (str): ID of the Project
34            workspace_id (str): ID of the Workspace
35        """
36        return get_workspace.sync(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
37
38    def create(self, project_id: str, workspace: WorkspaceInput) -> CreateResponse:
39        """
40        Create a new workspace in the given project
41
42        Args:
43            project_id (str): ID of the Project
44            workspace (WorkspaceInput): Workspace object to create
45        """
46        return create_workspace.sync(project_id=project_id, client=self._api_client, body=workspace)
47
48    def delete(self, project_id: str, workspace_id: str) -> None:
49        """
50        Delete a workspace in the given project
51
52        Args:
53            project_id (str): ID of the Project
54            workspace_id (str): ID of the Workspace
55        """
56        delete_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
57
58    def start(self, project_id: str, workspace_id: str) -> None:
59        """
60        Start a workspace environment
61
62        Args:
63            project_id (str): ID of the Project
64            workspace_id (str): ID of the Workspace
65        """
66        start_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)
67
68    def stop(self, project_id: str, workspace_id: str) -> None:
69        """
70        Stop a workspace environment
71
72        Args:
73            project_id (str): ID of the Project
74            workspace_id (str): ID of the Workspace
75        """
76        stop_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)

Service for interacting with the Workspace endpoints

def list_environments( self) -> list[cirro_api_client.v1.models.WorkspaceEnvironment]:
13    def list_environments(self) -> list[WorkspaceEnvironment]:
14        """
15        List available workspace environments
16        """
17        return get_workspace_environments.sync(client=self._api_client)

List available workspace environments

def list( self, project_id: str) -> list[cirro_api_client.v1.models.Workspace]:
19    def list(self, project_id: str) -> list[Workspace]:
20        """
21        Retrieves a list of workspaces that the user has access to
22
23        Args:
24            project_id (str): ID of the Project
25        """
26        return get_workspaces.sync(project_id, client=self._api_client)

Retrieves a list of workspaces that the user has access to

Arguments:
  • project_id (str): ID of the Project
def get( self, project_id: str, workspace_id: str) -> cirro_api_client.v1.models.Workspace:
28    def get(self, project_id: str, workspace_id: str) -> Workspace:
29        """
30        Get details of a workspace
31
32        Args:
33            project_id (str): ID of the Project
34            workspace_id (str): ID of the Workspace
35        """
36        return get_workspace.sync(project_id=project_id, workspace_id=workspace_id, client=self._api_client)

Get details of a workspace

Arguments:
  • project_id (str): ID of the Project
  • workspace_id (str): ID of the Workspace
def create( self, project_id: str, workspace: cirro_api_client.v1.models.WorkspaceInput) -> cirro_api_client.v1.models.CreateResponse:
38    def create(self, project_id: str, workspace: WorkspaceInput) -> CreateResponse:
39        """
40        Create a new workspace in the given project
41
42        Args:
43            project_id (str): ID of the Project
44            workspace (WorkspaceInput): Workspace object to create
45        """
46        return create_workspace.sync(project_id=project_id, client=self._api_client, body=workspace)

Create a new workspace in the given project

Arguments:
  • project_id (str): ID of the Project
  • workspace (WorkspaceInput): Workspace object to create
def delete(self, project_id: str, workspace_id: str) -> None:
48    def delete(self, project_id: str, workspace_id: str) -> None:
49        """
50        Delete a workspace in the given project
51
52        Args:
53            project_id (str): ID of the Project
54            workspace_id (str): ID of the Workspace
55        """
56        delete_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)

Delete a workspace in the given project

Arguments:
  • project_id (str): ID of the Project
  • workspace_id (str): ID of the Workspace
def start(self, project_id: str, workspace_id: str) -> None:
58    def start(self, project_id: str, workspace_id: str) -> None:
59        """
60        Start a workspace environment
61
62        Args:
63            project_id (str): ID of the Project
64            workspace_id (str): ID of the Workspace
65        """
66        start_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)

Start a workspace environment

Arguments:
  • project_id (str): ID of the Project
  • workspace_id (str): ID of the Workspace
def stop(self, project_id: str, workspace_id: str) -> None:
68    def stop(self, project_id: str, workspace_id: str) -> None:
69        """
70        Stop a workspace environment
71
72        Args:
73            project_id (str): ID of the Project
74            workspace_id (str): ID of the Workspace
75        """
76        stop_workspace.sync_detailed(project_id=project_id, workspace_id=workspace_id, client=self._api_client)

Stop a workspace environment

Arguments:
  • project_id (str): ID of the Project
  • workspace_id (str): ID of the Workspace