cirro.services

 1from .billing import BillingService
 2from .compute_environment import ComputeEnvironmentService
 3from .dataset import DatasetService
 4from .execution import ExecutionService
 5from .file import FileService
 6from .metadata import MetadataService
 7from .metrics import MetricsService
 8from .process import ProcessService
 9from .projects import ProjectService
10from .references import ReferenceService
11from .share import ShareService
12from .user import UserService
13
# Names re-exported as the public API of this package.
# NOTE(review): the generated docs appear to follow this order - confirm before sorting.
__all__ = [
    'BillingService',
    'DatasetService',
    'ExecutionService',
    'ComputeEnvironmentService',
    'FileService',
    'MetadataService',
    'MetricsService',
    'ProcessService',
    'ProjectService',
    'ReferenceService',
    'ShareService',
    'UserService',
]
class BillingService(cirro.services.base.BaseService):
class BillingService(BaseService):
    """
    Service for interacting with the Billing endpoints
    """

    def list(self) -> List[BillingAccount]:
        """
        Lists the billing accounts that the current user has access to
        """
        api_client = self._api_client
        return get_billing_accounts.sync(client=api_client)

    def update(self, billing_account_id: str, request: BillingAccountRequest):
        """
        Updates a billing account

        Args:
            billing_account_id (str): Billing account ID
            request (cirro_api_client.v1.models.BillingAccountRequest): Request body
                sent to the update endpoint

        Example:
        ```python
        from cirro_api_client.v1.models import BillingAccountRequest
        from cirro.cirro_client import CirroApi

        cirro = CirroApi()
        request = BillingAccountRequest(
            name="New billing account name",
            primary_budget_number="new-budget-number",
            owner="New Owner"
        )
        cirro.billing.update("billing-account-id", request)
        ```
        """
        # The detailed response is not handed back to the caller.
        update_billing_account.sync_detailed(
            client=self._api_client,
            billing_account_id=billing_account_id,
            body=request
        )

Service for interacting with the Billing endpoints

def list(self) -> List[cirro_api_client.v1.models.BillingAccount]:
14    def list(self) -> List[BillingAccount]:
15        """
16        Gets a list of billing accounts the current user has access to
17        """
18        return get_billing_accounts.sync(client=self._api_client)

Gets a list of billing accounts the current user has access to

def update( self, billing_account_id: str, request: cirro_api_client.v1.models.BillingAccountRequest):
20    def update(self, billing_account_id: str, request: BillingAccountRequest):
21        """
22        Updates a billing account
23
24        Args:
25            billing_account_id (str): Billing account ID
26            request (cirro_api_client.v1.models.BillingAccountRequest):
27
28        ```python
29        from cirro_api_client.v1.models import BillingAccountRequest
30        from cirro.cirro_client import CirroApi
31
32        cirro = CirroApi()
33        request = BillingAccountRequest(
34            name="New billing account name",
35            primary_budget_number="new-budget-number",
36            owner="New Owner"
37        )
38        cirro.billing.update("billing-account-id", request)
39        ```
40        """
41        update_billing_account.sync_detailed(
42            billing_account_id=billing_account_id,
43            body=request,
44            client=self._api_client
45        )

Updates a billing account

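Arguments:
  • billing_account_id (str): Billing account ID
  • request (cirro_api_client.v1.models.BillingAccountRequest): Request body sent to the update endpoint

Example:
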
from cirro_api_client.v1.models import BillingAccountRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = BillingAccountRequest(
    name="New billing account name",
    primary_budget_number="new-budget-number",
    owner="New Owner"
)
cirro.billing.update("billing-account-id", request)
class DatasetService(cirro.services.file.FileEnabledService):
 16class DatasetService(FileEnabledService):
 17    """
 18    Service for interacting with the Dataset endpoints
 19    """
 20
 21    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
 22        """List datasets
 23
 24        Retrieves a list of datasets for a given project
 25
 26        Args:
 27            project_id (str): ID of the Project
 28            max_items (int): Maximum number of records to get (default 10,000)
 29        """
 30        return get_all_records(
 31            records_getter=lambda page_args: get_datasets.sync(
 32                project_id=project_id,
 33                client=self._api_client,
 34                next_token=page_args.next_token,
 35                limit=page_args.limit
 36            ),
 37            max_items=max_items
 38        )
 39
 40    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
 41        """
 42        Retrieves a list of shared datasets for a given project and share
 43
 44        Args:
 45            project_id (str): ID of the Project
 46            share_id (str): ID of the Share
 47            max_items (int): Maximum number of records to get (default 10,000)
 48
 49        Example:
 50        ```python
 51        from cirro_api_client.v1.models import ShareType
 52        from cirro.cirro_client import CirroApi
 53
 54        cirro = CirroApi()
 55
 56        # List shares that are subscribed to
 57        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
 58        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
 59        ```
 60        """
 61        return get_all_records(
 62            records_getter=lambda page_args: get_shared_datasets.sync(
 63                project_id=project_id,
 64                share_id=share_id,
 65                client=self._api_client,
 66                next_token=page_args.next_token,
 67                limit=page_args.limit
 68            ),
 69            max_items=max_items
 70        )
 71
 72    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
 73        """
 74        Download data from public repositories
 75
 76        Args:
 77            project_id (str): ID of the Project
 78            import_request (cirro_api_client.v1.models.ImportDataRequest):
 79
 80        Returns:
 81            ID of the created dataset
 82
 83        ```python
 84        from cirro_api_client.v1.models import ImportDataRequest, Tag
 85        from cirro.cirro_client import CirroApi
 86
 87        cirro = CirroApi()
 88        request = ImportDataRequest(
 89            name="Imported dataset",
 90            description="Description of the dataset",
 91            public_ids=["SRR123456", "SRR123457"],
 92            tags=[Tag(value="tag1")]
 93        )
 94        cirro.datasets.import_public("project-id", request)
 95        ```
 96        """
 97        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)
 98
 99    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
100        """
101        Registers a dataset in Cirro, which can subsequently have files uploaded to it
102
103        Args:
104            project_id (str): ID of the Project
105            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
106
107        Returns:
108            ID of the created dataset and the path to upload files
109
110        ```python
111        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
112        from cirro.cirro_client import CirroApi
113
114        cirro = CirroApi()
115        request = UploadDatasetRequest(
116            name="Name of new dataset",
117            process_id="paired_dnaseq",
118            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
119            description="Description of the dataset",
120            tags=[Tag(value="tag1"), Tag(value="tag2")]
121        )
122        cirro.datasets.create("project-id", request)
123        ```
124        """
125        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)
126
127    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
128        """
129        Gets detailed information about a dataset
130
131        Args:
132            project_id (str): ID of the Project
133            dataset_id (str): ID of the Dataset
134
135        Returns:
136            The dataset, if found
137        """
138        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
139
140    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
141        """
142        Update info on a dataset (name, description, and/or tags)
143
144        Args:
145            project_id (str): ID of the Project
146            dataset_id (str): ID of the Dataset
147            request (cirro_api_client.v1.models.UpdateDatasetRequest):
148
149        Returns:
150            The updated dataset
151
152        ```python
153        from cirro_api_client.v1.models import UpdateDatasetRequest
154        from cirro.cirro_client import CirroApi
155
156        cirro = CirroApi()
157        request = UpdateDatasetRequest(
158            name="Name of new dataset",
159            process_id="paired_dnaseq",
160            description="Description of the dataset"
161        )
162        cirro.datasets.update("project-id", "dataset-id", request)
163        ```
164        """
165        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)
166
167    def delete(self, project_id: str, dataset_id: str) -> None:
168        """
169        Delete a dataset
170
171        After a dataset has been deleted, the files associated with that
172        dataset are saved according to the project's retention time.
173
174        Args:
175            project_id (str): ID of the Project
176            dataset_id (str): ID of the Dataset
177        """
178        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)
179
180    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
181        """
182        Gets a listing of files, charts, and other assets available for the dataset
183
184        Args:
185            project_id (str): ID of the Project
186            dataset_id (str): ID of the Dataset
187            file_limit (int): Maximum number of files to get (default 100,000)
188        """
189        dataset = self.get(project_id, dataset_id)
190        if file_limit < 1:
191            raise ValueError("file_limit must be greater than 0")
192        all_files = []
193        file_offset = 0
194        domain = None
195        artifacts = None
196
197        while len(all_files) < file_limit:
198            manifest = get_dataset_manifest.sync(
199                project_id=project_id,
200                dataset_id=dataset_id,
201                file_offset=file_offset,
202                client=self._api_client
203            )
204            all_files.extend(manifest.files)
205            file_offset += len(manifest.files)
206
207            if not artifacts:
208                artifacts = manifest.artifacts
209
210            domain = manifest.domain
211            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
212                break
213
214        files = [
215            File.from_file_entry(
216                f,
217                project_id=project_id,
218                dataset=dataset,
219                domain=domain
220            )
221            for f in all_files
222        ]
223        artifacts = [
224            Artifact(
225                artifact_type=a.type,
226                file=File.from_file_entry(
227                    FileEntry(a.path),
228                    project_id=project_id,
229                    dataset=dataset,
230                    domain=domain
231                )
232            )
233            for a in artifacts
234        ]
235        return DatasetAssets(files=files, artifacts=artifacts)
236
237    def upload_files(self,
238                     project_id: str,
239                     dataset_id: str,
240                     directory: PathLike,
241                     files: List[PathLike] = None,
242                     file_path_map: Dict[PathLike, str] = None) -> None:
243        """
244        Uploads files to a given dataset from the specified directory.
245
246        All files must be relative to the specified directory.
247        If files need to be flattened, or you are sourcing files from multiple directories,
248        please include `file_path_map` or call this method multiple times.
249
250        Args:
251            project_id (str): ID of the Project
252            dataset_id (str): ID of the Dataset
253            directory (str|Path): Path to directory
254            files (typing.List[str|Path]): List of paths to files within the directory,
255                must be the same type as directory.
256            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
257             from source path to destination path, used to "re-write" paths within the dataset.
258        ```python
259        from cirro.cirro_client import CirroApi
260        from cirro.file_utils import generate_flattened_file_map
261
262        cirro = CirroApi()
263
264        directory = "~/Downloads"
265        # Re-write file paths
266        file_map = {
267            "data1/file1.fastq.gz": "file1.fastq.gz",
268            "data2/file2.fastq.gz": "file2.fastq.gz",
269            "file3.fastq.gz": "new_file3.txt"
270        }
271
272        # Or you could automate the flattening
273        files = ["data1/file1.fastq.gz"]
274        file_map = generate_flattened_file_map(files)
275
276        cirro.datasets.upload_files(
277            project_id="project-id",
278            dataset_id="dataset-id",
279            directory=directory,
280            files=list(file_map.keys()),
281            file_path_map=file_map
282        )
283        ```
284        """
285        if file_path_map is None:
286            file_path_map = {}
287
288        dataset = self.get(project_id, dataset_id)
289
290        access_context = FileAccessContext.upload_dataset(
291            project_id=project_id,
292            dataset_id=dataset_id,
293            base_url=dataset.s3
294        )
295
296        self._file_service.upload_files(
297            access_context=access_context,
298            directory=directory,
299            files=files,
300            file_path_map=file_path_map
301        )
302
303    def download_files(
304        self,
305        project_id: str,
306        dataset_id: str,
307        download_location: str,
308        files: Union[List[File], List[str]] = None
309    ) -> None:
310        """
311        Downloads files from a dataset
312
313        The `files` argument is used to optionally specify a subset of files
314        to be downloaded. By default, all files are downloaded.
315
316        Args:
317            project_id (str): ID of the Project
318            dataset_id (str): ID of the Dataset
319            download_location (str): Local destination for downloaded files
320            files (typing.List[str]): Optional list of files to download
321        """
322        if files is None:
323            files = self.get_assets_listing(project_id, dataset_id).files
324
325        if len(files) == 0:
326            return
327
328        first_file = files[0]
329        if isinstance(first_file, File):
330            files = [file.relative_path for file in files]
331            access_context = first_file.access_context
332        else:
333            dataset = self.get(project_id, dataset_id)
334            if dataset.share:
335                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
336                                                                           dataset_id=dataset_id,
337                                                                           base_url=dataset.s3)
338            else:
339                access_context = FileAccessContext.download(project_id=project_id,
340                                                            base_url=dataset.s3)
341
342        self._file_service.download_files(access_context, download_location, files)

Service for interacting with the Dataset endpoints

def list( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:
21    def list(self, project_id: str, max_items: int = 10000) -> List[Dataset]:
22        """List datasets
23
24        Retrieves a list of datasets for a given project
25
26        Args:
27            project_id (str): ID of the Project
28            max_items (int): Maximum number of records to get (default 10,000)
29        """
30        return get_all_records(
31            records_getter=lambda page_args: get_datasets.sync(
32                project_id=project_id,
33                client=self._api_client,
34                next_token=page_args.next_token,
35                limit=page_args.limit
36            ),
37            max_items=max_items
38        )

List datasets

Retrieves a list of datasets for a given project

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
def list_shared( self, project_id: str, share_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Dataset]:
40    def list_shared(self, project_id: str, share_id: str, max_items: int = 10000) -> List[Dataset]:
41        """
42        Retrieves a list of shared datasets for a given project and share
43
44        Args:
45            project_id (str): ID of the Project
46            share_id (str): ID of the Share
47            max_items (int): Maximum number of records to get (default 10,000)
48
49        Example:
50        ```python
51        from cirro_api_client.v1.models import ShareType
52        from cirro.cirro_client import CirroApi
53
54        cirro = CirroApi()
55
56        # List shares that are subscribed to
57        subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
58        cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
59        ```
60        """
61        return get_all_records(
62            records_getter=lambda page_args: get_shared_datasets.sync(
63                project_id=project_id,
64                share_id=share_id,
65                client=self._api_client,
66                next_token=page_args.next_token,
67                limit=page_args.limit
68            ),
69            max_items=max_items
70        )

Retrieves a list of shared datasets for a given project and share

Arguments:
  • project_id (str): ID of the Project
  • share_id (str): ID of the Share
  • max_items (int): Maximum number of records to get (default 10,000)

Example:

from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that are subscribed to
subscribed_shares = cirro.shares.list(project_id="project-id", share_type=ShareType.SUBSCRIBER)
cirro.datasets.list_shared("project-id", subscribed_shares[0].id)
def import_public( self, project_id: str, import_request: cirro_api_client.v1.models.ImportDataRequest) -> cirro_api_client.v1.models.CreateResponse:
72    def import_public(self, project_id: str, import_request: ImportDataRequest) -> CreateResponse:
73        """
74        Download data from public repositories
75
76        Args:
77            project_id (str): ID of the Project
78            import_request (cirro_api_client.v1.models.ImportDataRequest):
79
80        Returns:
81            ID of the created dataset
82
83        ```python
84        from cirro_api_client.v1.models import ImportDataRequest, Tag
85        from cirro.cirro_client import CirroApi
86
87        cirro = CirroApi()
88        request = ImportDataRequest(
89            name="Imported dataset",
90            description="Description of the dataset",
91            public_ids=["SRR123456", "SRR123457"],
92            tags=[Tag(value="tag1")]
93        )
94        cirro.datasets.import_public("project-id", request)
95        ```
96        """
97        return import_public_dataset.sync(project_id=project_id, client=self._api_client, body=import_request)

Download data from public repositories

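Arguments:
  • project_id (str): ID of the Project
  • import_request (cirro_api_client.v1.models.ImportDataRequest): Request body describing the data to import (see the example below)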
Returns:

ID of the created dataset

from cirro_api_client.v1.models import ImportDataRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = ImportDataRequest(
    name="Imported dataset",
    description="Description of the dataset",
    public_ids=["SRR123456", "SRR123457"],
    tags=[Tag(value="tag1")]
)
cirro.datasets.import_public("project-id", request)
def create( self, project_id: str, upload_request: cirro_api_client.v1.models.UploadDatasetRequest) -> cirro_api_client.v1.models.UploadDatasetCreateResponse:
 99    def create(self, project_id: str, upload_request: UploadDatasetRequest) -> UploadDatasetCreateResponse:
100        """
101        Registers a dataset in Cirro, which can subsequently have files uploaded to it
102
103        Args:
104            project_id (str): ID of the Project
105            upload_request (cirro_api_client.v1.models.UploadDatasetRequest):
106
107        Returns:
108            ID of the created dataset and the path to upload files
109
110        ```python
111        from cirro_api_client.v1.models import UploadDatasetRequest, Tag
112        from cirro.cirro_client import CirroApi
113
114        cirro = CirroApi()
115        request = UploadDatasetRequest(
116            name="Name of new dataset",
117            process_id="paired_dnaseq",
118            expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
119            description="Description of the dataset",
120            tags=[Tag(value="tag1"), Tag(value="tag2")]
121        )
122        cirro.datasets.create("project-id", request)
123        ```
124        """
125        return upload_dataset.sync(project_id=project_id, client=self._api_client, body=upload_request)

Registers a dataset in Cirro, which can subsequently have files uploaded to it

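Arguments:
  • project_id (str): ID of the Project
  • upload_request (cirro_api_client.v1.models.UploadDatasetRequest): Request body describing the dataset to register (see the example below)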
Returns:

ID of the created dataset and the path to upload files

from cirro_api_client.v1.models import UploadDatasetRequest, Tag
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UploadDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    expected_files=["read_1.fastq.gz", "read_2.fastq.gz"],
    description="Description of the dataset",
    tags=[Tag(value="tag1"), Tag(value="tag2")]
)
cirro.datasets.create("project-id", request)
def get( self, project_id: str, dataset_id: str) -> Optional[cirro_api_client.v1.models.DatasetDetail]:
127    def get(self, project_id: str, dataset_id: str) -> Optional[DatasetDetail]:
128        """
129        Gets detailed information about a dataset
130
131        Args:
132            project_id (str): ID of the Project
133            dataset_id (str): ID of the Dataset
134
135        Returns:
136            The dataset, if found
137        """
138        return get_dataset.sync(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

Gets detailed information about a dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
Returns:

The dataset, if found

def update( self, project_id: str, dataset_id: str, request: cirro_api_client.v1.models.UpdateDatasetRequest) -> cirro_api_client.v1.models.DatasetDetail:
140    def update(self, project_id: str, dataset_id: str, request: UpdateDatasetRequest) -> DatasetDetail:
141        """
142        Update info on a dataset (name, description, and/or tags)
143
144        Args:
145            project_id (str): ID of the Project
146            dataset_id (str): ID of the Dataset
147            request (cirro_api_client.v1.models.UpdateDatasetRequest):
148
149        Returns:
150            The updated dataset
151
152        ```python
153        from cirro_api_client.v1.models import UpdateDatasetRequest
154        from cirro.cirro_client import CirroApi
155
156        cirro = CirroApi()
157        request = UpdateDatasetRequest(
158            name="Name of new dataset",
159            process_id="paired_dnaseq",
160            description="Description of the dataset"
161        )
162        cirro.datasets.update("project-id", "dataset-id", request)
163        ```
164        """
165        return update_dataset.sync(project_id=project_id, dataset_id=dataset_id, body=request, client=self._api_client)

Update info on a dataset (name, description, and/or tags)

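Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • request (cirro_api_client.v1.models.UpdateDatasetRequest): Request body sent to the update endpoint (see the example below)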
Returns:

The updated dataset

from cirro_api_client.v1.models import UpdateDatasetRequest
from cirro.cirro_client import CirroApi

cirro = CirroApi()
request = UpdateDatasetRequest(
    name="Name of new dataset",
    process_id="paired_dnaseq",
    description="Description of the dataset"
)
cirro.datasets.update("project-id", "dataset-id", request)
def delete(self, project_id: str, dataset_id: str) -> None:
167    def delete(self, project_id: str, dataset_id: str) -> None:
168        """
169        Delete a dataset
170
171        After a dataset has been deleted, the files associated with that
172        dataset are saved according to the project's retention time.
173
174        Args:
175            project_id (str): ID of the Project
176            dataset_id (str): ID of the Dataset
177        """
178        delete_dataset.sync_detailed(project_id=project_id, dataset_id=dataset_id, client=self._api_client)

Delete a dataset

After a dataset has been deleted, the files associated with that dataset are saved according to the project's retention time.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
def get_assets_listing( self, project_id: str, dataset_id: str, file_limit: int = 100000) -> cirro.models.assets.DatasetAssets:
180    def get_assets_listing(self, project_id: str, dataset_id: str, file_limit: int = 100000) -> DatasetAssets:
181        """
182        Gets a listing of files, charts, and other assets available for the dataset
183
184        Args:
185            project_id (str): ID of the Project
186            dataset_id (str): ID of the Dataset
187            file_limit (int): Maximum number of files to get (default 100,000)
188        """
189        dataset = self.get(project_id, dataset_id)
190        if file_limit < 1:
191            raise ValueError("file_limit must be greater than 0")
192        all_files = []
193        file_offset = 0
194        domain = None
195        artifacts = None
196
197        while len(all_files) < file_limit:
198            manifest = get_dataset_manifest.sync(
199                project_id=project_id,
200                dataset_id=dataset_id,
201                file_offset=file_offset,
202                client=self._api_client
203            )
204            all_files.extend(manifest.files)
205            file_offset += len(manifest.files)
206
207            if not artifacts:
208                artifacts = manifest.artifacts
209
210            domain = manifest.domain
211            if len(all_files) >= manifest.total_files or len(manifest.files) == 0:
212                break
213
214        files = [
215            File.from_file_entry(
216                f,
217                project_id=project_id,
218                dataset=dataset,
219                domain=domain
220            )
221            for f in all_files
222        ]
223        artifacts = [
224            Artifact(
225                artifact_type=a.type,
226                file=File.from_file_entry(
227                    FileEntry(a.path),
228                    project_id=project_id,
229                    dataset=dataset,
230                    domain=domain
231                )
232            )
233            for a in artifacts
234        ]
235        return DatasetAssets(files=files, artifacts=artifacts)

Gets a listing of files, charts, and other assets available for the dataset

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • file_limit (int): Maximum number of files to get (default 100,000)
def upload_files( self, project_id: str, dataset_id: str, directory: ~PathLike, files: List[~PathLike] = None, file_path_map: Dict[~PathLike, str] = None) -> None:
237    def upload_files(self,
238                     project_id: str,
239                     dataset_id: str,
240                     directory: PathLike,
241                     files: List[PathLike] = None,
242                     file_path_map: Dict[PathLike, str] = None) -> None:
243        """
244        Uploads files to a given dataset from the specified directory.
245
246        All files must be relative to the specified directory.
247        If files need to be flattened, or you are sourcing files from multiple directories,
248        please include `file_path_map` or call this method multiple times.
249
250        Args:
251            project_id (str): ID of the Project
252            dataset_id (str): ID of the Dataset
253            directory (str|Path): Path to directory
254            files (typing.List[str|Path]): List of paths to files within the directory,
255                must be the same type as directory.
256            file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload
257             from source path to destination path, used to "re-write" paths within the dataset.
258        ```python
259        from cirro.cirro_client import CirroApi
260        from cirro.file_utils import generate_flattened_file_map
261
262        cirro = CirroApi()
263
264        directory = "~/Downloads"
265        # Re-write file paths
266        file_map = {
267            "data1/file1.fastq.gz": "file1.fastq.gz",
268            "data2/file2.fastq.gz": "file2.fastq.gz",
269            "file3.fastq.gz": "new_file3.txt"
270        }
271
272        # Or you could automate the flattening
273        files = ["data1/file1.fastq.gz"]
274        file_map = generate_flattened_file_map(files)
275
276        cirro.datasets.upload_files(
277            project_id="project-id",
278            dataset_id="dataset-id",
279            directory=directory,
280            files=list(file_map.keys()),
281            file_path_map=file_map
282        )
283        ```
284        """
285        if file_path_map is None:
286            file_path_map = {}
287
288        dataset = self.get(project_id, dataset_id)
289
290        access_context = FileAccessContext.upload_dataset(
291            project_id=project_id,
292            dataset_id=dataset_id,
293            base_url=dataset.s3
294        )
295
296        self._file_service.upload_files(
297            access_context=access_context,
298            directory=directory,
299            files=files,
300            file_path_map=file_path_map
301        )

Uploads files to a given dataset from the specified directory.

All files must be relative to the specified directory. If files need to be flattened, or you are sourcing files from multiple directories, please include file_path_map or call this method multiple times.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str|Path]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
from cirro.cirro_client import CirroApi
from cirro.file_utils import generate_flattened_file_map

cirro = CirroApi()

directory = "~/Downloads"
# Re-write file paths
file_map = {
    "data1/file1.fastq.gz": "file1.fastq.gz",
    "data2/file2.fastq.gz": "file2.fastq.gz",
    "file3.fastq.gz": "new_file3.txt"
}

# Or you could automate the flattening
files = ["data1/file1.fastq.gz"]
file_map = generate_flattened_file_map(files)

cirro.datasets.upload_files(
    project_id="project-id",
    dataset_id="dataset-id",
    directory=directory,
    files=list(file_map.keys()),
    file_path_map=file_map
)
def download_files( self, project_id: str, dataset_id: str, download_location: str, files: Union[List[cirro.models.file.File], List[str]] = None) -> None:
303    def download_files(
304        self,
305        project_id: str,
306        dataset_id: str,
307        download_location: str,
308        files: Union[List[File], List[str]] = None
309    ) -> None:
310        """
311        Downloads files from a dataset
312
313        The `files` argument is used to optionally specify a subset of files
314        to be downloaded. By default, all files are downloaded.
315
316        Args:
317            project_id (str): ID of the Project
318            dataset_id (str): ID of the Dataset
319            download_location (str): Local destination for downloaded files
320            files (typing.List[str]): Optional list of files to download
321        """
322        if files is None:
323            files = self.get_assets_listing(project_id, dataset_id).files
324
325        if len(files) == 0:
326            return
327
328        first_file = files[0]
329        if isinstance(first_file, File):
330            files = [file.relative_path for file in files]
331            access_context = first_file.access_context
332        else:
333            dataset = self.get(project_id, dataset_id)
334            if dataset.share:
335                access_context = FileAccessContext.download_shared_dataset(project_id=project_id,
336                                                                           dataset_id=dataset_id,
337                                                                           base_url=dataset.s3)
338            else:
339                access_context = FileAccessContext.download(project_id=project_id,
340                                                            base_url=dataset.s3)
341
342        self._file_service.download_files(access_context, download_location, files)

Downloads files from a dataset

The files argument is used to optionally specify a subset of files to be downloaded. By default, all files are downloaded.

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • download_location (str): Local destination for downloaded files
  • files (typing.List[str]): Optional list of files to download
class ExecutionService(cirro.services.base.BaseService):
 13class ExecutionService(BaseService):
 14    """
 15    Service for interacting with the Execution endpoints
 16    """
 17    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
 18        """
 19        Launch an analysis job running a process on a set of inputs
 20
 21        Args:
 22            project_id (str): ID of the Project
 23            request (cirro_api_client.v1.models.RunAnalysisRequest):
 24
 25        Returns:
 26            The ID of the created dataset
 27
 28        ```python
 29        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
 30        from cirro.cirro_client import CirroApi
 31
 32        # Example:
 33        # Run the "process-nf-core-rnaseq-3_8" process using input data
 34        # from a dataset with the id "source-dataset-id"
 35
 36        # Optional analysis parameters
 37        params = RunAnalysisRequestParams.from_dict({
 38            "param_a": "val_a",
 39            "param_b": "val_b"
 40        })
 41
 42        cirro = CirroApi()
 43        request = RunAnalysisRequest(
 44            name="Name of the newly created dataset",
 45            description="Longer description of newly created dataset",
 46            process_id="process-nf-core-rnaseq-3_8",
 47            source_dataset_ids=["source-dataset-id"],
 48            params=params
 49        )
 50        cirro.execution.run_analysis("project-id", request)
 51        ```
 52        """
 53
 54        form_spec = get_process_parameters.sync(
 55            process_id=request.process_id,
 56            client=self._api_client
 57        )
 58
 59        ParameterSpecification(
 60            form_spec
 61        ).validate_params(
 62            request.params.to_dict() if request.params else {}
 63        )
 64
 65        return run_analysis.sync(
 66            project_id=project_id,
 67            body=request,
 68            client=self._api_client
 69        )
 70
 71    def stop_analysis(self, project_id: str, dataset_id: str):
 72        """
 73        Terminates all jobs related to a running analysis
 74
 75        Args:
 76            project_id (str): ID of the Project
 77            dataset_id (str): ID of the Dataset
 78        """
 79
 80        return stop_analysis.sync(
 81            project_id=project_id,
 82            dataset_id=dataset_id,
 83            client=self._api_client
 84        )
 85
 86    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
 87        """
 88        Gets an overview of the executions currently running in the project, by job queue
 89
 90        Args:
 91            project_id (str): ID of the Project
 92
 93        Returns:
 94            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
 95        """
 96
 97        return get_project_summary.sync(
 98            project_id=project_id,
 99            client=self._api_client
100        ).additional_properties
101
102    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
103        """
104        Gets live logs from main execution task
105
106        Args:
107            project_id (str): ID of the Project
108            dataset_id (str): ID of the Dataset
109            force_live (bool): If True, it will fetch logs from CloudWatch,
110                even if the execution is already completed
111
112        """
113
114        resp = get_execution_logs.sync(
115            project_id=project_id,
116            dataset_id=dataset_id,
117            force_live=force_live,
118            client=self._api_client
119        )
120
121        return '\n'.join(e.message for e in resp.events)
122
123    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
124        """
125        Gets the tasks submitted by the workflow execution
126
127        Args:
128            project_id (str): ID of the Project
129            dataset_id (str): ID of the Dataset
130            force_live (bool): If True, it will try to get the list of jobs
131                from the executor (i.e., AWS Batch), rather than the workflow report
132        """
133
134        return get_tasks_for_execution.sync(
135            project_id=project_id,
136            dataset_id=dataset_id,
137            force_live=force_live,
138            client=self._api_client
139        )
140
141    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
142        """
143        Gets the log output from an individual task
144
145        Args:
146            project_id (str): ID of the Project
147            dataset_id (str): ID of the Dataset
148            task_id (str): ID of the task
149            force_live (bool): If True, it will fetch logs from CloudWatch,
150                even if the execution is already completed
151
152        """
153
154        resp = get_task_logs.sync(
155            project_id=project_id,
156            dataset_id=dataset_id,
157            task_id=task_id,
158            force_live=force_live,
159            client=self._api_client
160        )
161
162        return '\n'.join(e.message for e in resp.events)

Service for interacting with the Execution endpoints

def run_analysis( self, project_id: str, request: cirro_api_client.v1.models.RunAnalysisRequest) -> cirro_api_client.v1.models.CreateResponse:
17    def run_analysis(self, project_id: str, request: RunAnalysisRequest) -> CreateResponse:
18        """
19        Launch an analysis job running a process on a set of inputs
20
21        Args:
22            project_id (str): ID of the Project
23            request (cirro_api_client.v1.models.RunAnalysisRequest):
24
25        Returns:
26            The ID of the created dataset
27
28        ```python
29        from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
30        from cirro.cirro_client import CirroApi
31
32        # Example:
33        # Run the "process-nf-core-rnaseq-3_8" process using input data
34        # from a dataset with the id "source-dataset-id"
35
36        # Optional analysis parameters
37        params = RunAnalysisRequestParams.from_dict({
38            "param_a": "val_a",
39            "param_b": "val_b"
40        })
41
42        cirro = CirroApi()
43        request = RunAnalysisRequest(
44            name="Name of the newly created dataset",
45            description="Longer description of newly created dataset",
46            process_id="process-nf-core-rnaseq-3_8",
47            source_dataset_ids=["source-dataset-id"],
48            params=params
49        )
50        cirro.execution.run_analysis("project-id", request)
51        ```
52        """
53
54        form_spec = get_process_parameters.sync(
55            process_id=request.process_id,
56            client=self._api_client
57        )
58
59        ParameterSpecification(
60            form_spec
61        ).validate_params(
62            request.params.to_dict() if request.params else {}
63        )
64
65        return run_analysis.sync(
66            project_id=project_id,
67            body=request,
68            client=self._api_client
69        )

Launch an analysis job running a process on a set of inputs

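Arguments:
  • project_id (str): ID of the Project
  • request (cirro_api_client.v1.models.RunAnalysisRequest): Description of the analysis to launch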
Returns:

The ID of the created dataset

from cirro_api_client.v1.models import RunAnalysisRequest, RunAnalysisRequestParams
from cirro.cirro_client import CirroApi

# Example:
# Run the "process-nf-core-rnaseq-3_8" process using input data
# from a dataset with the id "source-dataset-id"

# Optional analysis parameters
params = RunAnalysisRequestParams.from_dict({
    "param_a": "val_a",
    "param_b": "val_b"
})

cirro = CirroApi()
request = RunAnalysisRequest(
    name="Name of the newly created dataset",
    description="Longer description of newly created dataset",
    process_id="process-nf-core-rnaseq-3_8",
    source_dataset_ids=["source-dataset-id"],
    params=params
)
cirro.execution.run_analysis("project-id", request)
def stop_analysis(self, project_id: str, dataset_id: str):
71    def stop_analysis(self, project_id: str, dataset_id: str):
72        """
73        Terminates all jobs related to a running analysis
74
75        Args:
76            project_id (str): ID of the Project
77            dataset_id (str): ID of the Dataset
78        """
79
80        return stop_analysis.sync(
81            project_id=project_id,
82            dataset_id=dataset_id,
83            client=self._api_client
84        )

Terminates all jobs related to a running analysis

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
def get_project_summary( self, project_id: str) -> Dict[str, List[cirro_api_client.v1.models.Task]]:
 86    def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
 87        """
 88        Gets an overview of the executions currently running in the project, by job queue
 89
 90        Args:
 91            project_id (str): ID of the Project
 92
 93        Returns:
 94            `cirro_api_client.v1.models.GetProjectSummaryResponse200`
 95        """
 96
 97        return get_project_summary.sync(
 98            project_id=project_id,
 99            client=self._api_client
100        ).additional_properties

Gets an overview of the executions currently running in the project, by job queue

Arguments:
  • project_id (str): ID of the Project
Returns:

A dictionary of task lists (Dict[str, List[Task]]), taken from the additional_properties of cirro_api_client.v1.models.GetProjectSummaryResponse200

def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
102    def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str:
103        """
104        Gets live logs from main execution task
105
106        Args:
107            project_id (str): ID of the Project
108            dataset_id (str): ID of the Dataset
109            force_live (bool): If True, it will fetch logs from CloudWatch,
110                even if the execution is already completed
111
112        """
113
114        resp = get_execution_logs.sync(
115            project_id=project_id,
116            dataset_id=dataset_id,
117            force_live=force_live,
118            client=self._api_client
119        )
120
121        return '\n'.join(e.message for e in resp.events)

Gets live logs from main execution task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
def get_tasks_for_execution( self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[cirro_api_client.v1.models.Task]]:
123    def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]:
124        """
125        Gets the tasks submitted by the workflow execution
126
127        Args:
128            project_id (str): ID of the Project
129            dataset_id (str): ID of the Dataset
130            force_live (bool): If True, it will try to get the list of jobs
131                from the executor (i.e., AWS Batch), rather than the workflow report
132        """
133
134        return get_tasks_for_execution.sync(
135            project_id=project_id,
136            dataset_id=dataset_id,
137            force_live=force_live,
138            client=self._api_client
139        )

Gets the tasks submitted by the workflow execution

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • force_live (bool): If True, it will try to get the list of jobs from the executor (i.e., AWS Batch), rather than the workflow report
def get_task_logs( self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
141    def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
142        """
143        Gets the log output from an individual task
144
145        Args:
146            project_id (str): ID of the Project
147            dataset_id (str): ID of the Dataset
148            task_id (str): ID of the task
149            force_live (bool): If True, it will fetch logs from CloudWatch,
150                even if the execution is already completed
151
152        """
153
154        resp = get_task_logs.sync(
155            project_id=project_id,
156            dataset_id=dataset_id,
157            task_id=task_id,
158            force_live=force_live,
159            client=self._api_client
160        )
161
162        return '\n'.join(e.message for e in resp.events)

Gets the log output from an individual task

Arguments:
  • project_id (str): ID of the Project
  • dataset_id (str): ID of the Dataset
  • task_id (str): ID of the task
  • force_live (bool): If True, it will fetch logs from CloudWatch, even if the execution is already completed
class ComputeEnvironmentService(cirro.services.base.BaseService):
class ComputeEnvironmentService(BaseService):
    """
    Service for interacting with the Compute Environment endpoints
    """

    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
        """
        Lists the custom compute environments of a project (i.e., an agent)

        Args:
            project_id (str): Project ID

        Returns:
            List of compute environments that are available for the project
        """
        return get_compute_environments.sync(
            client=self._api_client,
            project_id=project_id
        )

Service for interacting with the Compute Environment endpoints

def list_environments_for_project( self, project_id: str) -> List[cirro_api_client.v1.models.ComputeEnvironmentConfiguration]:
15    def list_environments_for_project(self, project_id: str) -> List[ComputeEnvironmentConfiguration]:
16        """
17        List of custom compute environments for a project (i.e., an agent)
18
19        Args:
20            project_id (str): Project ID
21
22        Returns:
23            List of compute environments that are available for the project
24        """
25        return get_compute_environments.sync(project_id=project_id,
26                                             client=self._api_client)

List of custom compute environments for a project (i.e., an agent)

Arguments:
  • project_id (str): Project ID
Returns:

List of compute environments that are available for the project

class FileService(cirro.services.base.BaseService):
 18class FileService(BaseService):
 19    """
 20    Service for interacting with files
 21    """
 22    enable_additional_checksum: bool
 23    transfer_retries: int
 24    _get_token_lock = threading.Lock()
 25    _read_token_cache: Dict[str, AWSCredentials] = {}
 26
 27    def __init__(self, api_client, enable_additional_checksum, transfer_retries):
 28        """
 29        Instantiates the file service class
 30        """
 31        self._api_client = api_client
 32        self.enable_additional_checksum = enable_additional_checksum
 33        self.transfer_retries = transfer_retries
 34
 35    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
 36        """
 37        Retrieves credentials to access files
 38
 39        Args:
 40            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
 41        """
 42        access_request = access_context.file_access_request
 43
 44        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
 45            return self._get_project_read_credentials(access_context)
 46
 47        else:
 48            return generate_project_file_access_token.sync(
 49                client=self._api_client,
 50                project_id=access_context.project_id,
 51                body=access_context.file_access_request
 52            )
 53
 54    def _get_project_read_credentials(self, access_context: FileAccessContext):
 55        """
 56        Retrieves credentials to read project data, this can be cached
 57        """
 58        access_request = access_context.file_access_request
 59        project_id = access_context.project_id
 60        with self._get_token_lock:
 61            cached_token = self._read_token_cache.get(project_id)
 62
 63            if not cached_token or datetime.now(tz=timezone.utc) > cached_token.expiration:
 64                new_token = generate_project_file_access_token.sync(
 65                    client=self._api_client,
 66                    project_id=project_id,
 67                    body=access_request
 68                )
 69
 70                self._read_token_cache[project_id] = new_token
 71
 72        return self._read_token_cache[project_id]
 73
 74    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
 75        """
 76        Gets the underlying AWS S3 client to perform operations on files
 77
 78        This is seeded with refreshable credentials from the access_context parameter
 79
 80        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
 81        """
 82        s3_client = self._generate_s3_client(access_context)
 83        return s3_client.get_aws_client()
 84
 85    def get_file(self, file: File) -> bytes:
 86        """
 87        Gets the contents of a file
 88
 89        Args:
 90            file (cirro.models.file.File):
 91
 92        Returns:
 93            The raw bytes of the file
 94        """
 95        return self.get_file_from_path(file.access_context, file.relative_path)
 96
 97    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
 98        """
 99        Gets the contents of a file by providing the path, used internally
100
101        Args:
102            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
103            file_path (str): Relative path to file within dataset
104
105        Returns:
106            The raw bytes of the file
107        """
108        s3_client = self._generate_s3_client(access_context)
109
110        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
111
112        return s3_client.get_file(access_context.bucket, full_path)
113
114    def create_file(self, access_context: FileAccessContext, key: str,
115                    contents: str, content_type: str) -> None:
116        """
117        Creates a file at the specified path
118
119        Args:
120            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
121            key (str): Key of object to create
122            contents (str): Content of object
123            content_type (str):
124        """
125        s3_client = self._generate_s3_client(access_context)
126
127        s3_client.create_object(
128            key=key,
129            contents=contents,
130            content_type=content_type,
131            bucket=access_context.bucket
132        )
133
134    def upload_files(self,
135                     access_context: FileAccessContext,
136                     directory: PathLike,
137                     files: List[PathLike],
138                     file_path_map: Dict[PathLike, str]) -> None:
139        """
140        Uploads a list of files from the specified directory
141
142        Args:
143            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
144            directory (str|Path): Path to directory
145            files (typing.List[str|Path]): List of paths to files within the directory
146                must be the same type as directory.
147            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
148             from source path to destination path, used to "re-write" paths within the dataset.
149        """
150        s3_client = self._generate_s3_client(access_context)
151
152        upload_directory(
153            directory=directory,
154            files=files,
155            file_path_map=file_path_map,
156            s3_client=s3_client,
157            bucket=access_context.bucket,
158            prefix=access_context.prefix,
159            max_retries=self.transfer_retries
160        )
161
162    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
163        """
164        Download a list of files to the specified directory
165
166        Args:
167            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
168            directory (str): download location
169            files (List[str]): relative path of files to download
170        """
171        s3_client = self._generate_s3_client(access_context)
172
173        download_directory(
174            directory,
175            files,
176            s3_client,
177            access_context.bucket,
178            access_context.prefix
179        )
180
181    def _generate_s3_client(self, access_context: FileAccessContext):
182        """
183        Generates the Cirro-S3 client to perform operations on files
184        """
185        return S3Client(
186            partial(self.get_access_credentials, access_context),
187            self.enable_additional_checksum
188        )

Service for interacting with files

FileService(api_client, enable_additional_checksum, transfer_retries)
27    def __init__(self, api_client, enable_additional_checksum, transfer_retries):
28        """
29        Instantiates the file service class
30        """
31        self._api_client = api_client
32        self.enable_additional_checksum = enable_additional_checksum
33        self.transfer_retries = transfer_retries

Instantiates the file service class

enable_additional_checksum: bool
transfer_retries: int
def get_access_credentials( self, access_context: cirro.models.file.FileAccessContext) -> cirro_api_client.v1.models.AWSCredentials:
35    def get_access_credentials(self, access_context: FileAccessContext) -> AWSCredentials:
36        """
37        Retrieves credentials to access files
38
39        Args:
40            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
41        """
42        access_request = access_context.file_access_request
43
44        if access_request.access_type == ProjectAccessType.PROJECT_DOWNLOAD:
45            return self._get_project_read_credentials(access_context)
46
47        else:
48            return generate_project_file_access_token.sync(
49                client=self._api_client,
50                project_id=access_context.project_id,
51                body=access_context.file_access_request
52            )

Retrieves credentials to access files

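Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate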
def get_aws_s3_client( self, access_context: cirro.models.file.FileAccessContext) -> botocore.client.BaseClient:
74    def get_aws_s3_client(self, access_context: FileAccessContext) -> BaseClient:
75        """
76        Gets the underlying AWS S3 client to perform operations on files
77
78        This is seeded with refreshable credentials from the access_context parameter
79
80        This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.
81        """
82        s3_client = self._generate_s3_client(access_context)
83        return s3_client.get_aws_client()

Gets the underlying AWS S3 client to perform operations on files

This is seeded with refreshable credentials from the access_context parameter

This may be used to perform advanced operations, such as CopyObject, S3 Select, etc.

def get_file(self, file: cirro.models.file.File) -> bytes:
85    def get_file(self, file: File) -> bytes:
86        """
87        Gets the contents of a file
88
89        Args:
90            file (cirro.models.file.File):
91
92        Returns:
93            The raw bytes of the file
94        """
95        return self.get_file_from_path(file.access_context, file.relative_path)

Gets the contents of a file

Arguments:
  • file (cirro.models.file.File): File to read
Returns:

The raw bytes of the file

def get_file_from_path( self, access_context: cirro.models.file.FileAccessContext, file_path: str) -> bytes:
 97    def get_file_from_path(self, access_context: FileAccessContext, file_path: str) -> bytes:
 98        """
 99        Gets the contents of a file by providing the path, used internally
100
101        Args:
102            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
103            file_path (str): Relative path to file within dataset
104
105        Returns:
106            The raw bytes of the file
107        """
108        s3_client = self._generate_s3_client(access_context)
109
110        full_path = f'{access_context.prefix}/{file_path}'.lstrip('/')
111
112        return s3_client.get_file(access_context.bucket, full_path)

Gets the contents of a file by providing the path, used internally

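Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • file_path (str): Relative path to file within dataset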
Returns:

The raw bytes of the file

def create_file( self, access_context: cirro.models.file.FileAccessContext, key: str, contents: str, content_type: str) -> None:
114    def create_file(self, access_context: FileAccessContext, key: str,
115                    contents: str, content_type: str) -> None:
116        """
117        Creates a file at the specified path
118
119        Args:
120            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
121            key (str): Key of object to create
122            contents (str): Content of object
123            content_type (str):
124        """
125        s3_client = self._generate_s3_client(access_context)
126
127        s3_client.create_object(
128            key=key,
129            contents=contents,
130            content_type=content_type,
131            bucket=access_context.bucket
132        )

Creates a file at the specified path

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • key (str): Key of object to create
  • contents (str): Content of object
  • content_type (str):
def upload_files( self, access_context: cirro.models.file.FileAccessContext, directory: ~PathLike, files: List[~PathLike], file_path_map: Dict[~PathLike, str]) -> None:
134    def upload_files(self,
135                     access_context: FileAccessContext,
136                     directory: PathLike,
137                     files: List[PathLike],
138                     file_path_map: Dict[PathLike, str]) -> None:
139        """
140        Uploads a list of files from the specified directory
141
142        Args:
143            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
144            directory (str|Path): Path to directory
145            files (typing.List[str|Path]): List of paths to files within the directory
146                must be the same type as directory.
147            file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload
148             from source path to destination path, used to "re-write" paths within the dataset.
149        """
150        s3_client = self._generate_s3_client(access_context)
151
152        upload_directory(
153            directory=directory,
154            files=files,
155            file_path_map=file_path_map,
156            s3_client=s3_client,
157            bucket=access_context.bucket,
158            prefix=access_context.prefix,
159            max_retries=self.transfer_retries
160        )

Uploads a list of files from the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str|Path): Path to directory
  • files (typing.List[str|Path]): List of paths to files within the directory, must be the same type as directory.
  • file_path_map (typing.Dict[str|Path, str]): Optional mapping of file paths to upload from source path to destination path, used to "re-write" paths within the dataset.
def download_files( self, access_context: cirro.models.file.FileAccessContext, directory: str, files: List[str]) -> None:
162    def download_files(self, access_context: FileAccessContext, directory: str, files: List[str]) -> None:
163        """
164        Download a list of files to the specified directory
165
166        Args:
167            access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
168            directory (str): download location
169            files (List[str]): relative path of files to download
170        """
171        s3_client = self._generate_s3_client(access_context)
172
173        download_directory(
174            directory,
175            files,
176            s3_client,
177            access_context.bucket,
178            access_context.prefix
179        )

Download a list of files to the specified directory

Arguments:
  • access_context (cirro.models.file.FileAccessContext): File access context, use class methods to generate
  • directory (str): download location
  • files (List[str]): relative path of files to download
class MetadataService(cirro.services.base.BaseService):
class MetadataService(BaseService):
    """
    Service for interacting with the Metadata endpoints
    """
    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
        """
        List the samples belonging to a project, including their metadata

        Records are requested through `get_all_records`, which supplies the
        paging token and limit used for each request.

        Args:
            project_id (str): ID of the Project
            max_items (int): Maximum number of records to get (default 10,000)
        """
        def fetch_page(page_args):
            # One API request per page, driven by the pager's token and limit
            return get_project_samples.sync(
                project_id=project_id,
                client=self._api_client,
                next_token=page_args.next_token,
                limit=page_args.limit
            )

        return get_all_records(records_getter=fetch_page, max_items=max_items)

    def get_project_schema(self, project_id: str) -> FormSchema:
        """
        Fetch the metadata schema defined for a project

        Args:
            project_id (str): ID of the Project
        """
        return get_project_schema.sync(
            client=self._api_client,
            project_id=project_id
        )

    def update_project_schema(self, project_id: str, schema: FormSchema):
        """
        Update the metadata schema of a project

        The API response is discarded; nothing is returned.

        Args:
            project_id (str): ID of the Project
            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
        """
        update_project_schema.sync_detailed(
            client=self._api_client,
            project_id=project_id,
            body=schema
        )

    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
        """
        Update the metadata stored for a sample and return the API's result

        Args:
            project_id (str): ID of the Project
            sample_id (str): ID of the sample
            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
        """
        return update_sample.sync(client=self._api_client, project_id=project_id, sample_id=sample_id, body=sample)

Service for interacting with the Metadata endpoints

def get_project_samples( self, project_id: str, max_items: int = 10000) -> List[cirro_api_client.v1.models.Sample]:
15    def get_project_samples(self, project_id: str, max_items: int = 10000) -> List[Sample]:
16        """
17        Retrieves a list of samples associated with a project along with their metadata
18
19        Args:
20            project_id (str): ID of the Project
21            max_items (int): Maximum number of records to get (default 10,000)
22        """
23        return get_all_records(
24            records_getter=lambda page_args: get_project_samples.sync(project_id=project_id,
25                                                                      client=self._api_client,
26                                                                      next_token=page_args.next_token,
27                                                                      limit=page_args.limit),
28            max_items=max_items
29        )

Retrieves a list of samples associated with a project along with their metadata

Arguments:
  • project_id (str): ID of the Project
  • max_items (int): Maximum number of records to get (default 10,000)
def get_project_schema( self, project_id: str) -> cirro_api_client.v1.models.FormSchema:
31    def get_project_schema(self, project_id: str) -> FormSchema:
32        """
33        Get project metadata schema
34
35        Args:
36            project_id (str): ID of the Project
37        """
38        return get_project_schema.sync(project_id=project_id, client=self._api_client)

Get project metadata schema

Arguments:
  • project_id (str): ID of the Project
def update_project_schema( self, project_id: str, schema: cirro_api_client.v1.models.FormSchema):
40    def update_project_schema(self, project_id: str, schema: FormSchema):
41        """
42        Update project metadata schema
43
44        Args:
45            project_id (str): ID of the Project
46            schema (cirro_api_client.v1.models.FormSchema): Metadata schema
47        """
48        update_project_schema.sync_detailed(project_id=project_id, body=schema, client=self._api_client)

Update project metadata schema

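Arguments:
  • project_id (str): ID of the Project
  • schema (cirro_api_client.v1.models.FormSchema): Metadata schema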
def update_sample( self, project_id: str, sample_id: str, sample: cirro_api_client.v1.models.SampleRequest) -> cirro_api_client.v1.models.Sample:
50    def update_sample(self, project_id: str, sample_id: str, sample: SampleRequest) -> Sample:
51        """
52        Updates metadata information for sample
53
54        Args:
55            project_id (str): ID of the Project
56            sample_id (str): ID of the sample
57            sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample
58        """
59        return update_sample.sync(
60            project_id=project_id,
61            sample_id=sample_id,
62            body=sample,
63            client=self._api_client
64        )

Updates metadata information for sample

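Arguments:
  • project_id (str): ID of the Project
  • sample_id (str): ID of the sample
  • sample (cirro_api_client.v1.models.SampleRequest): Metadata information for the sample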
class MetricsService(cirro.services.base.BaseService):
class MetricsService(BaseService):
    """
    Service for interacting with the Metrics endpoints
    """
    def get_for_project(self, project_id: str) -> ProjectMetrics:
        """
        Fetch the cost and storage metrics of a single project.

        Args:
            project_id (str): ID of the Project
        """
        return get_project_metrics.sync(client=self._api_client, project_id=project_id)

    def get_all_metrics(self) -> List[ProjectMetrics]:
        """
        Fetch every metrics record that is available
        """
        api_client = self._api_client
        return get_all_metrics.sync(client=api_client)

Service for interacting with the Metrics endpoints

def get_for_project( self, project_id: str) -> cirro_api_client.v1.models.ProjectMetrics:
14    def get_for_project(self, project_id: str) -> ProjectMetrics:
15        """
16        Retrieves the cost and storage metrics for a project.
17
18        Args:
19            project_id (str): ID of the Project
20        """
21        return get_project_metrics.sync(
22            project_id=project_id,
23            client=self._api_client
24        )

Retrieves the cost and storage metrics for a project.

Arguments:
  • project_id (str): ID of the Project
def get_all_metrics(self) -> List[cirro_api_client.v1.models.ProjectMetrics]:
26    def get_all_metrics(self) -> List[ProjectMetrics]:
27        """
28        Retrieves all available metrics
29        """
30        return get_all_metrics.sync(client=self._api_client)

Retrieves all available metrics

class ProcessService(cirro.services.base.BaseService):
 15class ProcessService(BaseService):
 16    """
 17    Service for interacting with the Process endpoints
 18    """
 19    def list(self, process_type: Executor = None) -> List[Process]:
 20        """
 21        Retrieves a list of available processes
 22
 23        Args:
 24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
 25        """
 26        processes = get_processes.sync(client=self._api_client)
 27        return [p for p in processes if not process_type or process_type == p.executor]
 28
 29    def get(self, process_id: str) -> ProcessDetail:
 30        """
 31        Retrieves detailed information on a process
 32
 33        Args:
 34            process_id (str): Process ID
 35        """
 36        return get_process.sync(process_id=process_id, client=self._api_client)
 37
 38    def archive(self, process_id: str):
 39        """
 40        Removes a custom process from the list of available processes.
 41
 42        Error will be raised if the requested process does not exist. No value
 43        is returned, and no error raised if process exists and request is satisfied.
 44
 45        Args:
 46            process_id (str): Process ID
 47        """
 48        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)
 49
 50    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
 51        """
 52        Get a process by its display name
 53
 54        Args:
 55            name (str): Process name
 56        """
 57        matched_process = next((p for p in self.list() if p.name == name), None)
 58        if not matched_process:
 59            return None
 60
 61        return self.get(matched_process.id)
 62
 63    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
 64        """
 65        Gets a specification used to describe the parameters used in the process
 66
 67        Args:
 68            process_id (str): Process ID
 69        """
 70        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
 71        return ParameterSpecification(form_spec)
 72
 73    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 74        """
 75        Checks if the file mapping rules for a process are met by the list of files
 76
 77        Error will be raised if the file mapping rules for the process are not met.
 78        No value is returned and no error is raised if the rules are satisfied.
 79
 80        Args:
 81            process_id (str): ID for the process containing the file mapping rules
 82            directory: path to directory containing files
 83            files (List[str]): File names to check
 84        """
 85        # Parse sample sheet file if present
 86        sample_sheet = None
 87        sample_sheet_file = Path(directory, 'samplesheet.csv')
 88        if sample_sheet_file.exists():
 89            sample_sheet = sample_sheet_file.read_text()
 90
 91        request = ValidateFileRequirementsRequest(
 92            file_names=files,
 93            sample_sheet=sample_sheet
 94        )
 95        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 96
 97        # These will be sample sheet errors or no files errors
 98        if error_msg := requirements.error_msg:
 99            raise ValueError(error_msg)
100
101        errors = [
102            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
103            '\n\t- '.join([
104                e.example_name
105                for e in entry.allowed_patterns
106            ])
107            for entry in requirements.allowed_data_types
108            if entry.error_msg is not None
109        ]
110
111        files_provided = ', '.join(files)
112
113        if len(errors) != 0:
114            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
115                             "They do not meet the dataset requirements. "
116                             "The required file types are: \n" +
117                             "\n".join(errors))
118
119    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
120        """
121        Creates a custom process (pipeline or data type) in the system.
122
123        Whether it is a pipeline or data type is determined by the executor field.
124        For pipelines, you must specify the pipeline code and configuration repository.
125
126        This process will be available to all users in the system if `is_tenant_wide` is set to True.
127        If `is_tenant_wide` is set to False, the process will only be available the projects
128         specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.
129
130        If the repository is private, you must complete the authorization flow on the UI.
131
132        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
133
134        Args:
135            process (Process): Process to create
136
137        Example:
138        ```python
139        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
140         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
141        from cirro.cirro_client import CirroApi
142
143        cirro = CirroApi()
144
145        # New pipeline
146        new_pipeline = CustomProcessInput(
147            id="my_pipeline",
148            name="My Pipeline",
149            description="This is a test pipeline",
150            executor=Executor.CROMWELL,
151            category="DNA Sequencing",
152            child_process_ids=[],
153            parent_process_ids=["rnaseq"],
154            documentation_url="https://example.com/docs",
155            pipeline_code=PipelineCode(
156                repository_path="CirroBio/test-pipeline",
157                version="v1.0.0",
158                entry_point="main.nf",
159                repository_type=RepositoryType.GITHUB_PUBLIC
160            ),
161            linked_project_ids=[],
162            is_tenant_wide=True,
163            allow_multiple_sources=True,
164            uses_sample_sheet=True,
165            # This can be the same or different from the pipeline_code
166            custom_settings=CustomPipelineSettings(
167                repository="CirroBio/test-pipeline",
168                branch="v1.0.0",
169                repository_type=RepositoryType.GITHUB_PUBLIC,
170            ),
171            file_mapping_rules=[
172                FileMappingRule(
173                    description="Filtered Feature Matrix",
174                    file_name_patterns=[
175                        FileNamePattern(
176                            example_name="filtered_feature_bc_matrix.h5",
177                            description="Matrix",
178                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
179                        )
180                    ]
181                )
182            ]
183        )
184
185        cirro.processes.create_custom_process(new_pipeline)
186
187        # New data type
188        new_data_type = CustomProcessInput(
189            id="images_jpg",
190            name="JPG Images",
191            description="Used for generic JPG images",
192            executor=Executor.INGEST,
193            child_process_ids=[],
194            parent_process_ids=[],
195            documentation_url="https://example.com/docs",
196            linked_project_ids=["project_id_1", "project_id_2"],
197            file_mapping_rules=[
198                FileMappingRule(
199                    description="Images",
200                    min_=1,
201                    file_name_patterns=[
202                        FileNamePattern(
203                            example_name="image.jpg",
204                            description="JPG Image",
205                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
206                        )
207                    ]
208                )
209            ]
210        )
211        cirro.processes.create_custom_process(new_data_type)
212        ```
213        """
214        return create_custom_process.sync(client=self._api_client, body=process)
215
216    def update_custom_process(self, process_id: str, process: CustomProcessInput):
217        """
218        Updates the custom process (pipeline or data type) in the system.
219
220        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
221
222        Args:
223            process_id (str): ID of the process to update
224            process (CustomProcessInput): Process to update
225        """
226        update_custom_process.sync_detailed(client=self._api_client,
227                                            process_id=process_id,
228                                            body=process)
229
230    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
231        """
232        Syncs a custom pipeline in the system.
233
234        This updates the pipeline configurations in Cirro (form configuration,
235         input mapping, preprocess etc.) to match what is in the configured repository.
236        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
237
238        Args:
239            process_id (str): ID of the process to sync
240        """
241        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Service for interacting with the Process endpoints

def list( self, process_type: cirro_api_client.v1.models.Executor = None) -> List[cirro_api_client.v1.models.Process]:
19    def list(self, process_type: Executor = None) -> List[Process]:
20        """
21        Retrieves a list of available processes
22
23        Args:
24            process_type (`cirro_api_client.v1.models.Executor`): Optional process type (INGEST, CROMWELL, or NEXTFLOW)
25        """
26        processes = get_processes.sync(client=self._api_client)
27        return [p for p in processes if not process_type or process_type == p.executor]

Retrieves a list of available processes

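Arguments:
  • process_type (cirro_api_client.v1.models.Executor): Optional process type (INGEST, CROMWELL, or NEXTFLOW)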
def get( self, process_id: str) -> cirro_api_client.v1.models.ProcessDetail:
29    def get(self, process_id: str) -> ProcessDetail:
30        """
31        Retrieves detailed information on a process
32
33        Args:
34            process_id (str): Process ID
35        """
36        return get_process.sync(process_id=process_id, client=self._api_client)

Retrieves detailed information on a process

Arguments:
  • process_id (str): Process ID
def archive(self, process_id: str):
38    def archive(self, process_id: str):
39        """
40        Removes a custom process from the list of available processes.
41
42        Error will be raised if the requested process does not exist. No value
43        is returned, and no error raised if process exists and request is satisfied.
44
45        Args:
46            process_id (str): Process ID
47        """
48        archive_custom_process.sync_detailed(process_id=process_id, client=self._api_client)

Removes a custom process from the list of available processes.

Error will be raised if the requested process does not exist. No value is returned, and no error raised if process exists and request is satisfied.

Arguments:
  • process_id (str): Process ID
def find_by_name( self, name: str) -> Optional[cirro_api_client.v1.models.ProcessDetail]:
50    def find_by_name(self, name: str) -> Optional[ProcessDetail]:
51        """
52        Get a process by its display name
53
54        Args:
55            name (str): Process name
56        """
57        matched_process = next((p for p in self.list() if p.name == name), None)
58        if not matched_process:
59            return None
60
61        return self.get(matched_process.id)

Get a process by its display name

Arguments:
  • name (str): Process name
def get_parameter_spec( self, process_id: str) -> cirro.models.form_specification.ParameterSpecification:
63    def get_parameter_spec(self, process_id: str) -> ParameterSpecification:
64        """
65        Gets a specification used to describe the parameters used in the process
66
67        Args:
68            process_id (str): Process ID
69        """
70        form_spec = get_process_parameters.sync(process_id=process_id, client=self._api_client)
71        return ParameterSpecification(form_spec)

Gets a specification used to describe the parameters used in the process

Arguments:
  • process_id (str): Process ID
def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 73    def check_dataset_files(self, files: List[str], process_id: str, directory: str):
 74        """
 75        Checks if the file mapping rules for a process are met by the list of files
 76
 77        Error will be raised if the file mapping rules for the process are not met.
 78        No value is returned and no error is raised if the rules are satisfied.
 79
 80        Args:
 81            process_id (str): ID for the process containing the file mapping rules
 82            directory: path to directory containing files
 83            files (List[str]): File names to check
 84        """
 85        # Parse sample sheet file if present
 86        sample_sheet = None
 87        sample_sheet_file = Path(directory, 'samplesheet.csv')
 88        if sample_sheet_file.exists():
 89            sample_sheet = sample_sheet_file.read_text()
 90
 91        request = ValidateFileRequirementsRequest(
 92            file_names=files,
 93            sample_sheet=sample_sheet
 94        )
 95        requirements = validate_file_requirements.sync(process_id=process_id, body=request, client=self._api_client)
 96
 97        # These will be sample sheet errors or no files errors
 98        if error_msg := requirements.error_msg:
 99            raise ValueError(error_msg)
100
101        errors = [
102            f'{entry.description}. {entry.error_msg}. We accept any of the following naming conventions: \n\t- ' +
103            '\n\t- '.join([
104                e.example_name
105                for e in entry.allowed_patterns
106            ])
107            for entry in requirements.allowed_data_types
108            if entry.error_msg is not None
109        ]
110
111        files_provided = ', '.join(files)
112
113        if len(errors) != 0:
114            raise ValueError(f"The files you have provided are: {files_provided} \n\n"
115                             "They do not meet the dataset requirements. "
116                             "The required file types are: \n" +
117                             "\n".join(errors))

Checks if the file mapping rules for a process are met by the list of files

Error will be raised if the file mapping rules for the process are not met. No value is returned and no error is raised if the rules are satisfied.

Arguments:
  • process_id (str): ID for the process containing the file mapping rules
  • directory: path to directory containing files
  • files (List[str]): File names to check
def create_custom_process( self, process: cirro_api_client.v1.models.CustomProcessInput) -> cirro_api_client.v1.models.CreateResponse:
119    def create_custom_process(self, process: CustomProcessInput) -> CreateResponse:
120        """
121        Creates a custom process (pipeline or data type) in the system.
122
123        Whether it is a pipeline or data type is determined by the executor field.
124        For pipelines, you must specify the pipeline code and configuration repository.
125
126        This process will be available to all users in the system if `is_tenant_wide` is set to True.
127        If `is_tenant_wide` is set to False, the process will only be available the projects
128         specified in `linked_projects_ids`. Making it available tenant wide requires tenant admin privileges.
129
130        If the repository is private, you must complete the authorization flow on the UI.
131
132        See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.
133
134        Args:
135            process (Process): Process to create
136
137        Example:
138        ```python
139        from cirro_api_client.v1.models import CustomProcessInput, Executor, \
140         PipelineCode, FileMappingRule, FileNamePattern, RepositoryType, CustomPipelineSettings
141        from cirro.cirro_client import CirroApi
142
143        cirro = CirroApi()
144
145        # New pipeline
146        new_pipeline = CustomProcessInput(
147            id="my_pipeline",
148            name="My Pipeline",
149            description="This is a test pipeline",
150            executor=Executor.CROMWELL,
151            category="DNA Sequencing",
152            child_process_ids=[],
153            parent_process_ids=["rnaseq"],
154            documentation_url="https://example.com/docs",
155            pipeline_code=PipelineCode(
156                repository_path="CirroBio/test-pipeline",
157                version="v1.0.0",
158                entry_point="main.nf",
159                repository_type=RepositoryType.GITHUB_PUBLIC
160            ),
161            linked_project_ids=[],
162            is_tenant_wide=True,
163            allow_multiple_sources=True,
164            uses_sample_sheet=True,
165            # This can be the same or different from the pipeline_code
166            custom_settings=CustomPipelineSettings(
167                repository="CirroBio/test-pipeline",
168                branch="v1.0.0",
169                repository_type=RepositoryType.GITHUB_PUBLIC,
170            ),
171            file_mapping_rules=[
172                FileMappingRule(
173                    description="Filtered Feature Matrix",
174                    file_name_patterns=[
175                        FileNamePattern(
176                            example_name="filtered_feature_bc_matrix.h5",
177                            description="Matrix",
178                            sample_matching_pattern="(?P<sampleName>[\\S ]*)\\.jpg"
179                        )
180                    ]
181                )
182            ]
183        )
184
185        cirro.processes.create_custom_process(new_pipeline)
186
187        # New data type
188        new_data_type = CustomProcessInput(
189            id="images_jpg",
190            name="JPG Images",
191            description="Used for generic JPG images",
192            executor=Executor.INGEST,
193            child_process_ids=[],
194            parent_process_ids=[],
195            documentation_url="https://example.com/docs",
196            linked_project_ids=["project_id_1", "project_id_2"],
197            file_mapping_rules=[
198                FileMappingRule(
199                    description="Images",
200                    min_=1,
201                    file_name_patterns=[
202                        FileNamePattern(
203                            example_name="image.jpg",
204                            description="JPG Image",
205                            sample_matching_pattern="(?P<sampleName>[\\S ]*)/outs/image\\.jpg"
206                        )
207                    ]
208                )
209            ]
210        )
211        cirro.processes.create_custom_process(new_data_type)
212        ```
213        """
214        return create_custom_process.sync(client=self._api_client, body=process)

Creates a custom process (pipeline or data type) in the system.

Whether it is a pipeline or data type is determined by the executor field. For pipelines, you must specify the pipeline code and configuration repository.

This process will be available to all users in the system if is_tenant_wide is set to True. If is_tenant_wide is set to False, the process will only be available to the projects specified in linked_project_ids. Making it available tenant wide requires tenant admin privileges.

If the repository is private, you must complete the authorization flow on the UI.

See https://docs.cirro.bio/pipelines/importing-custom-pipelines/ for more info.

Arguments:
  • process (CustomProcessInput): Process to create

Example:

from cirro_api_client.v1.models import (
    CustomProcessInput, Executor, PipelineCode, FileMappingRule,
    FileNamePattern, RepositoryType, CustomPipelineSettings
)
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# New pipeline
new_pipeline = CustomProcessInput(
    id="my_pipeline",
    name="My Pipeline",
    description="This is a test pipeline",
    executor=Executor.CROMWELL,
    category="DNA Sequencing",
    child_process_ids=[],
    parent_process_ids=["rnaseq"],
    documentation_url="https://example.com/docs",
    pipeline_code=PipelineCode(
        repository_path="CirroBio/test-pipeline",
        version="v1.0.0",
        entry_point="main.nf",
        repository_type=RepositoryType.GITHUB_PUBLIC
    ),
    linked_project_ids=[],
    is_tenant_wide=True,
    allow_multiple_sources=True,
    uses_sample_sheet=True,
    # This can be the same or different from the pipeline_code
    custom_settings=CustomPipelineSettings(
        repository="CirroBio/test-pipeline",
        branch="v1.0.0",
        repository_type=RepositoryType.GITHUB_PUBLIC,
    ),
    file_mapping_rules=[
        FileMappingRule(
            description="Filtered Feature Matrix",
            file_name_patterns=[
                FileNamePattern(
                    example_name="filtered_feature_bc_matrix.h5",
                    description="Matrix",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)\.jpg"
                )
            ]
        )
    ]
)

cirro.processes.create_custom_process(new_pipeline)

# New data type
new_data_type = CustomProcessInput(
    id="images_jpg",
    name="JPG Images",
    description="Used for generic JPG images",
    executor=Executor.INGEST,
    child_process_ids=[],
    parent_process_ids=[],
    documentation_url="https://example.com/docs",
    linked_project_ids=["project_id_1", "project_id_2"],
    file_mapping_rules=[
        FileMappingRule(
            description="Images",
            min_=1,
            file_name_patterns=[
                FileNamePattern(
                    example_name="image.jpg",
                    description="JPG Image",
                    sample_matching_pattern="(?P<sampleName>[\S ]*)/outs/image\.jpg"
                )
            ]
        )
    ]
)
cirro.processes.create_custom_process(new_data_type)
def update_custom_process( self, process_id: str, process: cirro_api_client.v1.models.CustomProcessInput):
216    def update_custom_process(self, process_id: str, process: CustomProcessInput):
217        """
218        Updates the custom process (pipeline or data type) in the system.
219
220        Please run `sync_custom_process` after updating to ensure the pipeline configuration is up to date.
221
222        Args:
223            process_id (str): ID of the process to update
224            process (CustomProcessInput): Process to update
225        """
226        update_custom_process.sync_detailed(client=self._api_client,
227                                            process_id=process_id,
228                                            body=process)

Updates the custom process (pipeline or data type) in the system.

Please run sync_custom_process after updating to ensure the pipeline configuration is up to date.

Arguments:
  • process_id (str): ID of the process to update
  • process (CustomProcessInput): Process to update
def sync_custom_process( self, process_id: str) -> cirro_api_client.v1.models.CustomPipelineSettings:
230    def sync_custom_process(self, process_id: str) -> CustomPipelineSettings:
231        """
232        Syncs a custom pipeline in the system.
233
234        This updates the pipeline configurations in Cirro (form configuration,
235         input mapping, preprocess etc.) to match what is in the configured repository.
236        See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.
237
238        Args:
239            process_id (str): ID of the process to sync
240        """
241        return sync_custom_process.sync(client=self._api_client, process_id=process_id)

Syncs a custom pipeline in the system.

This updates the pipeline configurations in Cirro (form configuration, input mapping, preprocess etc.) to match what is in the configured repository. See https://docs.cirro.bio/pipelines/configuring-pipeline/ for more details.

Arguments:
  • process_id (str): ID of the process to sync
class ProjectService(cirro.services.base.BaseService):
 12class ProjectService(BaseService):
 13    """
 14    Service for interacting with the Project endpoints
 15    """
 16    def list(self):
 17        """
 18        Retrieve a list of projects
 19        """
 20        return get_projects.sync(client=self._api_client)
 21
 22    def get(self, project_id: str) -> ProjectDetail:
 23        """
 24        Get detailed project information
 25
 26        Args:
 27            project_id (str): Project ID
 28        """
 29        return get_project.sync(project_id=project_id, client=self._api_client)
 30
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)
106
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)
116
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)
126
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)
135
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)
154
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)
163
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Service for interacting with the Project endpoints

def list(self):
16    def list(self):
17        """
18        Retrieve a list of projects
19        """
20        return get_projects.sync(client=self._api_client)

Retrieve a list of projects

def get( self, project_id: str) -> cirro_api_client.v1.models.ProjectDetail:
22    def get(self, project_id: str) -> ProjectDetail:
23        """
24        Get detailed project information
25
26        Args:
27            project_id (str): Project ID
28        """
29        return get_project.sync(project_id=project_id, client=self._api_client)

Get detailed project information

Arguments:
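  • project_id (str): Project ID
def create( self, request: cirro_api_client.v1.models.ProjectInput) -> cirro_api_client.v1.models.CreateResponse: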
 31    def create(self, request: ProjectInput) -> CreateResponse:
 32        """
 33        Create a project
 34
 35        Args:
 36            request (`cirro_api_client.v1.models.ProjectInput`): Detailed information about the project to create
 37
 38        Examples:
 39        ```python
 40        from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
 41            CloudAccountType
 42        from cirro.cirro_client import CirroApi
 43
 44        cirro = CirroApi()
 45
 46        # Bring your own account projects are hosted by the user
 47        # You must provide the account details and VPC information
 48        # Please view the ProjectSettings model for more information on the attributes required
 49        byoa_project = ProjectInput(
 50            name="New Project Name",
 51            description="Description of new project",
 52            billing_account_id="billing-account-id",
 53            settings=ProjectSettings(
 54                budget_period=BudgetPeriod.MONTHLY,
 55                budget_amount=1000,
 56                max_spot_vcpu=300,
 57                service_connections=[],
 58                retention_policy_days=7,
 59                vpc_id="vpc-000000000000",
 60                batch_subnets=["subnet-000000000000", "subnet-000000000001"],
 61                sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
 62                kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
 63            ),
 64            account=CloudAccount(
 65                account_id="<AWS_ACCOUNT_ID>",
 66                region_name="us-west-2",
 67                account_name="Cirro Lab Project",
 68                account_type=CloudAccountType.BYOA
 69            ),
 70            contacts=[
 71                Contact(
 72                    name="Contact Name",
 73                    organization="Contact Organization",
 74                    email="contact@email.com",
 75                    phone="987-654-3210"
 76                )
 77            ]
 78        )
 79
 80        # Hosted projects are managed by Cirro
 81        hosted_project = ProjectInput(
 82            name="New Project Name",
 83            description="Description of new project",
 84            billing_account_id="billing-account-id",
 85            settings=ProjectSettings(
 86                budget_period=BudgetPeriod.MONTHLY,
 87                budget_amount=1000,
 88                max_spot_vcpu=300,
 89                service_connections=[],
 90                retention_policy_days=7
 91            ),
 92            contacts=[
 93                Contact(
 94                    name="Contact Name",
 95                    organization="Contact Organization",
 96                    email="contact@email.com",
 97                    phone="987-654-3210"
 98                )
 99            ]
100        )
101
102        cirro.projects.create(byoa_project)
103        ```
104        """
105        return create_project.sync(client=self._api_client, body=request)

Create a project

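Arguments:
  • request (cirro_api_client.v1.models.ProjectInput): Detailed information about the project to create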

Examples:

from cirro_api_client.v1.models import ProjectInput, ProjectSettings, Contact, BudgetPeriod, CloudAccount, \
    CloudAccountType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# Bring your own account projects are hosted by the user
# You must provide the account details and VPC information
# Please view the ProjectSettings model for more information on the attributes required
byoa_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7,
        vpc_id="vpc-000000000000",
        batch_subnets=["subnet-000000000000", "subnet-000000000001"],
        sagemaker_subnets=["subnet-000000000000", "subnet-000000000001"],
        kms_arn="arn:aws:kms:us-west-2:000000000000:key/00000000-0000-0000-0000-000000000000"
    ),
    account=CloudAccount(
        account_id="<AWS_ACCOUNT_ID>",
        region_name="us-west-2",
        account_name="Cirro Lab Project",
        account_type=CloudAccountType.BYOA
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

# Hosted projects are managed by Cirro
hosted_project = ProjectInput(
    name="New Project Name",
    description="Description of new project",
    billing_account_id="billing-account-id",
    settings=ProjectSettings(
        budget_period=BudgetPeriod.MONTHLY,
        budget_amount=1000,
        max_spot_vcpu=300,
        service_connections=[],
        retention_policy_days=7
    ),
    contacts=[
        Contact(
            name="Contact Name",
            organization="Contact Organization",
            email="contact@email.com",
            phone="987-654-3210"
        )
    ]
)

cirro.projects.create(byoa_project)
def update( self, project_id: str, request: cirro_api_client.v1.models.ProjectInput) -> cirro_api_client.v1.models.ProjectDetail:
107    def update(self, project_id: str, request: ProjectInput) -> ProjectDetail:
108        """
109        Updates a project
110
111        Args:
112            project_id (str): ID of project to update
113            request (`cirro_api_client.v1.models.ProjectInput`): New details for the project
114        """
115        return update_project.sync(project_id=project_id, body=request, client=self._api_client)

Updates a project

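Arguments:
  • project_id (str): ID of project to update
  • request (cirro_api_client.v1.models.ProjectInput): New details for the project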
def update_tags( self, project_id: str, tags: List[cirro_api_client.v1.models.Tag]):
117    def update_tags(self, project_id: str, tags: List[Tag]):
118        """
119        Sets tags on a project
120
121        Args:
122            project_id (str): Project ID
123            tags (List[Tag]): List of tags to apply
124        """
125        update_project_tags.sync_detailed(project_id=project_id, body=tags, client=self._api_client)

Sets tags on a project

Arguments:
  • project_id (str): Project ID
  • tags (List[Tag]): List of tags to apply
def get_users( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.ProjectUser]]:
127    def get_users(self, project_id: str) -> Optional[List[ProjectUser]]:
128        """
129        Gets users who have access to the project
130
131        Args:
132            project_id (str): Project ID
133        """
134        return get_project_users.sync(project_id=project_id, client=self._api_client)

Gets users who have access to the project

Arguments:
  • project_id (str): Project ID
def set_user_role( self, project_id: str, username: str, role: cirro_api_client.v1.models.ProjectRole, suppress_notification=False):
136    def set_user_role(self, project_id: str, username: str, role: ProjectRole, suppress_notification=False):
137        """
138        Sets a user's role within a project.
139
140        Set to `ProjectRole.NONE` if removing the user from a project.
141
142        Args:
143            project_id (str): Project ID
144            username (str): Username
145            role (`cirro_api_client.v1.models.ProjectRole`): New role to apply
146            suppress_notification (bool): Suppress email notification
147        """
148        request_body = SetUserProjectRoleRequest(
149            username=username,
150            role=role,
151            suppress_notification=suppress_notification
152        )
153        set_user_project_role.sync_detailed(project_id=project_id, body=request_body, client=self._api_client)

Sets a user's role within a project.

Set to ProjectRole.NONE if removing the user from a project.

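Arguments:
  • project_id (str): Project ID
  • username (str): Username
  • role (cirro_api_client.v1.models.ProjectRole): New role to apply
  • suppress_notification (bool): Suppress email notification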
def archive(self, project_id: str):
155    def archive(self, project_id: str):
156        """
157        Sets the project status to archived (hidden from active projects)
158
159        Args:
160            project_id (str): Project ID
161        """
162        archive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to archived (hidden from active projects)

Arguments:
  • project_id (str): Project ID
def unarchive(self, project_id: str):
164    def unarchive(self, project_id: str):
165        """
166        Sets the project status to active
167
168        Args:
169            project_id (str): Project ID
170        """
171        unarchive_project.sync_detailed(project_id=project_id, client=self._api_client)

Sets the project status to active

Arguments:
  • project_id (str): Project ID
class ReferenceService(cirro.services.base.BaseService):
10class ReferenceService(BaseService):
11    """
12    Service for interacting with the References endpoints
13    """
14    def get_types(self) -> Optional[List[ReferenceType]]:
15        """
16        List available reference types
17        """
18        return get_reference_types.sync(client=self._api_client)
19
20    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
21        """
22        List available references for a given project
23        """
24        return get_references_for_project.sync(project_id=project_id, client=self._api_client)

Service for interacting with the References endpoints

def get_types( self) -> Optional[List[cirro_api_client.v1.models.ReferenceType]]:
14    def get_types(self) -> Optional[List[ReferenceType]]:
15        """
16        List available reference types
17        """
18        return get_reference_types.sync(client=self._api_client)

List available reference types

def get_for_project( self, project_id: str) -> Optional[List[cirro_api_client.v1.models.Reference]]:
20    def get_for_project(self, project_id: str) -> Optional[List[Reference]]:
21        """
22        List available references for a given project
23        """
24        return get_references_for_project.sync(project_id=project_id, client=self._api_client)

List available references for a given project

class ShareService(cirro.services.base.BaseService):
11class ShareService(BaseService):
12    """
13    Service for interacting with the Share endpoints
14    """
15
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares
40
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)
46
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)
54
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update an existing share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)
63
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)
71
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)
79
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Service for interacting with the Share endpoints

def list( self, project_id: str, share_type: cirro_api_client.v1.models.ShareType = None) -> List[cirro_api_client.v1.models.Share]:
16    def list(self, project_id: str, share_type: ShareType = None) -> List[Share]:
17        """
18        Retrieve a list of shares for a given project
19        optionally filtered by share type
20
21        Args:
22            project_id (str): ID of the Project
23            share_type (ShareType): Type of share to filter by
24
25        ```python
26
27        from cirro_api_client.v1.models import ShareType
28        from cirro.cirro_client import CirroApi
29
30        cirro = CirroApi()
31
32        # List shares that we've published
33        cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
34        ```
35        """
36        shares = get_shares.sync(project_id=project_id, client=self._api_client)
37        if share_type:
38            shares = [share for share in shares if share.share_type == share_type]
39        return shares

Retrieve a list of shares for a given project optionally filtered by share type

Arguments:
  • project_id (str): ID of the Project
  • share_type (ShareType): Type of share to filter by
from cirro_api_client.v1.models import ShareType
from cirro.cirro_client import CirroApi

cirro = CirroApi()

# List shares that we've published
cirro.shares.list(project_id="project-id", share_type=ShareType.PUBLISHER)
def get( self, project_id: str, share_id: str) -> cirro_api_client.v1.models.ShareDetail:
41    def get(self, project_id: str, share_id: str) -> ShareDetail:
42        """
43        Get details of a share
44        """
45        return get_share.sync(project_id=project_id, share_id=share_id, client=self._api_client)

Get details of a share

def create( self, project_id: str, share: cirro_api_client.v1.models.ShareInput) -> cirro_api_client.v1.models.CreateResponse:
47    def create(self, project_id: str, share: ShareInput) -> CreateResponse:
48        """
49        Publish a share for a given project
50        """
51        return create_share.sync(project_id=project_id,
52                                 body=share,
53                                 client=self._api_client)

Publish a share for a given project

def update( self, project_id: str, share_id: str, share: cirro_api_client.v1.models.ShareInput):
55    def update(self, project_id: str, share_id: str, share: ShareInput):
56        """
57        Update an existing share for a given project
58        """
59        update_share.sync_detailed(project_id=project_id,
60                                   share_id=share_id,
61                                   body=share,
62                                   client=self._api_client)

Update an existing share for a given project

def delete(self, project_id: str, share_id: str):
64    def delete(self, project_id: str, share_id: str):
65        """
66        Delete a share
67        """
68        delete_share.sync_detailed(project_id=project_id,
69                                   share_id=share_id,
70                                   client=self._api_client)

Delete a share

def subscribe(self, project_id: str, share_id: str):
72    def subscribe(self, project_id: str, share_id: str):
73        """
74        Subscribe to a share for a given project
75        """
76        subscribe_share.sync_detailed(project_id=project_id,
77                                      share_id=share_id,
78                                      client=self._api_client)

Subscribe to a share for a given project

def unsubscribe(self, project_id: str, share_id: str):
80    def unsubscribe(self, project_id: str, share_id: str):
81        """
82        Unsubscribe from a share for a given project
83        """
84        unsubscribe_share.sync_detailed(project_id=project_id,
85                                        share_id=share_id,
86                                        client=self._api_client)

Unsubscribe from a share for a given project

class UserService(cirro.services.base.BaseService):
10class UserService(BaseService):
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )
23
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)
29
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Not to be instantiated directly

def list( self, max_items: int = 10000) -> List[cirro_api_client.v1.models.User]:
11    def list(self, max_items: int = 10000) -> List[User]:
12        """
13        List users in the system
14        """
15        return get_all_records(
16            records_getter=lambda page_args: list_users.sync(
17                client=self._api_client,
18                next_token=page_args.next_token,
19                limit=page_args.limit
20            ),
21            max_items=max_items
22        )

List users in the system

def get(self, username: str) -> cirro_api_client.v1.models.UserDetail:
24    def get(self, username: str) -> UserDetail:
25        """
26        Get user details by username, including what projects they are assigned to
27        """
28        return get_user.sync(username=username, client=self._api_client)

Get user details by username, including what projects they are assigned to

def invite_user(self, name: str, organization: str, email: str):
30    def invite_user(self, name: str, organization: str, email: str):
31        """
32        Invite a user to the system.
33        If the user already exists, it will return a message that the user already exists.
34
35        Args:
36            name (str): Name of the user
37            organization (str): Organization of the user
38            email (str): Email (username) of the user
39        """
40        request = InviteUserRequest(
41            name=name,
42            organization=organization,
43            email=email,
44        )
45        response = invite_user.sync(client=self._api_client, body=request)
46        return response.message

Invite a user to the system. If the user already exists, it will return a message that the user already exists.

Arguments:
  • name (str): Name of the user
  • organization (str): Organization of the user
  • email (str): Email (username) of the user