cirro.sdk.process
"""Helpers for interacting with analysis processes in the Cirro Data Portal."""
from typing import List, Union

from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode, \
    RunAnalysisRequest, RunAnalysisRequestParams

from cirro.cirro_client import CirroApi
from cirro.models.form_specification import ParameterSpecification
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.exceptions import DataPortalInputError


class DataPortalProcess(DataPortalAsset):
    """Helper functions for interacting with analysis processes."""

    def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        process = portal.get_process_by_name("Process Name")
        ```
        """
        self._data = process
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of process"""
        return self._data.description

    @property
    def child_process_ids(self) -> List[str]:
        """List of processes which can be run on the output of this process"""
        return self._data.child_process_ids

    @property
    def executor(self) -> Executor:
        """INGEST, CROMWELL, or NEXTFLOW"""
        return self._data.executor

    @property
    def category(self) -> str:
        """Category of process"""
        return self._data.category

    @property
    def pipeline_type(self) -> str:
        """Pipeline type"""
        return self._data.pipeline_type

    @property
    def documentation_url(self) -> str:
        """Documentation URL"""
        return self._data.documentation_url

    @property
    def file_requirements_message(self) -> str:
        """Description of files required for INGEST processes"""
        return self._data.file_requirements_message

    @property
    def code(self) -> PipelineCode:
        """Pipeline code configuration (requires the full process detail record)"""
        return self._get_detail().pipeline_code

    @property
    def custom_settings(self) -> CustomPipelineSettings:
        """Custom settings for the process (requires the full process detail record)"""
        return self._get_detail().custom_settings

    def _get_detail(self) -> ProcessDetail:
        # Lazily upgrade the cached record to the full ProcessDetail on first
        # access, so list endpoints can hand back lightweight Process objects.
        if not isinstance(self._data, ProcessDetail):
            self._data = self._client.processes.get(self.id)
        return self._data

    def __str__(self):
        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description']
        ])

    def get_parameter_spec(self) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process.
        """
        return self._client.processes.get_parameter_spec(self.id)

    def run_analysis(
        self,
        name: str = None,
        project_id: str = None,
        datasets: list = None,
        description: str = "",
        params=None,
        notifications_emails: List[str] = None,
        compute_environment: str = None,
        resume_dataset_id: str = None,
        source_sample_ids: List[str] = None
    ) -> str:
        """
        Runs this process on one or more input datasets, returns the ID of the newly created dataset.

        Args:
            name (str): Name of newly created dataset
            project_id (str): ID of the project to run the analysis in
            datasets (List[DataPortalDataset or str]): One or more input datasets
                (as DataPortalDataset objects or dataset ID strings)
            description (str): Description of newly created dataset
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of compute environment to use,
                if blank it will run in AWS
            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
                It will attempt to re-use the previous output to minimize duplicate work.
                Note that Nextflow does not require this parameter, as it will automatically resume
                from any previous attempts using a global cache.
            source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.

        Returns:
            dataset_id (str): ID of newly created dataset

        Raises:
            DataPortalInputError: if a required argument is missing or the
                requested compute environment cannot be found
        """
        if name is None:
            raise DataPortalInputError("Must specify 'name' for run_analysis")
        if project_id is None:
            raise DataPortalInputError("Must specify 'project_id' for run_analysis")
        if not datasets:
            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # Accept DataPortalDataset objects or raw ID strings
        source_dataset_ids = [
            ds if isinstance(ds, str) else ds.id
            for ds in datasets
        ]

        compute_environment_id = self._resolve_compute_environment_id(
            compute_environment, project_id
        )

        resp = self._client.execution.run_analysis(
            project_id=project_id,
            request=RunAnalysisRequest(
                name=name,
                description=description,
                process_id=self.id,
                source_dataset_ids=source_dataset_ids,
                params=RunAnalysisRequestParams.from_dict(params),
                notification_emails=notifications_emails,
                resume_dataset_id=resume_dataset_id,
                source_sample_ids=source_sample_ids,
                compute_environment_id=compute_environment_id
            )
        )
        return resp.id

    def _resolve_compute_environment_id(
        self,
        compute_environment: str,
        project_id: str
    ) -> Union[str, None]:
        """
        Resolve a compute environment name or ID to its canonical ID.

        Returns None when no compute environment was requested (the analysis
        will then run in AWS). Raises DataPortalInputError when the requested
        environment does not exist in the project.
        """
        if not compute_environment:
            return None
        environments = self._client.compute_environments.list_environments_for_project(
            project_id=project_id
        )
        # Match on either the display name or the ID of the environment
        matched = next(
            (env for env in environments
             if env.name == compute_environment or env.id == compute_environment),
            None
        )
        if matched is None:
            raise DataPortalInputError(f"Compute environment '{compute_environment}' not found")
        return matched.id


class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
    """Collection of DataPortalProcess objects."""
    asset_name = "process"
class DataPortalProcess(DataPortalAsset):
    """Wraps a Cirro analysis process and exposes convenience accessors."""

    def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        process = portal.get_process_by_name("Process Name")
        ```
        """
        self._data = process
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of process"""
        return self._data.description

    @property
    def child_process_ids(self) -> List[str]:
        """List of processes which can be run on the output of this process"""
        return self._data.child_process_ids

    @property
    def executor(self) -> Executor:
        """INGEST, CROMWELL, or NEXTFLOW"""
        return self._data.executor

    @property
    def category(self) -> str:
        """Category of process"""
        return self._data.category

    @property
    def pipeline_type(self) -> str:
        """Pipeline type"""
        return self._data.pipeline_type

    @property
    def documentation_url(self) -> str:
        """Documentation URL"""
        return self._data.documentation_url

    @property
    def file_requirements_message(self) -> str:
        """Description of files required for INGEST processes"""
        return self._data.file_requirements_message

    @property
    def code(self) -> PipelineCode:
        """Pipeline code configuration"""
        return self._get_detail().pipeline_code

    @property
    def custom_settings(self) -> CustomPipelineSettings:
        """Custom settings for the process"""
        return self._get_detail().custom_settings

    def _get_detail(self) -> ProcessDetail:
        # Fetch the full detail record once and cache it on the instance
        if isinstance(self._data, ProcessDetail):
            return self._data
        self._data = self._client.processes.get(self.id)
        return self._data

    def __str__(self):
        display_fields = ['name', 'id', 'description']
        return '\n'.join(
            f"{field.title()}: {self.__getattribute__(field)}"
            for field in display_fields
        )

    def get_parameter_spec(self) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process.
        """
        return self._client.processes.get_parameter_spec(self.id)

    def run_analysis(
        self,
        name: str = None,
        project_id: str = None,
        datasets: list = None,
        description: str = "",
        params=None,
        notifications_emails: List[str] = None,
        compute_environment: str = None,
        resume_dataset_id: str = None,
        source_sample_ids: List[str] = None
    ) -> str:
        """
        Runs this process on one or more input datasets, returns the ID of the newly created dataset.

        Args:
            name (str): Name of newly created dataset
            project_id (str): ID of the project to run the analysis in
            datasets (List[DataPortalDataset or str]): One or more input datasets
                (as DataPortalDataset objects or dataset ID strings)
            description (str): Description of newly created dataset
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of compute environment to use,
                if blank it will run in AWS
            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
                It will attempt to re-use the previous output to minimize duplicate work.
                Note that Nextflow does not require this parameter, as it will automatically resume
                from any previous attempts using a global cache.
            source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.

        Returns:
            dataset_id (str): ID of newly created dataset
        """
        # Required-argument validation; messages match the argument names
        for arg_name, arg_value in (("name", name), ("project_id", project_id)):
            if arg_value is None:
                raise DataPortalInputError(f"Must specify '{arg_name}' for run_analysis")
        if not datasets:
            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # Normalize each input to a dataset ID (strings pass through unchanged)
        source_dataset_ids = []
        for ds in datasets:
            source_dataset_ids.append(ds if isinstance(ds, str) else ds.id)

        if compute_environment:
            requested = compute_environment
            available = self._client.compute_environments.list_environments_for_project(
                project_id=project_id
            )
            compute_environment = next(
                (env for env in available
                 if env.name == requested or env.id == requested),
                None
            )
            if compute_environment is None:
                raise DataPortalInputError(f"Compute environment '{requested}' not found")

        request = RunAnalysisRequest(
            name=name,
            description=description,
            process_id=self.id,
            source_dataset_ids=source_dataset_ids,
            params=RunAnalysisRequestParams.from_dict(params),
            notification_emails=notifications_emails,
            resume_dataset_id=resume_dataset_id,
            source_sample_ids=source_sample_ids,
            compute_environment_id=compute_environment.id if compute_environment else None
        )
        resp = self._client.execution.run_analysis(
            project_id=project_id,
            request=request
        )
        return resp.id
Helper functions for interacting with analysis processes.
DataPortalProcess( process: Union[cirro_api_client.v1.models.Process, cirro_api_client.v1.models.ProcessDetail], client: cirro.CirroApi)
16 def __init__(self, process: Union[Process, ProcessDetail], client: CirroApi): 17 """ 18 Instantiate with helper method 19 20 ```python 21 from cirro import DataPortal() 22 portal = DataPortal() 23 process = portal.get_process_by_name("Process Name") 24 ``` 25 """ 26 self._data = process 27 self._client = client
Instantiate with helper method
from cirro import DataPortal
portal = DataPortal()
process = portal.get_process_by_name("Process Name")
description: str
39 @property 40 def description(self) -> str: 41 """Longer description of process""" 42 return self._data.description
Longer description of process
child_process_ids: List[str]
44 @property 45 def child_process_ids(self) -> List[str]: 46 """List of processes which can be run on the output of this process""" 47 return self._data.child_process_ids
List of processes which can be run on the output of this process
executor: cirro_api_client.v1.models.Executor
49 @property 50 def executor(self) -> Executor: 51 """INGEST, CROMWELL, or NEXTFLOW""" 52 return self._data.executor
INGEST, CROMWELL, or NEXTFLOW
category: str
54 @property 55 def category(self) -> str: 56 """Category of process""" 57 return self._data.category
Category of process
pipeline_type: str
59 @property 60 def pipeline_type(self) -> str: 61 """Pipeline type""" 62 return self._data.pipeline_type
Pipeline type
documentation_url: str
64 @property 65 def documentation_url(self) -> str: 66 """Documentation URL""" 67 return self._data.documentation_url
Documentation URL
file_requirements_message: str
69 @property 70 def file_requirements_message(self) -> str: 71 """Description of files required for INGEST processes""" 72 return self._data.file_requirements_message
Description of files required for INGEST processes
74 @property 75 def code(self) -> PipelineCode: 76 """Pipeline code configuration""" 77 return self._get_detail().pipeline_code
Pipeline code configuration
custom_settings: cirro_api_client.v1.models.CustomPipelineSettings
79 @property 80 def custom_settings(self) -> CustomPipelineSettings: 81 """Custom settings for the process""" 82 return self._get_detail().custom_settings
Custom settings for the process
95 def get_parameter_spec(self) -> ParameterSpecification: 96 """ 97 Gets a specification used to describe the parameters used in the process. 98 """ 99 return self._client.processes.get_parameter_spec(self.id)
Gets a specification used to describe the parameters used in the process.
def run_analysis(self, name: str = None, project_id: str = None, datasets: list = None, description: str = '', params=None, notifications_emails: List[str] = None, compute_environment: str = None, resume_dataset_id: str = None, source_sample_ids: List[str] = None) -> str:
101 def run_analysis( 102 self, 103 name: str = None, 104 project_id: str = None, 105 datasets: list = None, 106 description: str = "", 107 params=None, 108 notifications_emails: List[str] = None, 109 compute_environment: str = None, 110 resume_dataset_id: str = None, 111 source_sample_ids: List[str] = None 112 ) -> str: 113 """ 114 Runs this process on one or more input datasets, returns the ID of the newly created dataset. 115 116 Args: 117 name (str): Name of newly created dataset 118 project_id (str): ID of the project to run the analysis in 119 datasets (List[DataPortalDataset or str]): One or more input datasets 120 (as DataPortalDataset objects or dataset ID strings) 121 description (str): Description of newly created dataset 122 params (dict): Analysis parameters 123 notifications_emails (List[str]): Notification email address(es) 124 compute_environment (str): Name or ID of compute environment to use, 125 if blank it will run in AWS 126 resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. 127 It will attempt to re-use the previous output to minimize duplicate work. 128 Note that Nextflow does not require this parameter, as it will automatically resume 129 from any previous attempts using a global cache. 130 source_sample_ids (List[str]): List of sample IDs to use as input for the analysis. 
131 132 Returns: 133 dataset_id (str): ID of newly created dataset 134 """ 135 if name is None: 136 raise DataPortalInputError("Must specify 'name' for run_analysis") 137 if project_id is None: 138 raise DataPortalInputError("Must specify 'project_id' for run_analysis") 139 if not datasets: 140 raise DataPortalInputError("Must specify 'datasets' for run_analysis") 141 if notifications_emails is None: 142 notifications_emails = [] 143 if params is None: 144 params = {} 145 146 # Accept DataPortalDataset objects or raw ID strings 147 source_dataset_ids = [ 148 ds if isinstance(ds, str) else ds.id 149 for ds in datasets 150 ] 151 152 if compute_environment: 153 compute_environment_name = compute_environment 154 compute_environments = self._client.compute_environments.list_environments_for_project( 155 project_id=project_id 156 ) 157 compute_environment = next( 158 (env for env in compute_environments 159 if env.name == compute_environment or env.id == compute_environment), 160 None 161 ) 162 if compute_environment is None: 163 raise DataPortalInputError(f"Compute environment '{compute_environment_name}' not found") 164 165 resp = self._client.execution.run_analysis( 166 project_id=project_id, 167 request=RunAnalysisRequest( 168 name=name, 169 description=description, 170 process_id=self.id, 171 source_dataset_ids=source_dataset_ids, 172 params=RunAnalysisRequestParams.from_dict(params), 173 notification_emails=notifications_emails, 174 resume_dataset_id=resume_dataset_id, 175 source_sample_ids=source_sample_ids, 176 compute_environment_id=compute_environment.id if compute_environment else None 177 ) 178 ) 179 return resp.id
Runs this process on one or more input datasets, returns the ID of the newly created dataset.
Arguments:
- name (str): Name of newly created dataset
- project_id (str): ID of the project to run the analysis in
- datasets (List[DataPortalDataset or str]): One or more input datasets (as DataPortalDataset objects or dataset ID strings)
- description (str): Description of newly created dataset
- params (dict): Analysis parameters
- notifications_emails (List[str]): Notification email address(es)
- compute_environment (str): Name or ID of compute environment to use, if blank it will run in AWS
- resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. It will attempt to re-use the previous output to minimize duplicate work. Note that Nextflow does not require this parameter, as it will automatically resume from any previous attempts using a global cache.
- source_sample_ids (List[str]): List of sample IDs to use as input for the analysis.
Returns:
dataset_id (str): ID of newly created dataset
class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
    """Collection of DataPortalProcess objects."""
    # Label consumed by the DataPortalAssets base class — presumably used in
    # lookup/error messages; confirm against the base class implementation.
    asset_name = "process"
Collection of DataPortalProcess objects.