cirro.sdk.process

  1from typing import List, Union
  2
  3from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode, \
  4    RunAnalysisRequest, RunAnalysisRequestParams
  5
  6from cirro.cirro_client import CirroApi
  7from cirro.models.form_specification import ParameterSpecification
  8from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
  9from cirro.sdk.exceptions import DataPortalInputError
 10
 11
 12class DataPortalProcess(DataPortalAsset):
 13    """Helper functions for interacting with analysis processes."""
 14
 15    def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi):
 16        """
 17        Instantiate with helper method
 18
 19        ```python
 20        from cirro import DataPortal()
 21        portal = DataPortal()
 22        process = portal.get_process_by_name("Process Name")
 23        ```
 24        """
 25        self._data = process
 26        self._client = client
 27
 28    @property
 29    def id(self) -> str:
 30        """Unique identifier"""
 31        return self._data.id
 32
 33    @property
 34    def name(self) -> str:
 35        """Readable name"""
 36        return self._data.name
 37
 38    @property
 39    def description(self) -> str:
 40        """Longer description of process"""
 41        return self._data.description
 42
 43    @property
 44    def child_process_ids(self) -> List[str]:
 45        """List of processes which can be run on the output of this process"""
 46        return self._data.child_process_ids
 47
 48    @property
 49    def executor(self) -> Executor:
 50        """INGEST, CROMWELL, or NEXTFLOW"""
 51        return self._data.executor
 52
 53    @property
 54    def category(self) -> str:
 55        """Category of process"""
 56        return self._data.category
 57
 58    @property
 59    def pipeline_type(self) -> str:
 60        """Pipeline type"""
 61        return self._data.pipeline_type
 62
 63    @property
 64    def documentation_url(self) -> str:
 65        """Documentation URL"""
 66        return self._data.documentation_url
 67
 68    @property
 69    def file_requirements_message(self) -> str:
 70        """Description of files required for INGEST processes"""
 71        return self._data.file_requirements_message
 72
 73    @property
 74    def code(self) -> PipelineCode:
 75        """Pipeline code configuration"""
 76        return self._get_detail().pipeline_code
 77
 78    @property
 79    def custom_settings(self) -> CustomPipelineSettings:
 80        """Custom settings for the process"""
 81        return self._get_detail().custom_settings
 82
 83    def _get_detail(self) -> ProcessDetail:
 84        if not isinstance(self._data, ProcessDetail):
 85            self._data = self._client.processes.get(self.id)
 86        return self._data
 87
 88    def __str__(self):
 89        return '\n'.join([
 90            f"{i.title()}: {self.__getattribute__(i)}"
 91            for i in ['name', 'id', 'description']
 92        ])
 93
 94    def get_parameter_spec(self) -> ParameterSpecification:
 95        """
 96        Gets a specification used to describe the parameters used in the process.
 97        """
 98        return self._client.processes.get_parameter_spec(self.id)
 99
100    def run_analysis(
101            self,
102            name: str = None,
103            project_id: str = None,
104            datasets: list = None,
105            description: str = "",
106            params=None,
107            notifications_emails: List[str] = None,
108            compute_environment: str = None,
109            resume_dataset_id: str = None,
110            source_sample_ids: List[str] = None
111    ) -> str:
112        """
113        Runs this process on one or more input datasets, returns the ID of the newly created dataset.
114
115        Args:
116            name (str): Name of newly created dataset
117            project_id (str): ID of the project to run the analysis in
118            datasets (List[DataPortalDataset or str]): One or more input datasets
119             (as DataPortalDataset objects or dataset ID strings)
120            description (str): Description of newly created dataset
121            params (dict): Analysis parameters
122            notifications_emails (List[str]): Notification email address(es)
123            compute_environment (str): Name or ID of compute environment to use,
124             if blank it will run in AWS
125            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
126             It will attempt to re-use the previous output to minimize duplicate work.
127            Note that Nextflow does not require this parameter, as it will automatically resume
128             from any previous attempts using a global cache.
129            source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.
130
131        Returns:
132            dataset_id (str): ID of newly created dataset
133        """
134        if name is None:
135            raise DataPortalInputError("Must specify 'name' for run_analysis")
136        if project_id is None:
137            raise DataPortalInputError("Must specify 'project_id' for run_analysis")
138        if not datasets:
139            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
140        if notifications_emails is None:
141            notifications_emails = []
142        if params is None:
143            params = {}
144
145        # Accept DataPortalDataset objects or raw ID strings
146        source_dataset_ids = [
147            ds if isinstance(ds, str) else ds.id
148            for ds in datasets
149        ]
150
151        if compute_environment:
152            compute_environment_name = compute_environment
153            compute_environments = self._client.compute_environments.list_environments_for_project(
154                project_id=project_id
155            )
156            compute_environment = next(
157                (env for env in compute_environments
158                 if env.name == compute_environment or env.id == compute_environment),
159                None
160            )
161            if compute_environment is None:
162                raise DataPortalInputError(f"Compute environment '{compute_environment_name}' not found")
163
164        resp = self._client.execution.run_analysis(
165            project_id=project_id,
166            request=RunAnalysisRequest(
167                name=name,
168                description=description,
169                process_id=self.id,
170                source_dataset_ids=source_dataset_ids,
171                params=RunAnalysisRequestParams.from_dict(params),
172                notification_emails=notifications_emails,
173                resume_dataset_id=resume_dataset_id,
174                source_sample_ids=source_sample_ids,
175                compute_environment_id=compute_environment.id if compute_environment else None
176            )
177        )
178        return resp.id
179
180
181class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
182    """Collection of DataPortalProcess objects."""
183    asset_name = "process"
class DataPortalProcess(cirro.sdk.asset.DataPortalAsset):
 13class DataPortalProcess(DataPortalAsset):
 14    """Helper functions for interacting with analysis processes."""
 15
 16    def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi):
 17        """
 18        Instantiate with helper method
 19
 20        ```python
 21        from cirro import DataPortal()
 22        portal = DataPortal()
 23        process = portal.get_process_by_name("Process Name")
 24        ```
 25        """
 26        self._data = process
 27        self._client = client
 28
 29    @property
 30    def id(self) -> str:
 31        """Unique identifier"""
 32        return self._data.id
 33
 34    @property
 35    def name(self) -> str:
 36        """Readable name"""
 37        return self._data.name
 38
 39    @property
 40    def description(self) -> str:
 41        """Longer description of process"""
 42        return self._data.description
 43
 44    @property
 45    def child_process_ids(self) -> List[str]:
 46        """List of processes which can be run on the output of this process"""
 47        return self._data.child_process_ids
 48
 49    @property
 50    def executor(self) -> Executor:
 51        """INGEST, CROMWELL, or NEXTFLOW"""
 52        return self._data.executor
 53
 54    @property
 55    def category(self) -> str:
 56        """Category of process"""
 57        return self._data.category
 58
 59    @property
 60    def pipeline_type(self) -> str:
 61        """Pipeline type"""
 62        return self._data.pipeline_type
 63
 64    @property
 65    def documentation_url(self) -> str:
 66        """Documentation URL"""
 67        return self._data.documentation_url
 68
 69    @property
 70    def file_requirements_message(self) -> str:
 71        """Description of files required for INGEST processes"""
 72        return self._data.file_requirements_message
 73
 74    @property
 75    def code(self) -> PipelineCode:
 76        """Pipeline code configuration"""
 77        return self._get_detail().pipeline_code
 78
 79    @property
 80    def custom_settings(self) -> CustomPipelineSettings:
 81        """Custom settings for the process"""
 82        return self._get_detail().custom_settings
 83
 84    def _get_detail(self) -> ProcessDetail:
 85        if not isinstance(self._data, ProcessDetail):
 86            self._data = self._client.processes.get(self.id)
 87        return self._data
 88
 89    def __str__(self):
 90        return '\n'.join([
 91            f"{i.title()}: {self.__getattribute__(i)}"
 92            for i in ['name', 'id', 'description']
 93        ])
 94
 95    def get_parameter_spec(self) -> ParameterSpecification:
 96        """
 97        Gets a specification used to describe the parameters used in the process.
 98        """
 99        return self._client.processes.get_parameter_spec(self.id)
100
101    def run_analysis(
102            self,
103            name: str = None,
104            project_id: str = None,
105            datasets: list = None,
106            description: str = "",
107            params=None,
108            notifications_emails: List[str] = None,
109            compute_environment: str = None,
110            resume_dataset_id: str = None,
111            source_sample_ids: List[str] = None
112    ) -> str:
113        """
114        Runs this process on one or more input datasets, returns the ID of the newly created dataset.
115
116        Args:
117            name (str): Name of newly created dataset
118            project_id (str): ID of the project to run the analysis in
119            datasets (List[DataPortalDataset or str]): One or more input datasets
120             (as DataPortalDataset objects or dataset ID strings)
121            description (str): Description of newly created dataset
122            params (dict): Analysis parameters
123            notifications_emails (List[str]): Notification email address(es)
124            compute_environment (str): Name or ID of compute environment to use,
125             if blank it will run in AWS
126            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
127             It will attempt to re-use the previous output to minimize duplicate work.
128            Note that Nextflow does not require this parameter, as it will automatically resume
129             from any previous attempts using a global cache.
130            source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.
131
132        Returns:
133            dataset_id (str): ID of newly created dataset
134        """
135        if name is None:
136            raise DataPortalInputError("Must specify 'name' for run_analysis")
137        if project_id is None:
138            raise DataPortalInputError("Must specify 'project_id' for run_analysis")
139        if not datasets:
140            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
141        if notifications_emails is None:
142            notifications_emails = []
143        if params is None:
144            params = {}
145
146        # Accept DataPortalDataset objects or raw ID strings
147        source_dataset_ids = [
148            ds if isinstance(ds, str) else ds.id
149            for ds in datasets
150        ]
151
152        if compute_environment:
153            compute_environment_name = compute_environment
154            compute_environments = self._client.compute_environments.list_environments_for_project(
155                project_id=project_id
156            )
157            compute_environment = next(
158                (env for env in compute_environments
159                 if env.name == compute_environment or env.id == compute_environment),
160                None
161            )
162            if compute_environment is None:
163                raise DataPortalInputError(f"Compute environment '{compute_environment_name}' not found")
164
165        resp = self._client.execution.run_analysis(
166            project_id=project_id,
167            request=RunAnalysisRequest(
168                name=name,
169                description=description,
170                process_id=self.id,
171                source_dataset_ids=source_dataset_ids,
172                params=RunAnalysisRequestParams.from_dict(params),
173                notification_emails=notifications_emails,
174                resume_dataset_id=resume_dataset_id,
175                source_sample_ids=source_sample_ids,
176                compute_environment_id=compute_environment.id if compute_environment else None
177            )
178        )
179        return resp.id

Helper functions for interacting with analysis processes.

DataPortalProcess( process: Union[cirro_api_client.v1.models.Process, cirro_api_client.v1.models.ProcessDetail], client: cirro.CirroApi)
16    def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi):
17        """
18        Instantiate with helper method
19
20        ```python
21        from cirro import DataPortal()
22        portal = DataPortal()
23        process = portal.get_process_by_name("Process Name")
24        ```
25        """
26        self._data = process
27        self._client = client

Instantiate with helper method

from cirro import DataPortal()
portal = DataPortal()
process = portal.get_process_by_name("Process Name")
id: str
29    @property
30    def id(self) -> str:
31        """Unique identifier"""
32        return self._data.id

Unique identifier

name: str
34    @property
35    def name(self) -> str:
36        """Readable name"""
37        return self._data.name

Readable name

description: str
39    @property
40    def description(self) -> str:
41        """Longer description of process"""
42        return self._data.description

Longer description of process

child_process_ids: List[str]
44    @property
45    def child_process_ids(self) -> List[str]:
46        """List of processes which can be run on the output of this process"""
47        return self._data.child_process_ids

List of processes which can be run on the output of this process

executor: cirro_api_client.v1.models.Executor
49    @property
50    def executor(self) -> Executor:
51        """INGEST, CROMWELL, or NEXTFLOW"""
52        return self._data.executor

INGEST, CROMWELL, or NEXTFLOW

category: str
54    @property
55    def category(self) -> str:
56        """Category of process"""
57        return self._data.category

Category of process

pipeline_type: str
59    @property
60    def pipeline_type(self) -> str:
61        """Pipeline type"""
62        return self._data.pipeline_type

Pipeline type

documentation_url: str
64    @property
65    def documentation_url(self) -> str:
66        """Documentation URL"""
67        return self._data.documentation_url

Documentation URL

file_requirements_message: str
69    @property
70    def file_requirements_message(self) -> str:
71        """Description of files required for INGEST processes"""
72        return self._data.file_requirements_message

Description of files required for INGEST processes

74    @property
75    def code(self) -> PipelineCode:
76        """Pipeline code configuration"""
77        return self._get_detail().pipeline_code

Pipeline code configuration

79    @property
80    def custom_settings(self) -> CustomPipelineSettings:
81        """Custom settings for the process"""
82        return self._get_detail().custom_settings

Custom settings for the process

def get_parameter_spec(self) -> cirro.models.form_specification.ParameterSpecification:
95    def get_parameter_spec(self) -> ParameterSpecification:
96        """
97        Gets a specification used to describe the parameters used in the process.
98        """
99        return self._client.processes.get_parameter_spec(self.id)

Gets a specification used to describe the parameters used in the process.

def run_analysis( self, name: str = None, project_id: str = None, datasets: list = None, description: str = '', params=None, notifications_emails: List[str] = None, compute_environment: str = None, resume_dataset_id: str = None, source_sample_ids: List[str] = None) -> str:
101    def run_analysis(
102            self,
103            name: str = None,
104            project_id: str = None,
105            datasets: list = None,
106            description: str = "",
107            params=None,
108            notifications_emails: List[str] = None,
109            compute_environment: str = None,
110            resume_dataset_id: str = None,
111            source_sample_ids: List[str] = None
112    ) -> str:
113        """
114        Runs this process on one or more input datasets, returns the ID of the newly created dataset.
115
116        Args:
117            name (str): Name of newly created dataset
118            project_id (str): ID of the project to run the analysis in
119            datasets (List[DataPortalDataset or str]): One or more input datasets
120             (as DataPortalDataset objects or dataset ID strings)
121            description (str): Description of newly created dataset
122            params (dict): Analysis parameters
123            notifications_emails (List[str]): Notification email address(es)
124            compute_environment (str): Name or ID of compute environment to use,
125             if blank it will run in AWS
126            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
127             It will attempt to re-use the previous output to minimize duplicate work.
128            Note that Nextflow does not require this parameter, as it will automatically resume
129             from any previous attempts using a global cache.
130            source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.
131
132        Returns:
133            dataset_id (str): ID of newly created dataset
134        """
135        if name is None:
136            raise DataPortalInputError("Must specify 'name' for run_analysis")
137        if project_id is None:
138            raise DataPortalInputError("Must specify 'project_id' for run_analysis")
139        if not datasets:
140            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
141        if notifications_emails is None:
142            notifications_emails = []
143        if params is None:
144            params = {}
145
146        # Accept DataPortalDataset objects or raw ID strings
147        source_dataset_ids = [
148            ds if isinstance(ds, str) else ds.id
149            for ds in datasets
150        ]
151
152        if compute_environment:
153            compute_environment_name = compute_environment
154            compute_environments = self._client.compute_environments.list_environments_for_project(
155                project_id=project_id
156            )
157            compute_environment = next(
158                (env for env in compute_environments
159                 if env.name == compute_environment or env.id == compute_environment),
160                None
161            )
162            if compute_environment is None:
163                raise DataPortalInputError(f"Compute environment '{compute_environment_name}' not found")
164
165        resp = self._client.execution.run_analysis(
166            project_id=project_id,
167            request=RunAnalysisRequest(
168                name=name,
169                description=description,
170                process_id=self.id,
171                source_dataset_ids=source_dataset_ids,
172                params=RunAnalysisRequestParams.from_dict(params),
173                notification_emails=notifications_emails,
174                resume_dataset_id=resume_dataset_id,
175                source_sample_ids=source_sample_ids,
176                compute_environment_id=compute_environment.id if compute_environment else None
177            )
178        )
179        return resp.id

Runs this process on one or more input datasets, returns the ID of the newly created dataset.

Arguments:
  • name (str): Name of newly created dataset
  • project_id (str): ID of the project to run the analysis in
  • datasets (List[DataPortalDataset or str]): One or more input datasets (as DataPortalDataset objects or dataset ID strings)
  • description (str): Description of newly created dataset
  • params (dict): Analysis parameters
  • notifications_emails (List[str]): Notification email address(es)
  • compute_environment (str): Name or ID of compute environment to use, if blank it will run in AWS
  • resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. It will attempt to re-use the previous output to minimize duplicate work.
  • Note that Nextflow does not require this parameter, as it will automatically resume from any previous attempts using a global cache.
  • source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.
Returns:

dataset_id (str): ID of newly created dataset

182class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
183    """Collection of DataPortalProcess objects."""
184    asset_name = "process"

Collection of DataPortalProcess objects.

asset_name = 'process'