cirro.services

from .billing import BillingService
from .compute_environment import ComputeEnvironmentService
from .dataset import DatasetService
from .execution import ExecutionService
from .file import FileService
from .metadata import MetadataService
from .metrics import MetricsService
from .process import ProcessService
from .projects import ProjectService
from .references import ReferenceService
from .share import ShareService
from .user import UserService

__all__ = [
    'BillingService',
    'DatasetService',
    'ExecutionService',
    'ComputeEnvironmentService',
    'FileService',
    'MetadataService',
    'MetricsService',
    'ProcessService',
    'ProjectService',
    'ReferenceService',
    'ShareService',
    'UserService'
]
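As the examples on this page show, these services are normally reached through the `CirroApi` client rather than constructed directly. A minimal sketch; only the `billing`, `datasets`, and `execution` accessors are confirmed by the examples below, and the remaining services are assumed to follow the same pattern:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Each service is exposed as an attribute of the client, for example:
cirro.billing      # BillingService
cirro.datasets     # DatasetService
cirro.execution    # ExecutionService
```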
class BillingService(cirro.services.base.BaseService):
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """
    def list(self) -> List[BillingAccount]:
        """
        Gets a list of billing accounts the current user has access to
        """
        return get_billing_accounts.sync(client=self._api_client)

    def update(self, billing_account_id: str, request: BillingAccountRequest):
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest):

        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        update_billing_account.sync_detailed(
            billing_account_id=billing_account_id,
            body=request,
            client=self._api_client
        )

Service for interacting with the Billing endpoints

def list(self) -> List[cirro_api_client.v1.models.BillingAccount]:

Gets a list of billing accounts the current user has access to
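For illustration, a minimal usage sketch following the conventions of the update example below (the `cirro.billing` accessor comes from that example):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Iterate over the billing accounts visible to the current user
for account in cirro.billing.list():
    print(account)
```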

def update( self, billing_account_id: str, request: cirro_api_client.v1.models.BillingAccountRequest):

Updates a billing account

Arguments:
  • billing_account_id (str): Billing account ID
  • request (cirro_api_client.v1.models.BillingAccountRequest): Request body with the updated billing account details

Example:
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = BillingAccountRequest(
    name="New billing account name",
    primary_budget_number="new-budget-number",
    owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
class DatasetService(cirro.services.file.FileEnabledService):
class DatasetService(FileEnabledService):
    """
    Service for interacting with the Dataset endpoints
    """

    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
        """List datasets

        Retrieves a list of datasets for a given project

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return get_all_records(
            records_getter=lambda page_args: get_datasets.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
        """
        Retrieves a list of shared datasets for a given project and share

        Args:
            project_id (str): ID of the Project
            share_id (str): ID of the Share
            max_items (int): Maximum number of records to get (default 10,000)

        Example:
        ```python
        from cirro_api_client.v1.models import ShareType
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()

        # List shares that are subscribed to
        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
        ```
        """
        return get_all_records(
            records_getter=lambda page_args: get_shared_datasets.sync(
                project_id=project_id,
                share_id=share_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            ),
            max_items=max_items
        )

    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
        """
        Download data from public repositories

        Args:
            project_id (str): ID of the Project
            import_request (cirro_api_client.v1.models.ImportDataRequest):

        Returns:
            ID of the created dataset

        ```python
        from cirro_api_client.v1.models import ImportDataRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = ImportDataRequest(
            name="Imported dataset",
            description="Description of the dataset",
            public_ids=["SRR123456", "SRR123457"],
            tags=[Tag(value="tag1")]
        )
        cirro.datasets.import_public("project-id", request)
        ```
        """
        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)

    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
        """
        Registers a dataset in Cirro, which can subsequently have files uploaded to it

        Args:
            project_id (str): ID of the Project
            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):

        Returns:
            ID of the created dataset and the path to upload files

        ```python
        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UploadDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
            description="Description of the dataset",
            tags=[Tag(value="tag1"), Tag(value="tag2")]
        )
        cirro.datasets.create("project-id", request)
        ```
        """
        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)

    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
        """
        Gets detailed information about a dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset

        Returns:
            The dataset, if found
        """
        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
        """
        Update info on a dataset (name, description, and/or tags)

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            request (cirro_api_client.v1.models.UpdateDatasetRequest):

        Returns:
            The updated dataset

        ```python
        from cirro_api_client.v1.models import UpdateDatasetRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = UpdateDatasetRequest(
            name="Name of new dataset",
            process_id="paired_dnaseq",
            description="Description of the dataset"
        )
        cirro.datasets.update("project-id", "dataset-id", request)
        ```
        """
        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)

    def delete(self, project_id: str, dataset_id: str) -> None:
        """
        Delete a dataset

        After a dataset has been deleted, the files associated with that
        dataset are saved according to the project's retention time.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """
        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
        """
        Gets a listing of files, charts, and other assets available for the dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            file_limit (int): Maximum number of files to get (default 100,000)
        """
        dataset = self.get(project_id, dataset_id)
        if file_limit < 1:
            raise ValueError("file_limit must be greater than 0")
        all_files = []
        file_offset = 0
        domain = None
        artifacts = None

        while len(all_files) < file_limit:
            manifest = get_dataset_manifest.sync(
                project_id=project_id,
                dataset_id=dataset_id,
                file_offset=file_offset,
                client=self._api_client
            )
            all_files.extend(manifest.files)
            file_offset += len(manifest.files)

            if not artifacts:
                artifacts = manifest.artifacts

            domain = manifest.domain
            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
                break

        files = [
            File.from_file_entry(
                f,
                project_id=project_id,
                dataset=dataset,
                domain=domain
            )
            for f in all_files
        ]
        artifacts = [
            Artifact(
                artifact_type=a.type,
                file=File.from_file_entry(
                    FileEntry(a.path),
                    project_id=project_id,
                    dataset=dataset,
                    domain=domain
                )
            )
            for a in artifacts
        ]
        return DatasetAssets(files=files, artifacts=artifacts)

    def upload_files(self,
                     project_id: str,
                     dataset_id: str,
                     directory: PathLike,
                     files: List[PathLike] = None,
                     file_path_map: Dict[PathLike, str] = None) -> None:
        """
        Uploads files to a given dataset from the specified directory.

        All files must be relative to the specified directory.
        If files need to be flattened, or you are sourcing files from multiple directories,
        please include `file_path_map` or call this method multiple times.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            directory (str|Path): Path to directory
            files (typing.List[str|Path]): List of paths to files within the directory,
                must be the same type as directory.
            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
             from source path to destination path, used to "re-write" paths within the dataset.
        ```python
        from cirro.cirro_client import CirroApi
        from cirro.file_utils import generate_flattened_file_map

        cirro = CirroApi()

        directory = "~/Downloads"
        # Re-write file paths
        file_map = {
            "data1/file1.fastq.gz": "file1.fastq.gz",
            "data2/file2.fastq.gz": "file2.fastq.gz",
            "file3.fastq.gz": "new_file3.txt"
        }

        # Or you could automate the flattening
        files = ["data1/file1.fastq.gz"]
        file_map = generate_flattened_file_map(files)

        cirro.datasets.upload_files(
            project_id="project-id",
            dataset_id="dataset-id",
            directory=directory,
            files=list(file_map.keys()),
            file_path_map=file_map
        )
        ```
        """
        if file_path_map is None:
            file_path_map = {}

        dataset = self.get(project_id, dataset_id)

        access_context = FileAccessContext.upload_dataset(
            project_id=project_id,
            dataset_id=dataset_id,
            base_url=dataset.s3
        )

        self._file_service.upload_files(
            access_context=access_context,
            directory=directory,
            files=files,
            file_path_map=file_path_map
        )

    def validate_folder(
        self,
        project_id: str,
        dataset_id: str,
        local_folder: PathLike
    ) -> DatasetValidationResponse:
        """
        Validates that the contents of a dataset match that of a local folder.
        """
        ds_files = self.get_assets_listing(project_id, dataset_id).files

        local_folder = Path(local_folder)
        if not local_folder.is_dir():
            raise ValueError(f"{local_folder} is not a valid local folder")

        # Keep track of files from the dataset which match by checksum, don't match, or are missing
        ds_files_matching = []
        ds_files_not_matching = []
        ds_files_missing = []
        ds_validate_failed = []
        for ds_file in ds_files:
            ds_file_path = ds_file.normalized_path
            # Get the corresponding local file
            local_file = local_folder / ds_file_path
            if not local_file.exists():
                ds_files_missing.append(ds_file_path)
            else:
                try:
                    if self._file_service.is_valid_file(ds_file, local_file):
                        ds_files_matching.append(ds_file_path)
                    else:
                        ds_files_not_matching.append(ds_file_path)
                except RuntimeWarning as e:
                    logger.warning(f"File validation failed: {e}")
                    ds_validate_failed.append(ds_file_path)

        # Find local files that are not in the dataset
        local_file_paths = [
            file.relative_to(local_folder).as_posix()
            for file in local_folder.rglob("*")
            if not file.is_dir() and not is_hidden_file(file)
        ]
        dataset_file_paths = [file.normalized_path for file in ds_files]
        local_only_files = [
            file
            for file in local_file_paths
            if file not in dataset_file_paths
        ]

        return DatasetValidationResponse(
            files_matching=ds_files_matching,
            files_not_matching=ds_files_not_matching,
            files_missing=ds_files_missing,
            local_only_files=local_only_files,
            validate_errors=ds_validate_failed,
        )

    def download_files(
        self,
        project_id: str,
        dataset_id: str,
        download_location: str,
        files: Union[List[File], List[str]] = None
    ) -> None:
        """
        Downloads files from a dataset

        The `files` argument is used to optionally specify a subset of files
        to be downloaded. By default, all files are downloaded.

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            download_location (str): Local destination for downloaded files
            files (typing.List[str]): Optional list of files to download
        """
        if files is None:
            files = self.get_assets_listing(project_id, dataset_id).files

        if len(files) == 0:
            return

        first_file = files[0]
        if isinstance(first_file, File):
            files = [file.relative_path for file in files]
            access_context = first_file.access_context
        else:
            dataset = self.get(project_id, dataset_id)
            if dataset.share:
                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
                                                                           dataset_id=dataset_id,
                                                                           base_url=dataset.s3)
            else:
                access_context = FileAccessContext.download(project_id=project_id,
                                                            base_url=dataset.s3)

        self._file_service.download_files(access_context, download_location, files)

    def update_samplesheet(
        self,
        project_id: str,
        dataset_id: str,
        samplesheet: str
    ):
        """
        Updates a samplesheet on a dataset

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            samplesheet (str): Samplesheet contents to update (should be a CSV string)
        """
        dataset = self.get(project_id, dataset_id)
        access_context = FileAccessContext.upload_sample_sheet(project_id=project_id,
                                                               dataset_id=dataset_id,
                                                               base_url=dataset.s3)

        samplesheet_key = f'{access_context.prefix}/samplesheet.csv'
        self._file_service.create_file(
            access_context=access_context,
            key=samplesheet_key,
            contents=samplesheet,
            content_type='text/csv'
        )

Service for interacting with the Dataset endpoints

def list( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:

List datasets

Retrieves a list of datasets for a given project

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
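A minimal usage sketch, following the other examples on this page ("project-id" is a placeholder):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Fetch up to 100 datasets from the project
datasets = cirro.datasets.list(project_id="project-id", max_items=100)
print(f"Found {len(datasets)} datasets")
```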
def list_shared( self, project_id: str, share_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:

Retrieves a list of shared datasets for a given project and share

Arguments:
  • project_id (str): ID of the Project
  • share_id (str): ID of the Share
  • max_items (int): Maximum number of records to get (default 10,000)

Example:

from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that are subscribed to
subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
def import_public( self, project_id: str, import_request: cirro_api_client.v1.models.ImportDataRequest) -> cirro_api_client.v1.models.CreateResponse:

Download data from public repositories

Arguments:
  • project_id (str): ID of the Project
  • import_request (cirro_api_client.v1.models.ImportDataRequest): Request body describing the data to import

Returns:

ID of the created dataset

from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = ImportDataRequest(
    name="Imported dataset",
    description="Description of the dataset",
    public_ids=["SRR123456", "SRR123457"],
    tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
def create( self, project_id: str, upload_request: cirro_api_client.v1.models.UploadDatasetRequest) -> cirro_api_client.v1.models.UploadDatasetCreateResponse:

Registers a dataset in Cirro, which can subsequently have files uploaded to it

Arguments:
  • project_id (str): ID of the Project
  • upload_request (cirro_api_client.v1.models.UploadDatasetRequest): Request body describing the dataset to register

Returns:

ID of the created dataset and the path to upload files

from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UploadDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
    description="Description of the dataset",
    tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
def get( self, project_id: str, dataset_id: str) -> Optional[cirro_api_client.v1.models.DatasetDetail]:

Gets detailed information about a dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
Returns:

The dataset, if found
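A minimal usage sketch ("project-id" and "dataset-id" are placeholders); the Optional return type suggests None is returned when the dataset is not found:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

dataset = cirro.datasets.get("project-id", "dataset-id")
if dataset is None:
    print("Dataset not found")
else:
    print(dataset)
```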

def update( self, project_id: str, dataset_id: str, request: cirro_api_client.v1.models.UpdateDatasetRequest) -> cirro_api_client.v1.models.DatasetDetail:

Update info on a dataset (name, description, and/or tags)

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • request (cirro_api_client.v1.models.UpdateDatasetRequest): Request body with the updated dataset details

Returns:

The updated dataset

from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UpdateDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
def delete(self, project_id: str, dataset_id: str) -> None:

Delete a dataset

After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
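A minimal usage sketch ("project-id" and "dataset-id" are placeholders):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Remove the dataset; file retention then follows the project's retention settings
cirro.datasets.delete(project_id="project-id", dataset_id="dataset-id")
```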
def get_assets_listing( self, project_id: str, dataset_id: str, file_limit: int = 100000) -> cirro.models.assets.DatasetAssets:

Gets a listing of files, charts, and other assets available for the dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • file_limit (int): Maximum number of files to get (default 100,000)
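A usage sketch; the `files` and `artifacts` attributes and the `relative_path` / `artifact_type` fields are taken from the implementation shown above:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

assets = cirro.datasets.get_assets_listing("project-id", "dataset-id")

for file in assets.files:
    print(file.relative_path)

for artifact in assets.artifacts:
    print(artifact.artifact_type)
```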
def upload_files( self, project_id: str, dataset_id: str, directory: ~PathLike, files: List[~PathLike] = None, file_path_map: Dict[~PathLike, str] = None) -> None:

Uploads files to a given dataset from the specified directory.

All files must be relative to the specified directory. If files need to be flattened, or you are sourcing files from multiple directories, please include file_path_map or call this method multiple times.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map

cirro = CirroApi()

directory = "~/Downloads"
# Re-write file paths
file_map = {
    "data1/file1.fastq.gz": "file1.fastq.gz",
    "data2/file2.fastq.gz": "file2.fastq.gz",
    "file3.fastq.gz": "new_file3.txt"
}

# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)

cirro.datasets.upload_files(
    project_id="project-id",
    dataset_id="dataset-id",
    directory=directory,
    files=list(file_map.keys()),
    file_path_map=file_map
)
def validate_folder( self, project_id: str, dataset_id: str, local_folder: ~PathLike) -> cirro.models.dataset.DatasetValidationResponse:

Validates that the contents of a dataset match that of a local folder.
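The response groups dataset files by validation outcome (field names taken from the implementation shown above). A usage sketch with a placeholder local path:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

result = cirro.datasets.validate_folder(
    project_id="project-id",
    dataset_id="dataset-id",
    local_folder="path/to/local-copy"  # placeholder path
)
print("Matching:", result.files_matching)
print("Not matching:", result.files_not_matching)
print("Missing locally:", result.files_missing)
print("Only in the local folder:", result.local_only_files)
```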

def download_files( self, project_id: str, dataset_id: str, download_location: str, files: Union[List[cirro.models.file.File], List[str]] = None) -> None:

Downloads files from a dataset

The files argument is used to optionally specify a subset of files to be downloaded. By default, all files are downloaded.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • download_location (str): Local destination for downloaded files
  • files (typing.List[str]): Optional list of files to download
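A usage sketch ("project-id", "dataset-id", and the file path are placeholders):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Download every file in the dataset
cirro.datasets.download_files(
    project_id="project-id",
    dataset_id="dataset-id",
    download_location="path/to/downloads"
)

# Or restrict the download to specific relative paths
cirro.datasets.download_files(
    project_id="project-id",
    dataset_id="dataset-id",
    download_location="path/to/downloads",
    files=["data/file1.fastq.gz"]
)
```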
def update_samplesheet(self, project_id: str, dataset_id: str, samplesheet: str):

Updates a samplesheet on a dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • samplesheet (str): Samplesheet contents to update (should be a CSV string)
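A usage sketch; the samplesheet columns shown here are hypothetical and depend on the process the dataset was created with:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Hypothetical CSV contents; use the columns expected by your process
samplesheet = "sample,fastq_1,fastq_2\nsampleA,read_1.fastq.gz,read_2.fastq.gz\n"

cirro.datasets.update_samplesheet(
    project_id="project-id",
    dataset_id="dataset-id",
    samplesheet=samplesheet
)
```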
class ExecutionService(cirro.services.base.BaseService):
class ExecutionService(BaseService):
    """
    Service for interacting with the Execution endpoints
    """
    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
        """
        Launch an analysis job running a process on a set of inputs

        Args:
            project_id (str): ID of the Project
            request (cirro_api_client.v1.models.RunAnalysisRequest):

        Returns:
            The ID of the created dataset

        ```python
        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
        from cirro.cirro_client import CirroApi

        # Example:
        # Run the "process-nf-core-rnaseq-3_8" process using input data
        # from a dataset with the id "source-dataset-id"

        # Optional analysis parameters
        params = RunAnalysisRequestParams.from_dict({
            "param_a": "val_a",
            "param_b": "val_b"
        })

        cirro = CirroApi()
        request = RunAnalysisRequest(
            name="Name of the newly created dataset",
            description="Longer description of newly created dataset",
            process_id="process-nf-core-rnaseq-3_8",
            source_dataset_ids=["source-dataset-id"],
            params=params
        )
        cirro.execution.run_analysis("project-id", request)
        ```
        """

        form_spec = get_process_parameters.sync(
            process_id=request.process_id,
            client=self._api_client
        )

        ParameterSpecification(
            form_spec
        ).validate_params(
            request.params.to_dict() if request.params else {}
        )

        return run_analysis.sync(
            project_id=project_id,
            body=request,
            client=self._api_client
        )

    def stop_analysis(self, project_id: str, dataset_id: str):
        """
        Terminates all jobs related to a running analysis

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
        """

        return stop_analysis.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            client=self._api_client
        )

    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
        """
        Gets an overview of the executions currently running in the project, by job queue

        Args:
            project_id (str): ID of the Project

        Returns:
            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
        """

        return get_project_summary.sync(
            project_id=project_id,
            client=self._api_client
        ).additional_properties

    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
        """
        Gets live logs from main execution task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed

        """

        resp = get_execution_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

        return '\n'.join(e.message for e in resp.events)

    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
        """
        Gets the tasks submitted by the workflow execution

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            force_live (bool): If True, it will try to get the list of jobs
                from the executor (i.e., AWS Batch), rather than the workflow report
        """

        return get_tasks_for_execution.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            force_live=force_live,
            client=self._api_client
        )

    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
        """
        Gets the log output from an individual task

        Args:
            project_id (str): ID of the Project
            dataset_id (str): ID of the Dataset
            task_id (str): ID of the task
            force_live (bool): If True, it will fetch logs from CloudWatch,
                even if the execution is already completed

        """

        resp = get_task_logs.sync(
            project_id=project_id,
            dataset_id=dataset_id,
            task_id=task_id,
            force_live=force_live,
            client=self._api_client
        )

        return '\n'.join(e.message for e in resp.events)

Service for interacting with the Execution endpoints

def run_analysis( self, project_id: str, request: cirro_api_client.v1.models.RunAnalysisRequest) -> cirro_api_client.v1.models.CreateResponse:

Launch an analysis job running a process on a set of inputs

Arguments:
  • project_id (str): ID of the Project
  • request (cirro_api_client.v1.models.RunAnalysisRequest): Request body describing the analysis to run

Returns:

The ID of the created dataset

from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi

# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"

# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
    "param_a": "val_a",
    "param_b": "val_b"
})

cirro = CirroApi()
request = RunAnalysisRequest(
    name="Name of the newly created dataset",
    description="Longer description of newly created dataset",
    process_id="process-nf-core-rnaseq-3_8",
    source_dataset_ids=["source-dataset-id"],
    params=params
)
cirro.execution.run_analysis("project-id", request)
def stop_analysis(self, project_id: str, dataset_id: str):

Terminates all jobs related to a running analysis

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
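A minimal usage sketch ("project-id" and "dataset-id" are placeholders):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Stop the analysis that is populating the given dataset
cirro.execution.stop_analysis(project_id="project-id", dataset_id="dataset-id")
```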
def get_project_summary( self, project_id: str) -> Dict[str, List[cirro_api_client.v1.models.Task]]:

Gets an overview of the executions currently running in the project, by job queue

Arguments:
  • project_id (str): ID of the Project
Returns:

cirro_api_client.v1.models.GetProjectSummaryResponse200
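The signature indicates a Dict[str, List[Task]] keyed by job queue, so a usage sketch looks like:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

summary = cirro.execution.get_project_summary("project-id")
for queue_name, tasks in summary.items():
    print(queue_name, len(tasks))
```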

def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:

Gets live logs from main execution task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
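A minimal usage sketch:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Print the combined log output from the main workflow task
print(cirro.execution.get_execution_logs("project-id", "dataset-id"))
```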
def get_tasks_for_execution( self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[cirro_api_client.v1.models.Task]]:

Gets the tasks submitted by the workflow execution

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
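A minimal usage sketch; the return value may be None, so the loop guards for that:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

tasks = cirro.execution.get_tasks_for_execution("project-id", "dataset-id")
for task in tasks or []:
    print(task)
```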
def get_task_logs( self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:

Gets the log output from an individual task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • task_id (str): ID of the task
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
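A minimal usage sketch; "task-id" is a placeholder for an ID obtained via get_tasks_for_execution():

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

print(cirro.execution.get_task_logs("project-id", "dataset-id", "task-id"))
```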
class ComputeEnvironmentService(cirro.services.base.BaseService):
class ComputeEnvironmentService(BaseService):
    """
    Service for interacting with the Compute Environment endpoints
    """

    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
        """
        List of custom compute environments for a project (i.e., an agent)

        Args:
            project_id (str): Project ID

        Returns:
            List of compute environments that are available for the project
        """
        return get_compute_environments.sync(project_id=project_id,
                                             client=self._api_client)

Service for interacting with the Compute Environment endpoints

def list_environments_for_project( self, project_id: str) -> List[cirro_api_client.v1.models.ComputeEnvironmentConfiguration]:

List of custom compute environments for a project (i.e., an agent)

Arguments:
  • project_id (str): Project ID
Returns:

List of compute environments that are available for the project
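A usage sketch; note that the `compute_environments` accessor name on the client is an assumption here (the examples elsewhere on this page only confirm `billing`, `datasets`, `execution`, and `shares`):

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# NOTE: accessor name assumed; check the CirroApi client for the exact attribute
environments = cirro.compute_environments.list_environments_for_project("project-id")
for environment in environments:
    print(environment)
```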

class FileService(cirro.services.base.BaseService):
 22class FileService(BaseService):
 23    """
 24    Service for interacting with files
 25    """
 26    checksum_method: str
 27    transfer_retries: int
 28    _get_token_lock = threading.Lock()
 29    _read_token_cache: Dict[str, AWSCredentials] = {}
 30
 31    def __init__(self, api_client, checksum_method, transfer_retries):
 32        """
 33        Instantiates the file service class
 34        """
 35        self._api_client = api_client
 36        self.checksum_method = checksum_method
 37        self.transfer_retries = transfer_retries
 38
 39    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
 40        """
 41        Retrieves credentials to access files
 42
 43        Args:
 44            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
 45        """
 46        access_request = access_context.file_access_request
 47
 48        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
 49            return self._get_project_read_credentials(access_context)
 50
 51        else:
 52            return generate_project_file_access_token.sync(
 53                client=self._api_client,
 54                project_id=access_context.project_id,
 55                body=access_context.file_access_request
 56            )
 57
 58    def _get_project_read_credentials(self, access_context: FileAccessContext):
 59        """
 60        Retrieves credentials to read project data; this can be cached
 61        """
 62        access_request = access_context.file_access_request
 63        project_id = access_context.project_id
 64        with self._get_token_lock:
 65            cached_token = self._read_token_cache.get(project_id)
 66
 67            if not cached_token or datetime.now(tz=timezone.utc) > cached_token.expiration:
 68                new_token = generate_project_file_access_token.sync(
 69                    client=self._api_client,
 70                    project_id=project_id,
 71                    body=access_request
 72                )
 73
 74                self._read_token_cache[project_id] = new_token
 75
 76        return self._read_token_cache[project_id]
 77
 78    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
 79        """
 80        Gets the underlying AWS S3 client to perform operations on files
 81
 82        This is seeded with refreshable credentials from the access_context parameter
 83
 84        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
 85        """
 86        s3_client = self._generate_s3_client(access_context)
 87        return s3_client.get_aws_client()
 88
 89    def get_file(self, file: File) -> bytes:
 90        """
 91        Gets the contents of a file
 92
 93        Args:
 94            file (cirro.models.file.File): File to retrieve
 95
 96        Returns:
 97            The raw bytes of the file
 98        """
 99        return self.get_file_from_path(file.access_context, file.relative_path)
100
101    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
102        """
103        Gets the contents of a file by providing the path, used internally
104
105        Args:
106            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
107            file_path (str): Relative path to file within dataset
108
109        Returns:
110            The raw bytes of the file
111        """
112        s3_client = self._generate_s3_client(access_context)
113
114        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
115
116        return s3_client.get_file(access_context.bucket, full_path)
117
118    def create_file(self, access_context: FileAccessContext, key: str,
119                    contents: str, content_type: str) -> None:
120        """
121        Creates a file at the specified path
122
123        Args:
124            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
125            key (str): Key of object to create
126            contents (str): Content of object
127            content_type (str): Content type (MIME type) of the object
128        """
129        s3_client = self._generate_s3_client(access_context)
130
131        s3_client.create_object(
132            key=key,
133            contents=contents,
134            content_type=content_type,
135            bucket=access_context.bucket
136        )
137
138    def upload_files(self,
139                     access_context: FileAccessContext,
140                     directory: PathLike,
141                     files: List[PathLike],
142                     file_path_map: Dict[PathLike, str]) -> None:
143        """
144        Uploads a list of files from the specified directory
145
146        Args:
147            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
148            directory (str|Path): Path to directory
149            files (typing.List[str|Path]): List of paths to files within the directory;
150                must be the same type as directory.
151            file_path_map (typing.Dict[str|Path, str]): Optional mapping from source file path
152                to destination path, used to "re-write" paths within the dataset.
153        """
154        s3_client = self._generate_s3_client(access_context)
155
156        upload_directory(
157            directory=directory,
158            files=files,
159            file_path_map=file_path_map,
160            s3_client=s3_client,
161            bucket=access_context.bucket,
162            prefix=access_context.prefix,
163            max_retries=self.transfer_retries
164        )
165
166    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
167        """
168        Download a list of files to the specified directory
169
170        Args:
171            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
172            directory (str): Download location
173            files (List[str]): Relative paths of the files to download
174        """
175        s3_client = self._generate_s3_client(access_context)
176
177        download_directory(
178            directory,
179            files,
180            s3_client,
181            access_context.bucket,
182            access_context.prefix
183        )
184
185    def is_valid_file(self, file: File, local_file: Path) -> bool:
186        """
187        Validates the checksum of a file against a local file
188        See ``validate_file`` method for details.
189
190        Args:
191            file (File): Cirro file to validate
192            local_file (PathLike): Local file path to compare against
193
194        Returns:
195            bool: True if file integrity matches, False otherwise
196
197        Raises:
198            RuntimeWarning: If the remote checksum is not available or not supported
199        """
200        try:
201            self.validate_file(file, local_file)
202            return True
203        except ValueError:
204            return False
205
206    def validate_file(self, file: File, local_file: PathLike):
207        """
208        Validates the checksum of a file against a local file
209        This is used to ensure file integrity after download or upload
210
211        Checksums might not be available if the file was uploaded without checksum support
212
213        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
214        Args:
215            file (File): Cirro file to validate
216            local_file (PathLike): Local file path to compare against
217
218        Raises:
219            ValueError: If checksums do not match
220            RuntimeWarning: If the remote checksum is not available or not supported
221        """
222        stats = self.get_file_stats(file)
223
224        remote_checksum_key = next((prop for prop in stats.keys()
225                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)
226
227        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
228            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")
229
230        if remote_checksum_key is None:
231            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")
232
233        remote_checksum = stats[remote_checksum_key]
234        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
235        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")
236
237        local_checksum = get_checksum(local_file, remote_checksum_name)
238        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")
239
240        if local_checksum != remote_checksum:
241            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
242                             f"local {local_checksum}, remote {remote_checksum}")
243
244    def get_file_stats(self, file: File) -> dict:
245        """
246        Gets the file stats for a file, such as size, checksum, etc.
247        Equivalent to the `head_object` operation in S3
248        """
249        s3_client = self._generate_s3_client(file.access_context)
250
251        full_path = f'{file.access_context.prefix}/{file.relative_path}'
252
253        stats = s3_client.get_file_stats(
254            bucket=file.access_context.bucket,
255            key=full_path
256        )
257        logger.debug(f"File stats for file {file.relative_path} is {stats}")
258        return stats
259
260    def _generate_s3_client(self, access_context: FileAccessContext):
261        """
262        Generates the Cirro-S3 client to perform operations on files
263        """
264        return S3Client(
265            partial(self.get_access_credentials, access_context),
266            self.checksum_method
267        )

Service for interacting with files

FileService(api_client, checksum_method, transfer_retries)
31    def __init__(self, api_client, checksum_method, transfer_retries):
32        """
33        Instantiates the file service class
34        """
35        self._api_client = api_client
36        self.checksum_method = checksum_method
37        self.transfer_retries = transfer_retries

Instantiates the file service class

checksum_method: str
transfer_retries: int
def get_access_credentials( self, access_context: cirro.models.file.FileAccessContext) -> cirro_api_client.v1.models.AWSCredentials:
39    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
40        """
41        Retrieves credentials to access files
42
43        Args:
44            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
45        """
46        access_request = access_context.file_access_request
47
48        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
49            return self._get_project_read_credentials(access_context)
50
51        else:
52            return generate_project_file_access_token.sync(
53                client=self._api_client,
54                project_id=access_context.project_id,
55                body=access_context.file_access_request
56            )

Retrieves credentials to access files

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
def get_aws_s3_client( self, access_context: cirro.models.file.FileAccessContext) -> botocore.client.BaseClient:
78    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
79        """
80        Gets the underlying AWS S3 client to perform operations on files
81
82        This is seeded with refreshable credentials from the access_context parameter
83
84        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
85        """
86        s3_client = self._generate_s3_client(access_context)
87        return s3_client.get_aws_client()

Gets the underlying AWS S3 client to perform operations on files

This is seeded with refreshable credentials from the access_context parameter

This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
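As a sketch of the kind of advanced operation this enables, assuming `access_context` is an existing `cirro.models.file.FileAccessContext` (created via its class methods, not shown here) and that the FileService is exposed as `cirro.file`:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

access_context = ...  # a FileAccessContext created via one of its class methods (not shown)

# Raw boto3 S3 client seeded with refreshable Cirro credentials
s3 = cirro.file.get_aws_s3_client(access_context)

# Any boto3 S3 operation can be used, e.g. listing objects under the context's prefix
response = s3.list_objects_v2(
    Bucket=access_context.bucket,
    Prefix=access_context.prefix
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```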

def get_file(self, file: cirro.models.file.File) -> bytes:
89    def get_file(self, file: File) -> bytes:
90        """
91        Gets the contents of a file
92
93        Args:
 94            file (cirro.models.file.File): File to retrieve
95
96        Returns:
97            The raw bytes of the file
98        """
99        return self.get_file_from_path(file.access_context, file.relative_path)

Gets the contents of a file

Arguments:
  • file (cirro.models.file.File): File to retrieve
Returns:

The raw bytes of the file
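A short sketch of reading a file's contents; `file` is assumed to be a `cirro.models.file.File` obtained elsewhere (for example from a dataset's file listing), and `cirro.file` is the assumed FileService attribute:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

file = ...  # a cirro.models.file.File obtained elsewhere, e.g. from a dataset listing

contents = cirro.file.get_file(file)    # raw bytes
print(contents.decode("utf-8")[:200])   # decode only if the file is text
```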

def get_file_from_path( self, access_context: cirro.models.file.FileAccessContext, file_path: str) -> bytes:
101    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
102        """
103        Gets the contents of a file by providing the path, used internally
104
105        Args:
106            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
107            file_path (str): Relative path to file within dataset
108
109        Returns:
110            The raw bytes of the file
111        """
112        s3_client = self._generate_s3_client(access_context)
113
114        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
115
116        return s3_client.get_file(access_context.bucket, full_path)

Gets the contents of a file by providing the path, used internally

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • file_path (str): Relative path to file within dataset
Returns:

The raw bytes of the file

def create_file( self, access_context: cirro.models.file.FileAccessContext, key: str, contents: str, content_type: str) -> None:
118    def create_file(self, access_context: FileAccessContext, key: str,
119                    contents: str, content_type: str) -> None:
120        """
121        Creates a file at the specified path
122
123        Args:
124            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
125            key (str): Key of object to create
126            contents (str): Content of object
127            content_type (str): Content type (MIME type) of the object
128        """
129        s3_client = self._generate_s3_client(access_context)
130
131        s3_client.create_object(
132            key=key,
133            contents=contents,
134            content_type=content_type,
135            bucket=access_context.bucket
136        )

Creates a file at the specified path

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • key (str): Key of object to create
  • contents (str): Content of object
  • content_type (str): Content type (MIME type) of the object
def upload_files( self, access_context: cirro.models.file.FileAccessContext, directory: ~PathLike, files: List[~PathLike], file_path_map: Dict[~PathLike, str]) -> None:
138    def upload_files(self,
139                     access_context: FileAccessContext,
140                     directory: PathLike,
141                     files: List[PathLike],
142                     file_path_map: Dict[PathLike, str]) -> None:
143        """
144        Uploads a list of files from the specified directory
145
146        Args:
147            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
148            directory (str|Path): Path to directory
149            files (typing.List[str|Path]): List of paths to files within the directory;
150                must be the same type as directory.
151            file_path_map (typing.Dict[str|Path, str]): Optional mapping from source file path
152                to destination path, used to "re-write" paths within the dataset.
153        """
154        s3_client = self._generate_s3_client(access_context)
155
156        upload_directory(
157            directory=directory,
158            files=files,
159            file_path_map=file_path_map,
160            s3_client=s3_client,
161            bucket=access_context.bucket,
162            prefix=access_context.prefix,
163            max_retries=self.transfer_retries
164        )

Uploads a list of files from the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory; must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str]): Optional mapping from source file path to destination path, used to "re-write" paths within the dataset.
def download_files( self, access_context: cirro.models.file.FileAccessContext, directory: str, files: List[str]) -> None:
166    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
167        """
168        Download a list of files to the specified directory
169
170        Args:
171            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
172            directory (str): Download location
173            files (List[str]): Relative paths of the files to download
174        """
175        s3_client = self._generate_s3_client(access_context)
176
177        download_directory(
178            directory,
179            files,
180            s3_client,
181            access_context.bucket,
182            access_context.prefix
183        )

Download a list of files to the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str): Download location
  • files (List[str]): Relative paths of the files to download
def is_valid_file(self, file: cirro.models.file.File, local_file: pathlib.Path) -> bool:
185    def is_valid_file(self, file: File, local_file: Path) -> bool:
186        """
187        Validates the checksum of a file against a local file
188        See ``validate_file`` method for details.
189
190        Args:
191            file (File): Cirro file to validate
192            local_file (PathLike): Local file path to compare against
193
194        Returns:
195            bool: True if file integrity matches, False otherwise
196
197        Raises:
198            RuntimeWarning: If the remote checksum is not available or not supported
199        """
200        try:
201            self.validate_file(file, local_file)
202            return True
203        except ValueError:
204            return False

Validates the checksum of a file against a local file. See the validate_file method for details.

Arguments:
  • file (File): Cirro file to validate
  • local_file (PathLike): Local file path to compare against
Returns:

bool: True if file integrity matches, False otherwise

Raises:
  • RuntimeWarning: If the remote checksum is not available or not supported
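A sketch of verifying a downloaded file against its remote checksum, with the same assumptions about `file` and the `cirro.file` attribute as above:

```python
from pathlib import Path

from cirro.cirro_client import CirroApi

cirro = CirroApi()

file = ...  # a cirro.models.file.File obtained elsewhere
local_file = Path("downloads") / file.relative_path

# May raise RuntimeWarning if the remote checksum is unavailable or unsupported
if cirro.file.is_valid_file(file, local_file):
    print("Checksum matches")
else:
    print("Checksum mismatch - consider re-downloading the file")
```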
def validate_file(self, file: cirro.models.file.File, local_file: ~PathLike):
206    def validate_file(self, file: File, local_file: PathLike):
207        """
208        Validates the checksum of a file against a local file
209        This is used to ensure file integrity after download or upload
210
211        Checksums might not be available if the file was uploaded without checksum support
212
213        https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
214        Args:
215            file (File): Cirro file to validate
216            local_file (PathLike): Local file path to compare against
217
218        Raises:
219            ValueError: If checksums do not match
220            RuntimeWarning: If the remote checksum is not available or not supported
221        """
222        stats = self.get_file_stats(file)
223
224        remote_checksum_key = next((prop for prop in stats.keys()
225                                    if 'Checksum' in prop and prop != 'ChecksumType'), None)
226
227        if 'ChecksumType' in stats and stats['ChecksumType'] != 'FULL_OBJECT':
228            raise RuntimeWarning(f"Only 'FULL_OBJECT' checksums are supported, not {stats['ChecksumType']}")
229
230        if remote_checksum_key is None:
231            raise RuntimeWarning(f"File {file.relative_path} does not have a checksum available for validation.")
232
233        remote_checksum = stats[remote_checksum_key]
234        remote_checksum_name = remote_checksum_key.replace('Checksum', '')
235        logger.debug(f"Checksum for file {file.relative_path} is {remote_checksum} using {remote_checksum_name}")
236
237        local_checksum = get_checksum(local_file, remote_checksum_name)
238        logger.debug(f"Local checksum for file {local_file} is {local_checksum} using {remote_checksum_name}")
239
240        if local_checksum != remote_checksum:
241            raise ValueError(f"Checksum mismatch for file {file.relative_path}: "
242                             f"local {local_checksum}, remote {remote_checksum}")

Validates the checksum of a file against a local file. This is used to ensure file integrity after download or upload.

Checksums might not be available if the file was uploaded without checksum support

https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html

Arguments:
  • file (File): Cirro file to validate
  • local_file (PathLike): Local file path to compare against
Raises:
  • ValueError: If checksums do not match
  • RuntimeWarning: If the remote checksum is not available or not supported
def get_file_stats(self, file: cirro.models.file.File) -> dict:
244    def get_file_stats(self, file: File) -> dict:
245        """
246        Gets the file stats for a file, such as size, checksum, etc.
247        Equivalent to the `head_object` operation in S3
248        """
249        s3_client = self._generate_s3_client(file.access_context)
250
251        full_path = f'{file.access_context.prefix}/{file.relative_path}'
252
253        stats = s3_client.get_file_stats(
254            bucket=file.access_context.bucket,
255            key=full_path
256        )
257        logger.debug(f"File stats for file {file.relative_path} is {stats}")
258        return stats

Gets the file stats for a file, such as size, checksum, etc. Equivalent to the head_object operation in S3
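For example, a sketch of inspecting a file's size; the returned dict mirrors S3 `head_object` fields such as `ContentLength`, and the same assumptions about `file` and the `cirro.file` attribute apply:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

file = ...  # a cirro.models.file.File obtained elsewhere
stats = cirro.file.get_file_stats(file)

# head_object-style fields, e.g. size in bytes and ETag
print(stats.get("ContentLength"), stats.get("ETag"))
```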

class MetadataService(cirro.services.base.BaseService):
11class MetadataService(BaseService):
12    """
13    Service for interacting with the Metadata endpoints
14    """
15    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
16        """
17        Retrieves a list of samples associated with a project along with their metadata
18
19        Args:
20            project_id (str): ID of the Project
21            max_items (int): Maximum number of records to get (default 10,000)
22        """
23        return get_all_records(
24            records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
25                                                                      client=self._api_client,
26                                                                      next_token=page_args.next_token,
27                                                                      limit=page_args.limit),
28            max_items=max_items
29        )
30
31    def get_project_schema(self, project_id: str) -> FormSchema:
32        """
33        Get project metadata schema
34
35        Args:
36            project_id (str): ID of the Project
37        """
38        return get_project_schema.sync(project_id=project_id, client=self._api_client)
39
40    def update_project_schema(self, project_id: str, schema: FormSchema):
41        """
42        Update project metadata schema
43
44        Args:
45            project_id (str): ID of the Project
46            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
47        """
48        update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)
49
50    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
51        """
52        Updates metadata information for a sample
53
54        Args:
55            project_id (str): ID of the Project
56            sample_id (str): ID of the sample
57            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
58        """
59        return update_sample.sync(
60            project_id=project_id,
61            sample_id=sample_id,
62            body=sample,
63            client=self._api_client
64        )

Service for interacting with the Metadata endpoints

def get_project_samples( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Sample]:
15    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
16        """
17        Retrieves a list of samples associated with a project along with their metadata
18
19        Args:
20            project_id (str): ID of the Project
21            max_items (int): Maximum number of records to get (default 10,000)
22        """
23        return get_all_records(
24            records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
25                                                                      client=self._api_client,
26                                                                      next_token=page_args.next_token,
27                                                                      limit=page_args.limit),
28            max_items=max_items
29        )

Retrieves a list of samples associated with a project along with their metadata

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
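A minimal sketch of listing sample metadata for a project; the `cirro.metadata` attribute name is an assumption about how CirroApi exposes this service:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

samples = cirro.metadata.get_project_samples("project-id", max_items=500)
for sample in samples:
    print(sample)
```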
def get_project_schema( self, project_id: str) -> cirro_api_client.v1.models.FormSchema:
31    def get_project_schema(self, project_id: str) -> FormSchema:
32        """
33        Get project metadata schema
34
35        Args:
36            project_id (str): ID of the Project
37        """
38        return get_project_schema.sync(project_id=project_id, client=self._api_client)

Get project metadata schema

Arguments:
  • project_id (str): ID of the Project
def update_project_schema( self, project_id: str, schema: cirro_api_client.v1.models.FormSchema):
40    def update_project_schema(self, project_id: str, schema: FormSchema):
41        """
42        Update project metadata schema
43
44        Args:
45            project_id (str): ID of the Project
46            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
47        """
48        update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)

Update project metadata schema

Arguments:
  • project_id (str): ID of the Project
  • schema (cirro_api_client.v1.models.FormSchema): Metadata schema
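A sketch of a get-modify-update round trip on the project schema, with the same assumption about the `cirro.metadata` attribute:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Fetch the current schema, adjust it in place, then write it back
schema = cirro.metadata.get_project_schema("project-id")
# ... modify the FormSchema object here as needed ...
cirro.metadata.update_project_schema("project-id", schema)
```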
def update_sample( self, project_id: str, sample_id: str, sample: cirro_api_client.v1.models.SampleRequest) -> cirro_api_client.v1.models.Sample:
50    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
51        """
52        Updates metadata information for a sample
53
54        Args:
55            project_id (str): ID of the Project
56            sample_id (str): ID of the sample
57            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
58        """
59        return update_sample.sync(
60            project_id=project_id,
61            sample_id=sample_id,
62            body=sample,
63            client=self._api_client
64        )

Updates metadata information for a sample

Arguments:
  • project_id (str): ID of the Project
  • sample_id (str): ID of the sample
  • sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
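A sketch of updating one sample's metadata; the `cirro.metadata` attribute and the `SampleRequest`/`Sample` field names used below (`name`, `metadata`, `id`) are assumptions about those models, not taken from this page:

```python
from cirro_api_client.v1.models import SampleRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()

sample = cirro.metadata.get_project_samples("project-id")[0]

# Field names below are assumptions about SampleRequest; check the model definition
request = SampleRequest(
    name=sample.name,
    metadata={"condition": "treated"}
)
updated = cirro.metadata.update_sample("project-id", sample.id, request)
```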
class MetricsService(cirro.services.base.BaseService):
10class MetricsService(BaseService):
11    """
12    Service for interacting with the Metrics endpoints
13    """
14    def get_for_project(self, project_id: str) -> ProjectMetrics:
15        """
16        Retrieves the cost and storage metrics for a project.
17
18        Args:
19            project_id (str): ID of the Project
20        """
21        return get_project_metrics.sync(
22            project_id=project_id,
23            client=self._api_client
24        )
25
26    def get_all_metrics(self) -> List[ProjectMetrics]:
27        """
28        Retrieves all available metrics
29        """
30        return get_all_metrics.sync(client=self._api_client)

Service for interacting with the Metrics endpoints

def get_for_project( self, project_id: str) -> cirro_api_client.v1.models.ProjectMetrics:
14    def get_for_project(self, project_id: str) -> ProjectMetrics:
15        """
16        Retrieves the cost and storage metrics for a project.
17
18        Args:
19            project_id (str): ID of the Project
20        """
21        return get_project_metrics.sync(
22            project_id=project_id,
23            client=self._api_client
24        )

Retrieves the cost and storage metrics for a project.

Arguments:
  • project_id (str): ID of the Project
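A minimal sketch, assuming the MetricsService is exposed on the client as `cirro.metrics`:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

metrics = cirro.metrics.get_for_project("project-id")
print(metrics)
```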
def get_all_metrics(self) -> List[cirro_api_client.v1.models.ProjectMetrics]:
26    def get_all_metrics(self) -> List[ProjectMetrics]:
27        """
28        Retrieves all available metrics
29        """
30        return get_all_metrics.sync(client=self._api_client)

Retrieves all available metrics

class ProcessService(cirro.services.base.BaseService):
 15class ProcessService(BaseService):
 16    """
 17    Service for interacting with the Process endpoints
 18    """
 19    def list(self, process_type: Executor = None) -> List[Process]:
 20        """
 21        Retrieves a list of available processes
 22
 23        Args:
 24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
 25        """
 26        processes = get_processes.sync(client=self._api_client)
 27        return [p for p in processes if not process_type or process_type == p.executor]
 28
 29    def get(self, process_id: str) -> ProcessDetail:
 30        """
 31        Retrieves detailed information on a process
 32
 33        Args:
 34            process_id (str): Process ID
 35        """
 36        return get_process.sync(process_id=process_id, client=self._api_client)
 37
 38    def archive(self, process_id: str):
 39        """
 40        Removes a custom process from the list of available processes.
 41
 42        An error is raised if the requested process does not exist. If the process exists
 43        and the request is satisfied, no value is returned and no error is raised.
 44
 45        Args:
 46            process_id (str): Process ID
 47        """
 48        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)
 49
 50    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
 51        """
 52        Get a process by its display name
 53
 54        Args:
 55            name (str): Process name
 56        """
 57        matched_process = next((p for p in self.list() if p.name == name), None)
 58        if not matched_process:
 59            return None
 60
 61        return self.get(matched_process.id)
 62
 63    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
 64        """
 65        Gets a specification used to describe the parameters used in the process
 66
 67        Args:
 68            process_id (str): Process ID
 69        """
 70        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
 71        return ParameterSpecification(form_spec)
 72
 73    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 74        """
 75        Checks if the file mapping rules for a process are met by the list of files
 76
 77        Error will be raised if the file mapping rules for the process are not met.
 78        No value is returned and no error is raised if the rules are satisfied.
 79
 80        Args:
 81            process_id (str): ID for the process containing the file mapping rules
 82            directory (str): Path to directory containing files
 83            files (List[str]): File names to check
 84        """
 85        # Parse sample sheet file if present
 86        sample_sheet = None
 87        sample_sheet_file = Path(directory, 'samplesheet.csv')
 88        if sample_sheet_file.exists():
 89            sample_sheet = sample_sheet_file.read_text()
 90
 91        request = ValidateFileRequirementsRequest(
 92            file_names=files,
 93            sample_sheet=sample_sheet
 94        )
 95        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 96
 97        # These will be sample sheet errors or no files errors
 98        if error_msg := requirements.error_msg:
 99            raise ValueError(error_msg)
100
101        errors = [
102            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
103            '\n\t- '.join([
104                e.example_name
105                for e in entry.allowed_patterns
106            ])
107            for entry in requirements.allowed_data_types
108            if entry.error_msg is not None
109        ]
110
111        files_provided = ', '.join(files)
112
113        if len(errors) != 0:
114            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
115                             "They do not meet the dataset requirements. "
116                             "The required file types are: \n" +
117                             "\n".join(errors))
118
119    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
120        """
121        Creates a custom process (pipeline or data type) in the system.
122
123        Whether it is a pipeline or data type is determined by the executor field.
124        For pipelines, you must specify the pipeline code and configuration repository.
125
126        This process will be available to all users in the system if `is_tenant_wide` is set to True.
127        If `is_tenant_wide` is set to False, the process will only be available to the projects
128        specified in `linked_project_ids`. Making it available tenant-wide requires tenant admin privileges.
129
130        If the repository is private, you must complete the authorization flow on the UI.
131
132        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
133
134        Args:
135            process (CustomProcessInput): Process to create
136
137        Example:
138        ```python
139        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
140         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
141        from cirro.cirro_client import CirroApi
142
143        cirro = CirroApi()
144
145        # New pipeline
146        new_pipeline = CustomProcessInput(
147            id="my_pipeline",
148            name="My Pipeline",
149            description="This is a test pipeline",
150            executor=Executor.CROMWELL,
151            category="DNA Sequencing",
152            child_process_ids=[],
153            parent_process_ids=["rnaseq"],
154            documentation_url="https://example.com/docs",
155            pipeline_code=PipelineCode(
156                repository_path="CirroBio/test-pipeline",
157                version="v1.0.0",
158                entry_point="main.nf",
159                repository_type=RepositoryType.GITHUB_PUBLIC
160            ),
161            linked_project_ids=[],
162            is_tenant_wide=True,
163            allow_multiple_sources=True,
164            uses_sample_sheet=True,
165            # This can be the same or different from the pipeline_code
166            custom_settings=CustomPipelineSettings(
167                repository="CirroBio/test-pipeline",
168                branch="v1.0.0",
169                repository_type=RepositoryType.GITHUB_PUBLIC,
170            ),
171            file_mapping_rules=[
172                FileMappingRule(
173                    description="Filtered Feature Matrix",
174                    file_name_patterns=[
175                        FileNamePattern(
176                            example_name="filtered_feature_bc_matrix.h5",
177                            description="Matrix",
178                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
179                        )
180                    ]
181                )
182            ]
183        )
184
185        cirro.processes.create_custom_process(new_pipeline)
186
187        # New data type
188        new_data_type = CustomProcessInput(
189            id="images_jpg",
190            name="JPG Images",
191            description="Used for generic JPG images",
192            executor=Executor.INGEST,
193            child_process_ids=[],
194            parent_process_ids=[],
195            documentation_url="https://example.com/docs",
196            linked_project_ids=["project_id_1", "project_id_2"],
197            file_mapping_rules=[
198                FileMappingRule(
199                    description="Images",
200                    min_=1,
201                    file_name_patterns=[
202                        FileNamePattern(
203                            example_name="image.jpg",
204                            description="JPG Image",
205                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
206                        )
207                    ]
208                )
209            ]
210        )
211        cirro.processes.create_custom_process(new_data_type)
212        ```
213        """
214        return create_custom_process.sync(client=self._api_client, body=process)
215
216    def update_custom_process(self, process_id: str, process: CustomProcessInput):
217        """
218        Updates the custom process (pipeline or data type) in the system.
219
220        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
221
222        Args:
223            process_id (str): ID of the process to update
224            process (CustomProcessInput): Process to update
225        """
226        update_custom_process.sync_detailed(client=self._api_client,
227                                            process_id=process_id,
228                                            body=process)
229
230    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
231        """
232        Syncs a custom pipeline in the system.
233
234        This updates the pipeline configurations in Cirro (form configuration,
235         input mapping, preprocess etc.) to match what is in the configured repository.
236        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
237
238        Args:
239            process_id (str): ID of the process to sync
240        """
241        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Service for interacting with the Process endpoints

def list( self, process_type: cirro_api_client.v1.models.Executor = None) -> List[cirro_api_client.v1.models.Process]:
19    def list(self, process_type: Executor = None) -> List[Process]:
20        """
21        Retrieves a list of available processes
22
23        Args:
24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
25        """
26        processes = get_processes.sync(client=self._api_client)
27        return [p for p in processes if not process_type or process_type == p.executor]

Retrieves a list of available processes

Arguments:
  • process_type (cirro_api_client.v1.models.Executor): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
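For example, filtering to a single executor type (the `cirro.processes` attribute is the one used in the `create_custom_process` example later in this section):

```python
from cirro_api_client.v1.models import Executor
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Only NEXTFLOW pipelines
nextflow_processes = cirro.processes.list(process_type=Executor.NEXTFLOW)
for process in nextflow_processes:
    print(process.name)
```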
def get( self, process_id: str) -> cirro_api_client.v1.models.ProcessDetail:
29    def get(self, process_id: str) -> ProcessDetail:
30        """
31        Retrieves detailed information on a process
32
33        Args:
34            process_id (str): Process ID
35        """
36        return get_process.sync(process_id=process_id, client=self._api_client)

Retrieves detailed information on a process

Arguments:
  • process_id (str): Process ID
def archive(self, process_id: str):
38    def archive(self, process_id: str):
39        """
40        Removes a custom process from the list of available processes.
41
42        An error is raised if the requested process does not exist. If the process exists
43        and the request is satisfied, no value is returned and no error is raised.
44
45        Args:
46            process_id (str): Process ID
47        """
48        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)

Removes a custom process from the list of available processes.

An error is raised if the requested process does not exist. If the process exists and the request is satisfied, no value is returned and no error is raised.

Arguments:
  • process_id (str): Process ID
def find_by_name( self, name: str) -> Optional[cirro_api_client.v1.models.ProcessDetail]:
50    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
51        """
52        Get a process by its display name
53
54        Args:
55            name (str): Process name
56        """
57        matched_process = next((p for p in self.list() if p.name == name), None)
58        if not matched_process:
59            return None
60
61        return self.get(matched_process.id)

Get a process by its display name

Arguments:
  • name (str): Process name
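A short sketch of looking a process up by its display name:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

process = cirro.processes.find_by_name("My Pipeline")
if process is None:
    raise ValueError("No process found with that display name")
print(process.id)
```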
def get_parameter_spec( self, process_id: str) -> cirro.models.form_specification.ParameterSpecification:
63    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
64        """
65        Gets a specification used to describe the parameters used in the process
66
67        Args:
68            process_id (str): Process ID
69        """
70        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
71        return ParameterSpecification(form_spec)

Gets a specification used to describe the parameters used in the process

Arguments:
  • process_id (str): Process ID
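A minimal sketch of fetching the parameter specification for a process; how the returned `ParameterSpecification` is inspected afterwards depends on that class:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

spec = cirro.processes.get_parameter_spec("process-id")
print(spec)
```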
def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 73    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 74        """
 75        Checks if the file mapping rules for a process are met by the list of files
 76
 77        Error will be raised if the file mapping rules for the process are not met.
 78        No value is returned and no error is raised if the rules are satisfied.
 79
 80        Args:
 81            process_id (str): ID for the process containing the file mapping rules
 82            directory (str): Path to directory containing files
 83            files (List[str]): File names to check
 84        """
 85        # Parse sample sheet file if present
 86        sample_sheet = None
 87        sample_sheet_file = Path(directory, 'samplesheet.csv')
 88        if sample_sheet_file.exists():
 89            sample_sheet = sample_sheet_file.read_text()
 90
 91        request = ValidateFileRequirementsRequest(
 92            file_names=files,
 93            sample_sheet=sample_sheet
 94        )
 95        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 96
 97        # These will be sample sheet errors or no files errors
 98        if error_msg := requirements.error_msg:
 99            raise ValueError(error_msg)
100
101        errors = [
102            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
103            '\n\t- '.join([
104                e.example_name
105                for e in entry.allowed_patterns
106            ])
107            for entry in requirements.allowed_data_types
108            if entry.error_msg is not None
109        ]
110
111        files_provided = ', '.join(files)
112
113        if len(errors) != 0:
114            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
115                             "They do not meet the dataset requirements. "
116                             "The required file types are: \n" +
117                             "\n".join(errors))

Checks if the file mapping rules for a process are met by the list of files

Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.

Arguments:
  • process_id (str): ID for the process containing the file mapping rules
  • directory (str): Path to directory containing files
  • files (List[str]): File names to check
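A sketch of validating a set of local file names against a process's file mapping rules; a `ValueError` carrying the details is raised when the rules are not met:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

files = ["sample1.fastq.gz", "sample2.fastq.gz"]  # example file names

try:
    cirro.processes.check_dataset_files(
        files=files,
        process_id="process-id",
        directory="/path/to/data"
    )
    print("Files satisfy the process requirements")
except ValueError as e:
    print(f"Files do not meet the requirements: {e}")
```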
def create_custom_process( self, process: cirro_api_client.v1.models.CustomProcessInput) -> cirro_api_client.v1.models.CreateResponse:
119    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
120        """
121        Creates a custom process (pipeline or data type) in the system.
122
123        Whether it is a pipeline or data type is determined by the executor field.
124        For pipelines, you must specify the pipeline code and configuration repository.
125
126        This process will be available to all users in the system if `is_tenant_wide` is set to True.
127        If `is_tenant_wide` is set to False, the process will only be available to the projects
128        specified in `linked_project_ids`. Making it available tenant-wide requires tenant admin privileges.
129
130        If the repository is private, you must complete the authorization flow on the UI.
131
132        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
133
134        Args:
135            process (CustomProcessInput): Process to create
136
137        Example:
138        ```python
139        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
140         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
141        from cirro.cirro_client import CirroApi
142
143        cirro = CirroApi()
144
145        # New pipeline
146        new_pipeline = CustomProcessInput(
147            id="my_pipeline",
148            name="My Pipeline",
149            description="This is a test pipeline",
150            executor=Executor.CROMWELL,
151            category="DNA Sequencing",
152            child_process_ids=[],
153            parent_process_ids=["rnaseq"],
154            documentation_url="https://example.com/docs",
155            pipeline_code=PipelineCode(
156                repository_path="CirroBio/test-pipeline",
157                version="v1.0.0",
158                entry_point="main.nf",
159                repository_type=RepositoryType.GITHUB_PUBLIC
160            ),
161            linked_project_ids=[],
162            is_tenant_wide=True,
163            allow_multiple_sources=True,
164            uses_sample_sheet=True,
165            # This can be the same or different from the pipeline_code
166            custom_settings=CustomPipelineSettings(
167                repository="CirroBio/test-pipeline",
168                branch="v1.0.0",
169                repository_type=RepositoryType.GITHUB_PUBLIC,
170            ),
171            file_mapping_rules=[
172                FileMappingRule(
173                    description="Filtered Feature Matrix",
174                    file_name_patterns=[
175                        FileNamePattern(
176                            example_name="filtered_feature_bc_matrix.h5",
177                            description="Matrix",
178                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
179                        )
180                    ]
181                )
182            ]
183        )
184
185        cirro.processes.create_custom_process(new_pipeline)
186
187        # New data type
188        new_data_type = CustomProcessInput(
189            id="images_jpg",
190            name="JPG Images",
191            description="Used for generic JPG images",
192            executor=Executor.INGEST,
193            child_process_ids=[],
194            parent_process_ids=[],
195            documentation_url="https://example.com/docs",
196            linked_project_ids=["project_id_1", "project_id_2"],
197            file_mapping_rules=[
198                FileMappingRule(
199                    description="Images",
200                    min_=1,
201                    file_name_patterns=[
202                        FileNamePattern(
203                            example_name="image.jpg",
204                            description="JPG Image",
205                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
206                        )
207                    ]
208                )
209            ]
210        )
211        cirro.processes.create_custom_process(new_data_type)
212        ```
213        """
214        return create_custom_process.sync(client=self._api_client, body=process)

Creates a custom process (pipeline or data type) in the system.

Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.

This process will be available to all users in the system if is_tenant_wide is set to True. If is_tenant_wide is set to False, the process will only be available to the projects specified in linked_project_ids. Making it available tenant-wide requires tenant admin privileges.

If the repository is private, you must complete the authorization flow on the UI.

See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

Arguments:
  • process (CustomProcessInput): Process to create

Example:

```python
from cirro_api_client.v1.models import (
    CustomProcessInput, Executor, PipelineCode, FileMappingRule,
    FileNamePattern, RepositoryType, CustomPipelineSettings
)
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# New pipeline
new_pipeline = CustomProcessInput(
    id="my_pipeline",
    name="My Pipeline",
    description="This is a test pipeline",
    executor=Executor.CROMWELL,
    category="DNA Sequencing",
    child_process_ids=[],
    parent_process_ids=["rnaseq"],
    documentation_url="https://example.com/docs",
    pipeline_code=PipelineCode(
        repository_path="CirroBio/test-pipeline",
        version="v1.0.0",
        entry_point="main.nf",
        repository_type=RepositoryType.GITHUB_PUBLIC
    ),
    linked_project_ids=[],
    is_tenant_wide=True,
    allow_multiple_sources=True,
    uses_sample_sheet=True,
    # This can be the same or different from the pipeline_code
    custom_settings=CustomPipelineSettings(
        repository="CirroBio/test-pipeline",
        branch="v1.0.0",
        repository_type=RepositoryType.GITHUB_PUBLIC,
    ),
    file_mapping_rules=[
        FileMappingRule(
            description="Filtered Feature Matrix",
            file_name_patterns=[
                FileNamePattern(
                    example_name="filtered_feature_bc_matrix.h5",
                    description="Matrix",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)\.jpg"
                )
            ]
        )
    ]
)

cirro.processes.create_custom_process(new_pipeline)

# New data type
new_data_type = CustomProcessInput(
    id="images_jpg",
    name="JPG Images",
    description="Used for generic JPG images",
    executor=Executor.INGEST,
    child_process_ids=[],
    parent_process_ids=[],
    documentation_url="https://example.com/docs",
    linked_project_ids=["project_id_1", "project_id_2"],
    file_mapping_rules=[
        FileMappingRule(
            description="Images",
            min_=1,
            file_name_patterns=[
                FileNamePattern(
                    example_name="image.jpg",
                    description="JPG Image",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
                )
            ]
        )
    ]
)
cirro.processes.create_custom_process(new_data_type)
```
def update_custom_process( self, process_id: str, process: cirro_api_client.v1.models.CustomProcessInput):
216    def update_custom_process(self, process_id: str, process: CustomProcessInput):
217        """
218        Updates the custom process (pipeline or data type) in the system.
219
220        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
221
222        Args:
223            process_id (str): ID of the process to update
224            process (CustomProcessInput): Process to update
225        """
226        update_custom_process.sync_detailed(client=self._api_client,
227                                            process_id=process_id,
228                                            body=process)

Updates the custom process (pipeline or data type) in the system.

Please run sync_custom_process after updating to ensure the pipeline configuration is up to date.

Arguments:
  • process_id (str): ID of the process to update
  • process (CustomProcessInput): Process to update
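A sketch of updating an existing custom process and then syncing its configuration, reusing the `CustomProcessInput` shape from the `create_custom_process` example above:

```python
from cirro.cirro_client import CirroApi

cirro = CirroApi()

updated_process = ...  # a CustomProcessInput, built as in the create_custom_process example

cirro.processes.update_custom_process("my_pipeline", updated_process)

# Pull the latest form configuration, input mapping, etc. from the repository
cirro.processes.sync_custom_process("my_pipeline")
```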
def sync_custom_process( self, process_id: str) -> cirro_api_client.v1.models.CustomPipelineSettings:
230    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
231        """
232        Syncs a custom pipeline in the system.
233
234        This updates the pipeline configurations in Cirro (form configuration,
235         input mapping, preprocess etc.) to match what is in the configured repository.
236        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
237
238        Args:
239            process_id (str): ID of the process to sync
240        """
241        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Syncs a custom pipeline in the system.

This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

Arguments:
  • process_id (str): ID of the process to sync
class ProjectService(cirro.services.base.BaseService):
 12class ProjectService(BaseService):
 13    """
 14    Service for interacting with the Project endpoints
 15    """
 16    def list(self):
 17        """
 18        Retrieve a list of projects
 19        """
 20        return get_projects.sync(client=self._api_client)
 21
 22    def get(self, project_id: str) -> ProjectDetail:
 23        """
 24        Get detailed project information
 25
 26        Args:
 27            project_id (str): Project ID
 28        """
 29        return get_project.sync(project_id=project_id, client=self._api_client)
 30
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)
106
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)
116
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)
126
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)
135
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
154
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)
163
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Service for interacting with the Project endpoints

def list(self):
16    def list(self):
17        """
18        Retrieve a list of projects
19        """
20        return get_projects.sync(client=self._api_client)

Retrieve a list of projects

def get( self, project_id: str) -> cirro_api_client.v1.models.ProjectDetail:
22    def get(self, project_id: str) -> ProjectDetail:
23        """
24        Get detailed project information
25
26        Args:
27            project_id (str): Project ID
28        """
29        return get_project.sync(project_id=project_id, client=self._api_client)

Get detailed project information

Arguments:
  • project_id (str): Project ID
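
A minimal usage sketch (the attribute access on the returned models is illustrative; check the Project and ProjectDetail models for their exact fields):

from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List all projects visible to the current user
projects = cirro.projects.list()

# Fetch full details for the first one
if projects:
    detail = cirro.projects.get(project_id=projects[0].id)
    print(detail.name)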
def create( self, request: cirro_api_client.v1.models.ProjectInput) -> cirro_api_client.v1.models.CreateResponse:
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)

Create a project

Arguments:
  • request (cirro_api_client.v1.models.ProjectInput): Detailed information about the project to create

Examples:

from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, CloudAccountType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7,
        vpc_id="vpc-000000000000",
        batch_subnets=["subnet-000000000000", "subnet-000000000001"],
        sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
        kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
    ),
    account=CloudAccount(
        account_id="<AWS_ACCOUNT_ID>",
        region_name="us-west-2",
        account_name="Cirro Lab Project",
        account_type=CloudAccountType.BYOA
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

cirro.projects.create(byoa_project)
def update( self, project_id: str, request: cirro_api_client.v1.models.ProjectInput) -> cirro_api_client.v1.models.ProjectDetail:
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)

Updates a project

Arguments:
  • project_id (str): ID of project to update
  • request (cirro_api_client.v1.models.ProjectInput): New details for the project
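
A sketch of updating a project; it reuses the ProjectInput fields shown in the create() example above (all IDs and values are placeholders):

from cirro_api_client.v1.models import ProjectInput, ProjectSettings, BudgetPeriod, Contact
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Raise the monthly budget on an existing hosted project
updated = ProjectInput(
    name="New Project Name",
    description="Updated description",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=2000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)
cirro.projects.update(project_id="project-id", request=updated)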
def update_tags( self, project_id: str, tags: List[cirro_api_client.v1.models.Tag]):
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)

Sets tags on a project

Arguments:
  • project_id (str): Project ID
  • tags (List[Tag]): List of tags to apply
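
A hedged sketch; it assumes the Tag model takes a value field, so check cirro_api_client.v1.models.Tag for its exact attributes:

from cirro_api_client.v1.models import Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Replace the project's tags (Tag(value=...) is an assumption about the model's fields)
cirro.projects.update_tags(
    project_id="project-id",
    tags=[Tag(value="RNA-seq"), Tag(value="2024")]
)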
def get_users( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.ProjectUser]]:
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)

Gets users who have access to the project

Arguments:
  • project_id (str): Project ID
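
A minimal sketch of listing project members (the call may return None, so the example falls back to an empty list):

from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Each entry is a ProjectUser model; inspect its fields (e.g. username, role) as needed
for user in cirro.projects.get_users("project-id") or []:
    print(user)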
def set_user_role( self, project_id: str, username: str, role: cirro_api_client.v1.models.ProjectRole, suppress_notification=False):
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)

Sets a user's role within a project.

Set to ProjectRole.NONE if removing the user from a project.

Arguments:
  • project_id (str): Project ID
  • username (str): Username
  • role (cirro_api_client.v1.models.ProjectRole): New role to apply
  • suppress_notification (bool): Suppress email notification
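
A sketch of removing a user from a project; to grant access instead, pass one of the other values from the ProjectRole enum:

from cirro_api_client.v1.models import ProjectRole
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Remove the user from the project without sending an email notification
cirro.projects.set_user_role(
    project_id="project-id",
    username="user@email.com",
    role=ProjectRole.NONE,
    suppress_notification=True
)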
def archive(self, project_id: str):
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to archived (hidden from active projects)

Arguments:
  • project_id (str): Project ID
def unarchive(self, project_id: str):
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to active

Arguments:
  • project_id (str): Project ID
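
For example, to hide a project and later restore it (the project ID is a placeholder):

from cirro.cirro_client import CirroApi

cirro = CirroApi()

cirro.projects.archive("project-id")
cirro.projects.unarchive("project-id")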
class ReferenceService(cirro.services.file.FileEnabledService):
14class ReferenceService(FileEnabledService):
15    """
16    Service for interacting with the References endpoints
17    """
18    def get_types(self) -> Optional[List[ReferenceType]]:
19        """
20        List available reference types
21        """
22        return get_reference_types.sync(client=self._api_client)
23
24    def get_type(self, reference_type_name: str) -> Optional[ReferenceType]:
25        """
26        Get a specific reference type by name
27
28        Args:
29            reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
30        """
31        types = self.get_types()
32        return next((t for t in types if t.name == reference_type_name), None)
33
34    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
35        """
36        List available references for a given project
37        """
38        return get_references_for_project.sync(project_id=project_id, client=self._api_client)
39
40    def upload_reference(self,
41                         name: str,
42                         reference_files: list[PathLike],
43                         project_id: str,
44                         ref_type: ReferenceType):
45        """
46        Upload a reference to a project
47
48        Args:
49            name (str): Name of the reference (e.g. "GRCh38")
50                no spaces are allowed in the name
51            reference_files (list[PathLike]): Path to the reference file to upload
52            project_id (str): ID of the project to upload the reference to
53            ref_type (ReferenceType): Type of the reference
54
55        ```python
56        from pathlib import Path
57
58        from cirro import CirroApi
59
60        cirro = CirroApi()
61
62        crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
63        files = [
64            Path("~/crispr_library.csv").expanduser()
65        ]
66
67        cirro.references.upload_reference(
68            name="Library Name",
69            project_id="project-id",
70            ref_type=crispr_library,
71            reference_files=files,
72        )
73        ```
74        """
75        # Validate name
76        if ' ' in name:
77            raise ValueError("Reference name cannot contain spaces")
78
79        access_context = FileAccessContext.upload_reference(
80            project_id=project_id,
81            base_url=f's3://project-{project_id}/resources/data/references',
82        )
83
84        file_path_map = generate_reference_file_path_map(
85            files=reference_files,
86            name=name,
87            ref_type=ref_type
88        )
89
90        self._file_service.upload_files(
91            access_context=access_context,
92            directory=Path('~').expanduser(),  # Full path expected in reference_files
93            files=reference_files,
94            file_path_map=file_path_map
95        )
96
97        refresh_project_references.sync_detailed(project_id=project_id,
98                                                 client=self._api_client)

Service for interacting with the References endpoints

def get_types( self) -> Optional[List[cirro_api_client.v1.models.ReferenceType]]:
18    def get_types(self) -> Optional[List[ReferenceType]]:
19        """
20        List available reference types
21        """
22        return get_reference_types.sync(client=self._api_client)

List available reference types

def get_type( self, reference_type_name: str) -> Optional[cirro_api_client.v1.models.ReferenceType]:
24    def get_type(self, reference_type_name: str) -> Optional[ReferenceType]:
25        """
26        Get a specific reference type by name
27
28        Args:
29            reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
30        """
31        types = self.get_types()
32        return next((t for t in types if t.name == reference_type_name), None)

Get a specific reference type by name

Arguments:
  • reference_type_name (str): Name of the reference type (e.g. "Reference Genome (FASTA)")
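
For example (get_type() itself matches on the ReferenceType name field, so the display name must match exactly):

from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List every available reference type
for ref_type in cirro.references.get_types() or []:
    print(ref_type.name)

# Look a type up by its display name (returns None if not found)
genome_fasta = cirro.references.get_type("Reference Genome (FASTA)")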
def get_for_project( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.Reference]]:
34    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
35        """
36        List available references for a given project
37        """
38        return get_references_for_project.sync(project_id=project_id, client=self._api_client)

List available references for a given project

def upload_reference( self, name: str, reference_files: list[~PathLike], project_id: str, ref_type: cirro_api_client.v1.models.ReferenceType):
40    def upload_reference(self,
41                         name: str,
42                         reference_files: list[PathLike],
43                         project_id: str,
44                         ref_type: ReferenceType):
45        """
46        Upload a reference to a project
47
48        Args:
49            name (str): Name of the reference (e.g. "GRCh38")
50                no spaces are allowed in the name
51            reference_files (list[PathLike]): Path to the reference file to upload
52            project_id (str): ID of the project to upload the reference to
53            ref_type (ReferenceType): Type of the reference
54
55        ```python
56        from pathlib import Path
57
58        from cirro import CirroApi
59
60        cirro = CirroApi()
61
62        crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
63        files = [
64            Path("~/crispr_library.csv").expanduser()
65        ]
66
67        cirro.references.upload_reference(
68            name="Library Name",
69            project_id="project-id",
70            ref_type=crispr_library,
71            reference_files=files,
72        )
73        ```
74        """
75        # Validate name
76        if ' ' in name:
77            raise ValueError("Reference name cannot contain spaces")
78
79        access_context = FileAccessContext.upload_reference(
80            project_id=project_id,
81            base_url=f's3://project-{project_id}/resources/data/references',
82        )
83
84        file_path_map = generate_reference_file_path_map(
85            files=reference_files,
86            name=name,
87            ref_type=ref_type
88        )
89
90        self._file_service.upload_files(
91            access_context=access_context,
92            directory=Path('~').expanduser(),  # Full path expected in reference_files
93            files=reference_files,
94            file_path_map=file_path_map
95        )
96
97        refresh_project_references.sync_detailed(project_id=project_id,
98                                                 client=self._api_client)

Upload a reference to a project

Arguments:
  • name (str): Name of the reference (e.g. "GRCh38"); spaces are not allowed in the name
  • reference_files (list[PathLike]): Paths to the reference files to upload
  • project_id (str): ID of the project to upload the reference to
  • ref_type (ReferenceType): Type of the reference
from pathlib import Path

from cirro import CirroApi

cirro = CirroApi()

crispr_library = cirro.references.get_type("CRISPR sgRNA Library")
files = [
    Path("~/crispr_library.csv").expanduser()
]

cirro.references.upload_reference(
    name="Library Name",
    project_id="project-id",
    ref_type=crispr_library,
    reference_files=files,
)
class ShareService(cirro.services.base.BaseService):
11class ShareService(BaseService):
12    """
13    Service for interacting with the Share endpoints
14    """
15
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares
40
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)
46
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)
54
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update a share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)
63
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)
71
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)
79
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Service for interacting with the Share endpoints

def list( self, project_id: str, share_type: cirro_api_client.v1.models.ShareType = None) -> List[cirro_api_client.v1.models.Share]:
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares

Retrieve a list of shares for a given project, optionally filtered by share type

Arguments:
  • project_id (str): ID of the Project
  • share_type (ShareType): Type of share to filter by
from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that we've published
cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
def get( self, project_id: str, share_id: str) -> cirro_api_client.v1.models.ShareDetail:
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)

Get details of a share

def create( self, project_id: str, share: cirro_api_client.v1.models.ShareInput) -> cirro_api_client.v1.models.CreateResponse:
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)

Publish a share for a given project

def update( self, project_id: str, share_id: str, share: cirro_api_client.v1.models.ShareInput):
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update a share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)

Update a share for a given project

def delete(self, project_id: str, share_id: str):
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)

Delete a share

def subscribe(self, project_id: str, share_id: str):
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)

Subscribe to a share for a given project

def unsubscribe(self, project_id: str, share_id: str):
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Unsubscribe from a share for a given project
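
A sketch of the subscribe/unsubscribe flow; ShareType.SUBSCRIBER is assumed to exist alongside ShareType.PUBLISHER, and all IDs are placeholders:

from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Shares this project has subscribed to (ShareType.SUBSCRIBER is an assumption)
subscribed = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)

# Subscribe to a published share, then unsubscribe later
cirro.shares.subscribe(project_id="project-id", share_id="share-id")
cirro.shares.unsubscribe(project_id="project-id", share_id="share-id")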

class UserService(cirro.services.base.BaseService):
10class UserService(BaseService):
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )
23
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)
29
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Service for interacting with the User endpoints

def list( self, max_items: int = 10000) -> List[cirro_api_client.v1.models.User]:
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )

List users in the system

def get(self, username: str) -> cirro_api_client.v1.models.UserDetail:
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)

Get user details by username, including what projects they are assigned to
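
A minimal sketch, assuming the user service is exposed as cirro.users in line with the other services:

from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Page through up to 100 users
users = cirro.users.list(max_items=100)

# Look a user up by username (typically the email address)
detail = cirro.users.get(username="user@email.com")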

def invite_user(self, name: str, organization: str, email: str):
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Invite a user to the system. If the user already exists, a message saying so is returned.

Arguments:
  • name (str): Name of the user
  • organization (str): Organization of the user
  • email (str): Email (username) of the user
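
For example (again assuming the service is exposed as cirro.users; all values are placeholders):

from cirro.cirro_client import CirroApi

cirro = CirroApi()

message = cirro.users.invite_user(
    name="New User",
    organization="Example Lab",
    email="new.user@email.com"
)
print(message)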