cirro.sdk.dataset
import datetime
from typing import Union, List, Optional

from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, Status, \
    DatasetDetailParams, RunAnalysisRequestParams, DatasetDetailInfo, \
    Tag, ArtifactType

from cirro.cirro_client import CirroApi
from cirro.models.assets import DatasetAssets
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.exceptions import DataPortalAssetNotFound
from cirro.sdk.exceptions import DataPortalInputError
from cirro.sdk.file import DataPortalFile, DataPortalFiles
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess


class DataPortalDataset(DataPortalAsset):
    """
    Datasets in the Data Portal are collections of files which have
    either been uploaded directly, or which have been output by
    an analysis pipeline or notebook.
    """

    def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi):
        """
        Instantiate a dataset object.

        Should be invoked from a top-level constructor, for example:

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        dataset = portal.get_dataset(
            project="id-or-name-of-project",
            dataset="id-or-name-of-dataset"
        )
        ```
        """
        assert dataset.project_id is not None, "Must provide dataset with project_id attribute"
        self._data = dataset
        # Asset listing is fetched lazily on first use (see _get_assets)
        self._assets: Optional[DatasetAssets] = None
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier for the dataset"""
        return self._data.id

    @property
    def name(self) -> str:
        """Editable name for the dataset"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description for the dataset"""
        return self._data.description

    @property
    def process_id(self) -> str:
        """Unique ID of process used to create the dataset"""
        return self._data.process_id

    @property
    def process(self) -> ProcessDetail:
        """
        Object representing the process used to create the dataset
        """
        return self._client.processes.get(self.process_id)

    @property
    def project_id(self) -> str:
        """ID of the project containing the dataset"""
        return self._data.project_id

    @property
    def status(self) -> Status:
        """
        Status of the dataset
        """
        return self._data.status

    @property
    def source_dataset_ids(self) -> List[str]:
        """IDs of the datasets used as sources for this dataset (if any)"""
        return self._data.source_dataset_ids

    @property
    def source_datasets(self) -> List['DataPortalDataset']:
        """
        Objects representing the datasets used as sources for this dataset (if any)
        """
        return [
            DataPortalDataset(
                dataset=self._client.datasets.get(project_id=self.project_id, dataset_id=dataset_id),
                client=self._client
            )
            for dataset_id in self.source_dataset_ids
        ]

    @property
    def params(self) -> DatasetDetailParams:
        """
        Parameters used to generate the dataset
        """
        return self._get_detail().params

    @property
    def info(self) -> DatasetDetailInfo:
        """
        Detailed information about the dataset
        """
        return self._get_detail().info

    @property
    def tags(self) -> List[Tag]:
        """
        Tags applied to the dataset
        """
        return self._data.tags

    @property
    def created_by(self) -> str:
        """User who created the dataset"""
        return self._data.created_by

    @property
    def created_at(self) -> datetime.datetime:
        """Timestamp of dataset creation"""
        return self._data.created_at

    def _get_detail(self) -> DatasetDetail:
        # Upgrade the cached record to a full DatasetDetail the first time
        # detail-only attributes (params, info) are requested
        if not isinstance(self._data, DatasetDetail):
            self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id)
        return self._data

    def _get_assets(self) -> DatasetAssets:
        # Fetch the asset listing once and cache it.
        # Compare against None (not truthiness) so that an empty listing
        # is not re-fetched on every access.
        if self._assets is None:
            self._assets = self._client.datasets.get_assets_listing(
                project_id=self.project_id,
                dataset_id=self.id
            )
        return self._assets

    def __str__(self):
        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description', 'status']
        ])

    def get_file(self, relative_path: str) -> DataPortalFile:
        """
        Get a file from the dataset using its relative path.

        Args:
            relative_path (str): Relative path of file within the dataset

        Returns:
            `DataPortalFile`: The file at the indicated path

        Raises:
            DataPortalAssetNotFound: If no file exists at that path
        """

        # Get the list of files in this dataset
        files = self.list_files()

        # Try getting the file using the relative path provided by the user
        try:
            return files.get_by_id(relative_path)
        except DataPortalAssetNotFound:
            # Try getting the file with the 'data/' prefix prepended
            try:
                return files.get_by_id("data/" + relative_path)
            except DataPortalAssetNotFound:
                # If not found, raise the exception using the string provided
                # by the user, not the data/ prepended version (which may be
                # confusing to the user); 'from None' suppresses the noisy
                # chained traceback for the data/-prefixed attempt
                raise DataPortalAssetNotFound(
                    f"No file found with path '{relative_path}'."
                ) from None

    def list_files(self) -> DataPortalFiles:
        """
        Return the list of files which make up the dataset.
        """
        files = self._get_assets().files
        return DataPortalFiles(
            [
                DataPortalFile(file=file, client=self._client)
                for file in files
            ]
        )

    def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile:
        """
        Get the artifact of a particular type from the dataset

        Raises:
            DataPortalAssetNotFound: If no artifact of that type exists
        """
        artifacts = self._get_assets().artifacts
        artifact = next((a for a in artifacts if a.artifact_type == artifact_type), None)
        if artifact is None:
            raise DataPortalAssetNotFound(f"No artifact found with type '{artifact_type}'")
        return DataPortalFile(file=artifact.file, client=self._client)

    def list_artifacts(self) -> DataPortalFiles:
        """
        Return the list of artifacts associated with the dataset

        An artifact may be something generated as part of the analysis or other process.
        See `cirro_api_client.v1.models.ArtifactType` for the list of possible artifact types.
        """
        artifacts = self._get_assets().artifacts
        # Annotated as DataPortalFiles to match the actual return type
        return DataPortalFiles(
            [
                DataPortalFile(file=artifact.file, client=self._client)
                for artifact in artifacts
            ]
        )

    def download_files(self, download_location: Optional[str] = None) -> None:
        """
        Download all the files from the dataset to a local directory.

        Args:
            download_location (str): Path to local directory
        """

        # Alias for internal method
        self.list_files().download(download_location)

    def run_analysis(
        self,
        name: Optional[str] = None,
        description: str = "",
        process: Optional[Union[DataPortalProcess, str]] = None,
        params: Optional[dict] = None,
        notifications_emails: Optional[List[str]] = None,
        compute_environment: Optional[str] = None,
        resume_dataset_id: Optional[str] = None
    ) -> str:
        """
        Runs an analysis on a dataset, returns the ID of the newly created dataset.

        The process can be provided as either a DataPortalProcess object,
        or a string which corresponds to the name or ID of the process.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (DataPortalProcess or str): Process to run
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of compute environment to use,
             if blank it will run in AWS
            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
             It will attempt to re-use the previous output to minimize duplicate work

        Returns:
            dataset_id (str): ID of newly created dataset

        Raises:
            DataPortalInputError: If 'name' or 'process' is not provided, or
                the requested compute environment cannot be found
        """
        if name is None:
            raise DataPortalInputError("Must specify 'name' for run_analysis")
        if process is None:
            raise DataPortalInputError("Must specify 'process' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # If the process is a string, try to parse it as a process name or ID
        process = parse_process_name_or_id(process, self._client)

        if compute_environment:
            # Remember the user-supplied identifier: the variable itself is
            # reassigned to the matched environment (or None) below
            requested_env = compute_environment
            environments = self._client.compute_environments.list_environments_for_project(
                project_id=self.project_id
            )
            compute_environment = next(
                (env for env in environments
                 if env.name == requested_env or env.id == requested_env),
                None
            )
            if compute_environment is None:
                # BUGFIX: previously this message always printed 'None'
                # because the original identifier had been overwritten
                raise DataPortalInputError(f"Compute environment '{requested_env}' not found")

        resp = self._client.execution.run_analysis(
            project_id=self.project_id,
            request=RunAnalysisRequest(
                name=name,
                description=description,
                process_id=process.id,
                source_dataset_ids=[self.id],
                params=RunAnalysisRequestParams.from_dict(params),
                notification_emails=notifications_emails,
                resume_dataset_id=resume_dataset_id,
                compute_environment_id=compute_environment.id if compute_environment else None
            )
        )
        return resp.id
class DataPortalDatasets(DataPortalAssets[DataPortalDataset]):
    """Container holding a group of DataPortalDataset objects."""

    asset_name = "dataset"
class DataPortalDataset(DataPortalAsset):
    """
    Datasets in the Data Portal are collections of files which have
    either been uploaded directly, or which have been output by
    an analysis pipeline or notebook.
    """

    def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi):
        """
        Instantiate a dataset object.

        Should be invoked from a top-level constructor, for example:

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        dataset = portal.get_dataset(
            project="id-or-name-of-project",
            dataset="id-or-name-of-dataset"
        )
        ```
        """
        assert dataset.project_id is not None, "Must provide dataset with project_id attribute"
        self._data = dataset
        # Asset listing is fetched lazily on first use (see _get_assets)
        self._assets: Optional[DatasetAssets] = None
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier for the dataset"""
        return self._data.id

    @property
    def name(self) -> str:
        """Editable name for the dataset"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description for the dataset"""
        return self._data.description

    @property
    def process_id(self) -> str:
        """Unique ID of process used to create the dataset"""
        return self._data.process_id

    @property
    def process(self) -> ProcessDetail:
        """
        Object representing the process used to create the dataset
        """
        return self._client.processes.get(self.process_id)

    @property
    def project_id(self) -> str:
        """ID of the project containing the dataset"""
        return self._data.project_id

    @property
    def status(self) -> Status:
        """
        Status of the dataset
        """
        return self._data.status

    @property
    def source_dataset_ids(self) -> List[str]:
        """IDs of the datasets used as sources for this dataset (if any)"""
        return self._data.source_dataset_ids

    @property
    def source_datasets(self) -> List['DataPortalDataset']:
        """
        Objects representing the datasets used as sources for this dataset (if any)
        """
        return [
            DataPortalDataset(
                dataset=self._client.datasets.get(project_id=self.project_id, dataset_id=dataset_id),
                client=self._client
            )
            for dataset_id in self.source_dataset_ids
        ]

    @property
    def params(self) -> DatasetDetailParams:
        """
        Parameters used to generate the dataset
        """
        return self._get_detail().params

    @property
    def info(self) -> DatasetDetailInfo:
        """
        Detailed information about the dataset
        """
        return self._get_detail().info

    @property
    def tags(self) -> List[Tag]:
        """
        Tags applied to the dataset
        """
        return self._data.tags

    @property
    def created_by(self) -> str:
        """User who created the dataset"""
        return self._data.created_by

    @property
    def created_at(self) -> datetime.datetime:
        """Timestamp of dataset creation"""
        return self._data.created_at

    def _get_detail(self) -> DatasetDetail:
        # Upgrade the cached record to a full DatasetDetail the first time
        # detail-only attributes (params, info) are requested
        if not isinstance(self._data, DatasetDetail):
            self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id)
        return self._data

    def _get_assets(self) -> DatasetAssets:
        # Fetch the asset listing once and cache it.
        # Compare against None (not truthiness) so that an empty listing
        # is not re-fetched on every access.
        if self._assets is None:
            self._assets = self._client.datasets.get_assets_listing(
                project_id=self.project_id,
                dataset_id=self.id
            )
        return self._assets

    def __str__(self):
        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description', 'status']
        ])

    def get_file(self, relative_path: str) -> DataPortalFile:
        """
        Get a file from the dataset using its relative path.

        Args:
            relative_path (str): Relative path of file within the dataset

        Returns:
            `DataPortalFile`: The file at the indicated path

        Raises:
            DataPortalAssetNotFound: If no file exists at that path
        """

        # Get the list of files in this dataset
        files = self.list_files()

        # Try getting the file using the relative path provided by the user
        try:
            return files.get_by_id(relative_path)
        except DataPortalAssetNotFound:
            # Try getting the file with the 'data/' prefix prepended
            try:
                return files.get_by_id("data/" + relative_path)
            except DataPortalAssetNotFound:
                # If not found, raise the exception using the string provided
                # by the user, not the data/ prepended version (which may be
                # confusing to the user); 'from None' suppresses the noisy
                # chained traceback for the data/-prefixed attempt
                raise DataPortalAssetNotFound(
                    f"No file found with path '{relative_path}'."
                ) from None

    def list_files(self) -> DataPortalFiles:
        """
        Return the list of files which make up the dataset.
        """
        files = self._get_assets().files
        return DataPortalFiles(
            [
                DataPortalFile(file=file, client=self._client)
                for file in files
            ]
        )

    def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile:
        """
        Get the artifact of a particular type from the dataset

        Raises:
            DataPortalAssetNotFound: If no artifact of that type exists
        """
        artifacts = self._get_assets().artifacts
        artifact = next((a for a in artifacts if a.artifact_type == artifact_type), None)
        if artifact is None:
            raise DataPortalAssetNotFound(f"No artifact found with type '{artifact_type}'")
        return DataPortalFile(file=artifact.file, client=self._client)

    def list_artifacts(self) -> DataPortalFiles:
        """
        Return the list of artifacts associated with the dataset

        An artifact may be something generated as part of the analysis or other process.
        See `cirro_api_client.v1.models.ArtifactType` for the list of possible artifact types.
        """
        artifacts = self._get_assets().artifacts
        # Annotated as DataPortalFiles to match the actual return type
        return DataPortalFiles(
            [
                DataPortalFile(file=artifact.file, client=self._client)
                for artifact in artifacts
            ]
        )

    def download_files(self, download_location: Optional[str] = None) -> None:
        """
        Download all the files from the dataset to a local directory.

        Args:
            download_location (str): Path to local directory
        """

        # Alias for internal method
        self.list_files().download(download_location)

    def run_analysis(
        self,
        name: Optional[str] = None,
        description: str = "",
        process: Optional[Union[DataPortalProcess, str]] = None,
        params: Optional[dict] = None,
        notifications_emails: Optional[List[str]] = None,
        compute_environment: Optional[str] = None,
        resume_dataset_id: Optional[str] = None
    ) -> str:
        """
        Runs an analysis on a dataset, returns the ID of the newly created dataset.

        The process can be provided as either a DataPortalProcess object,
        or a string which corresponds to the name or ID of the process.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (DataPortalProcess or str): Process to run
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of compute environment to use,
             if blank it will run in AWS
            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
             It will attempt to re-use the previous output to minimize duplicate work

        Returns:
            dataset_id (str): ID of newly created dataset

        Raises:
            DataPortalInputError: If 'name' or 'process' is not provided, or
                the requested compute environment cannot be found
        """
        if name is None:
            raise DataPortalInputError("Must specify 'name' for run_analysis")
        if process is None:
            raise DataPortalInputError("Must specify 'process' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # If the process is a string, try to parse it as a process name or ID
        process = parse_process_name_or_id(process, self._client)

        if compute_environment:
            # Remember the user-supplied identifier: the variable itself is
            # reassigned to the matched environment (or None) below
            requested_env = compute_environment
            environments = self._client.compute_environments.list_environments_for_project(
                project_id=self.project_id
            )
            compute_environment = next(
                (env for env in environments
                 if env.name == requested_env or env.id == requested_env),
                None
            )
            if compute_environment is None:
                # BUGFIX: previously this message always printed 'None'
                # because the original identifier had been overwritten
                raise DataPortalInputError(f"Compute environment '{requested_env}' not found")

        resp = self._client.execution.run_analysis(
            project_id=self.project_id,
            request=RunAnalysisRequest(
                name=name,
                description=description,
                process_id=process.id,
                source_dataset_ids=[self.id],
                params=RunAnalysisRequestParams.from_dict(params),
                notification_emails=notifications_emails,
                resume_dataset_id=resume_dataset_id,
                compute_environment_id=compute_environment.id if compute_environment else None
            )
        )
        return resp.id
Datasets in the Data Portal are collections of files which have either been uploaded directly, or which have been output by an analysis pipeline or notebook.
26 def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi): 27 """ 28 Instantiate a dataset object 29 30 Should be invoked from a top-level constructor, for example: 31 32 ```python 33 from cirro import DataPortal() 34 portal = DataPortal() 35 dataset = portal.get_dataset( 36 project="id-or-name-of-project", 37 dataset="id-or-name-of-dataset" 38 ) 39 ``` 40 41 """ 42 assert dataset.project_id is not None, "Must provide dataset with project_id attribute" 43 self._data = dataset 44 self._assets: Optional[DatasetAssets] = None 45 self._client = client
Instantiate a dataset object
Should be invoked from a top-level constructor, for example:
from cirro import DataPortal
portal = DataPortal()
dataset = portal.get_dataset(
project="id-or-name-of-project",
dataset="id-or-name-of-dataset"
)
47 @property 48 def id(self) -> str: 49 """Unique identifier for the dataset""" 50 return self._data.id
Unique identifier for the dataset
52 @property 53 def name(self) -> str: 54 """Editible name for the dataset""" 55 return self._data.name
Editable name for the dataset
57 @property 58 def description(self) -> str: 59 """Longer name for the dataset""" 60 return self._data.description
Longer description for the dataset
62 @property 63 def process_id(self) -> str: 64 """Unique ID of process used to create the dataset""" 65 return self._data.process_id
Unique ID of process used to create the dataset
67 @property 68 def process(self) -> ProcessDetail: 69 """ 70 Object representing the process used to create the dataset 71 """ 72 return self._client.processes.get(self.process_id)
Object representing the process used to create the dataset
74 @property 75 def project_id(self) -> str: 76 """ID of the project containing the dataset""" 77 return self._data.project_id
ID of the project containing the dataset
79 @property 80 def status(self) -> Status: 81 """ 82 Status of the dataset 83 """ 84 return self._data.status
Status of the dataset
86 @property 87 def source_dataset_ids(self) -> List[str]: 88 """IDs of the datasets used as sources for this dataset (if any)""" 89 return self._data.source_dataset_ids
IDs of the datasets used as sources for this dataset (if any)
91 @property 92 def source_datasets(self) -> List['DataPortalDataset']: 93 """ 94 Objects representing the datasets used as sources for this dataset (if any) 95 """ 96 return [ 97 DataPortalDataset( 98 dataset=self._client.datasets.get(project_id=self.project_id, dataset_id=dataset_id), 99 client=self._client 100 ) 101 for dataset_id in self.source_dataset_ids 102 ]
Objects representing the datasets used as sources for this dataset (if any)
104 @property 105 def params(self) -> DatasetDetailParams: 106 """ 107 Parameters used to generate the dataset 108 """ 109 return self._get_detail().params
Parameters used to generate the dataset
111 @property 112 def info(self) -> DatasetDetailInfo: 113 """ 114 Detailed information about the dataset 115 """ 116 return self._get_detail().info
Detailed information about the dataset
125 @property 126 def created_by(self) -> str: 127 """User who created the dataset""" 128 return self._data.created_by
User who created the dataset
130 @property 131 def created_at(self) -> datetime.datetime: 132 """Timestamp of dataset creation""" 133 return self._data.created_at
Timestamp of dataset creation
154 def get_file(self, relative_path: str) -> DataPortalFile: 155 """ 156 Get a file from the dataset using its relative path. 157 158 Args: 159 relative_path (str): Relative path of file within the dataset 160 161 Returns: 162 `from cirro.sdk.file import DataPortalFile` 163 """ 164 165 # Get the list of files in this dataset 166 files = self.list_files() 167 168 # Try getting the file using the relative path provided by the user 169 try: 170 return files.get_by_id(relative_path) 171 except DataPortalAssetNotFound: 172 # Try getting the file with the 'data/' prefix prepended 173 try: 174 return files.get_by_id("data/" + relative_path) 175 except DataPortalAssetNotFound: 176 # If not found, raise the exception using the string provided 177 # by the user, not the data/ prepended version (which may be 178 # confusing to the user) 179 msg = '\n'.join([f"No file found with path '{relative_path}'."]) 180 raise DataPortalAssetNotFound(msg)
Get a file from the dataset using its relative path.
Arguments:
- relative_path (str): Relative path of file within the dataset
Returns:
`DataPortalFile`: the file at the indicated path (importable via `from cirro.sdk.file import DataPortalFile`)
182 def list_files(self) -> DataPortalFiles: 183 """ 184 Return the list of files which make up the dataset. 185 """ 186 files = self._get_assets().files 187 return DataPortalFiles( 188 [ 189 DataPortalFile(file=file, client=self._client) 190 for file in files 191 ] 192 )
Return the list of files which make up the dataset.
194 def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile: 195 """ 196 Get the artifact of a particular type from the dataset 197 """ 198 artifacts = self._get_assets().artifacts 199 artifact = next((a for a in artifacts if a.artifact_type == artifact_type), None) 200 if artifact is None: 201 raise DataPortalAssetNotFound(f"No artifact found with type '{artifact_type}'") 202 return DataPortalFile(file=artifact.file, client=self._client)
Get the artifact of a particular type from the dataset
204 def list_artifacts(self) -> List[DataPortalFile]: 205 """ 206 Return the list of artifacts associated with the dataset 207 208 An artifact may be something generated as part of the analysis or other process. 209 See `cirro_api_client.v1.models.ArtifactType` for the list of possible artifact types. 210 211 """ 212 artifacts = self._get_assets().artifacts 213 return DataPortalFiles( 214 [ 215 DataPortalFile(file=artifact.file, client=self._client) 216 for artifact in artifacts 217 ] 218 )
Return the list of artifacts associated with the dataset
An artifact may be something generated as part of the analysis or other process.
See cirro_api_client.v1.models.ArtifactType
for the list of possible artifact types.
220 def download_files(self, download_location: str = None) -> None: 221 """ 222 Download all the files from the dataset to a local directory. 223 224 Args: 225 download_location (str): Path to local directory 226 """ 227 228 # Alias for internal method 229 self.list_files().download(download_location)
Download all the files from the dataset to a local directory.
Arguments:
- download_location (str): Path to local directory
231 def run_analysis( 232 self, 233 name: str = None, 234 description: str = "", 235 process: Union[DataPortalProcess, str] = None, 236 params=None, 237 notifications_emails: List[str] = None, 238 compute_environment: str = None, 239 resume_dataset_id: str = None 240 ) -> str: 241 """ 242 Runs an analysis on a dataset, returns the ID of the newly created dataset. 243 244 The process can be provided as either a DataPortalProcess object, 245 or a string which corresponds to the name or ID of the process. 246 247 Args: 248 name (str): Name of newly created dataset 249 description (str): Description of newly created dataset 250 process (DataPortalProcess or str): Process to run 251 params (dict): Analysis parameters 252 notifications_emails (List[str]): Notification email address(es) 253 compute_environment (str): Name or ID of compute environment to use, 254 if blank it will run in AWS 255 resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. 256 It will attempt to re-use the previous output to minimize duplicate work 257 258 Returns: 259 dataset_id (str): ID of newly created dataset 260 """ 261 if name is None: 262 raise DataPortalInputError("Must specify 'name' for run_analysis") 263 if process is None: 264 raise DataPortalInputError("Must specify 'process' for run_analysis") 265 if notifications_emails is None: 266 notifications_emails = [] 267 if params is None: 268 params = {} 269 270 # If the process is a string, try to parse it as a process name or ID 271 process = parse_process_name_or_id(process, self._client) 272 273 if compute_environment: 274 compute_environments = self._client.compute_environments.list_environments_for_project( 275 project_id=self.project_id 276 ) 277 compute_environment = next( 278 (env for env in compute_environments 279 if env.name == compute_environment or env.id == compute_environment), 280 None 281 ) 282 if compute_environment is None: 283 raise DataPortalInputError(f"Compute environment 
'{compute_environment}' not found") 284 285 resp = self._client.execution.run_analysis( 286 project_id=self.project_id, 287 request=RunAnalysisRequest( 288 name=name, 289 description=description, 290 process_id=process.id, 291 source_dataset_ids=[self.id], 292 params=RunAnalysisRequestParams.from_dict(params), 293 notification_emails=notifications_emails, 294 resume_dataset_id=resume_dataset_id, 295 compute_environment_id=compute_environment.id if compute_environment else None 296 ) 297 ) 298 return resp.id
Runs an analysis on a dataset, returns the ID of the newly created dataset.
The process can be provided as either a DataPortalProcess object, or a string which corresponds to the name or ID of the process.
Arguments:
- name (str): Name of newly created dataset
- description (str): Description of newly created dataset
- process (DataPortalProcess or str): Process to run
- params (dict): Analysis parameters
- notifications_emails (List[str]): Notification email address(es)
- compute_environment (str): Name or ID of compute environment to use, if blank it will run in AWS
- resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. It will attempt to re-use the previous output to minimize duplicate work
Returns:
dataset_id (str): ID of newly created dataset
301class DataPortalDatasets(DataPortalAssets[DataPortalDataset]): 302 """Collection of multiple DataPortalDataset objects.""" 303 asset_name = "dataset"
Collection of multiple DataPortalDataset objects.