cirro.sdk.dataset
````python
import datetime
from pathlib import Path
from typing import Union, List, Optional

from cirro_api_client.v1.api.processes import validate_file_requirements
from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, Status, \
    RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, Executor, ValidateFileRequirementsRequest

from cirro.cirro_client import CirroApi
from cirro.models.assets import DatasetAssets
from cirro.models.file import PathLike
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.exceptions import DataPortalAssetNotFound, DataPortalInputError
from cirro.sdk.file import DataPortalFile, DataPortalFiles
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess


class DataPortalDataset(DataPortalAsset):
    """
    Datasets in the Data Portal are collections of files that have
    either been uploaded directly or produced as the output of
    an analysis pipeline or notebook.
    """

    def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi):
        """
        Instantiate a dataset object.

        Should be invoked from a top-level constructor, for example:

        ```python
        from cirro import DataPortal

        portal = DataPortal()
        dataset = portal.get_dataset(
            project="id-or-name-of-project",
            dataset="id-or-name-of-dataset"
        )
        ```
        """
        assert dataset.project_id is not None, "Must provide dataset with project_id attribute"
        self._data = dataset
        self._assets: Optional[DatasetAssets] = None
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier for the dataset"""
        return self._data.id

    @property
    def name(self) -> str:
        """Editable name for the dataset"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the dataset"""
        return self._data.description

    @property
    def process_id(self) -> str:
        """Unique ID of the process used to create the dataset"""
        return self._data.process_id

    @property
    def process(self) -> ProcessDetail:
        """
        Object representing the process used to create the dataset
        """
        return self._client.processes.get(self.process_id)

    @property
    def project_id(self) -> str:
        """ID of the project containing the dataset"""
        return self._data.project_id

    @property
    def status(self) -> Status:
        """
        Status of the dataset
        """
        return self._data.status

    @property
    def source_dataset_ids(self) -> List[str]:
        """IDs of the datasets used as sources for this dataset (if any)"""
        return self._data.source_dataset_ids

    @property
    def source_datasets(self) -> List['DataPortalDataset']:
        """
        Objects representing the datasets used as sources for this dataset (if any)
        """
        return [
            DataPortalDataset(
                dataset=self._client.datasets.get(project_id=self.project_id, dataset_id=dataset_id),
                client=self._client
            )
            for dataset_id in self.source_dataset_ids
        ]

    @property
    def params(self) -> dict:
        """
        Parameters used to generate the dataset
        """
        return self._get_detail().params.to_dict()

    @property
    def info(self) -> dict:
        """
        Extra information about the dataset
        """
        return self._get_detail().info.to_dict()

    @property
    def tags(self) -> List[Tag]:
        """
        Tags applied to the dataset
        """
        return self._data.tags

    @property
    def share(self) -> Optional[NamedItem]:
        """
        Share associated with the dataset, if any.
        """
        return self._get_detail().share

    @property
    def created_by(self) -> str:
        """User who created the dataset"""
        return self._data.created_by

    @property
    def created_at(self) -> datetime.datetime:
        """Timestamp of dataset creation"""
        return self._data.created_at

    def _get_detail(self):
        # Lazily fetch the full DatasetDetail record the first time it is needed
        if not isinstance(self._data, DatasetDetail):
            self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id)
        return self._data

    def _get_assets(self):
        # Cache the asset listing after the first request
        if not self._assets:
            self._assets = self._client.datasets.get_assets_listing(
                project_id=self.project_id,
                dataset_id=self.id
            )
        return self._assets

    def __str__(self):
        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description', 'status']
        ])

    def get_file(self, relative_path: str) -> DataPortalFile:
        """
        Get a file from the dataset using its relative path.

        Args:
            relative_path (str): Relative path of file within the dataset

        Returns:
            DataPortalFile
        """

        # Get the list of files in this dataset
        files = self.list_files()

        # Try getting the file using the relative path provided by the user
        try:
            return files.get_by_id(relative_path)
        except DataPortalAssetNotFound:
            # Try getting the file with the 'data/' prefix prepended
            try:
                return files.get_by_id("data/" + relative_path)
            except DataPortalAssetNotFound:
                # If not found, raise the exception using the string provided
                # by the user, not the data/-prefixed version (which may be
                # confusing to the user)
                raise DataPortalAssetNotFound(f"No file found with path '{relative_path}'.")

    def list_files(self) -> DataPortalFiles:
        """
        Return the list of files which make up the dataset.
        """
        files = self._get_assets().files
        return DataPortalFiles(
            [
                DataPortalFile(file=file, client=self._client)
                for file in files
            ]
        )

    def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile:
        """
        Get the artifact of a particular type from the dataset
        """
        artifacts = self._get_assets().artifacts
        artifact = next((a for a in artifacts if a.artifact_type == artifact_type), None)
        if artifact is None:
            raise DataPortalAssetNotFound(f"No artifact found with type '{artifact_type}'")
        return DataPortalFile(file=artifact.file, client=self._client)

    def list_artifacts(self) -> List[DataPortalFile]:
        """
        Return the list of artifacts associated with the dataset.

        An artifact may be something generated as part of the analysis or other process.
        See `cirro_api_client.v1.models.ArtifactType` for the list of possible artifact types.
        """
        artifacts = self._get_assets().artifacts
        return DataPortalFiles(
            [
                DataPortalFile(file=artifact.file, client=self._client)
                for artifact in artifacts
            ]
        )

    def download_files(self, download_location: str = None) -> None:
        """
        Download all the files from the dataset to a local directory.

        Args:
            download_location (str): Path to local directory
        """

        # Alias for internal method
        self.list_files().download(download_location)

    def run_analysis(
            self,
            name: str = None,
            description: str = "",
            process: Union[DataPortalProcess, str] = None,
            params=None,
            notifications_emails: List[str] = None,
            compute_environment: str = None,
            resume_dataset_id: str = None
    ) -> str:
        """
        Runs an analysis on a dataset and returns the ID of the newly created dataset.

        The process can be provided as either a DataPortalProcess object,
        or a string which corresponds to the name or ID of the process.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (DataPortalProcess or str): Process to run
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of the compute environment to use;
                if not provided, the analysis will run in AWS
            resume_dataset_id (str): ID of a dataset to resume from, used to cache task execution;
                previous outputs are reused where possible to minimize duplicate work

        Returns:
            dataset_id (str): ID of newly created dataset
        """
        if name is None:
            raise DataPortalInputError("Must specify 'name' for run_analysis")
        if process is None:
            raise DataPortalInputError("Must specify 'process' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # If the process is a string, try to parse it as a process name or ID
        process = parse_process_name_or_id(process, self._client)

        if compute_environment:
            compute_environments = self._client.compute_environments.list_environments_for_project(
                project_id=self.project_id
            )
            # Match on either name or ID, keeping the original string for the error message
            matched_environment = next(
                (env for env in compute_environments
                 if env.name == compute_environment or env.id == compute_environment),
                None
            )
            if matched_environment is None:
                raise DataPortalInputError(f"Compute environment '{compute_environment}' not found")
            compute_environment = matched_environment

        resp = self._client.execution.run_analysis(
            project_id=self.project_id,
            request=RunAnalysisRequest(
                name=name,
                description=description,
                process_id=process.id,
                source_dataset_ids=[self.id],
                params=RunAnalysisRequestParams.from_dict(params),
                notification_emails=notifications_emails,
                resume_dataset_id=resume_dataset_id,
                compute_environment_id=compute_environment.id if compute_environment else None
            )
        )
        return resp.id

    def update_samplesheet(self,
                           contents: str = None,
                           file_path: PathLike = None):
        """
        Updates the samplesheet metadata of a dataset.
        Provide either the contents (as a string) or a file path;
        in both cases the content must be CSV-formatted.

        Args:
            contents (str): Samplesheet contents to update (should be a CSV string)
            file_path (PathLike): Path of file to update (should be a CSV file)

        Example:
        ```python
        dataset.update_samplesheet(
            file_path=Path('~/samplesheet.csv')
        )
        ```
        """

        if contents is None and file_path is None:
            raise DataPortalInputError("Must specify either 'contents' or 'file_path' when updating samplesheet")

        if self.process.executor != Executor.INGEST:
            raise DataPortalInputError("Cannot update a samplesheet on a non-ingest dataset")

        samplesheet_contents = contents
        if file_path is not None:
            samplesheet_contents = Path(file_path).expanduser().read_text()

        # Validate the samplesheet against the process file requirements
        file_names = [f.file_name for f in self.list_files()]
        request = ValidateFileRequirementsRequest(
            file_names=file_names,
            sample_sheet=samplesheet_contents,
        )
        requirements = validate_file_requirements.sync(process_id=self.process_id,
                                                       body=request,
                                                       client=self._client.api_client)
        if error_msg := requirements.error_msg:
            raise DataPortalInputError(error_msg)

        # Update the samplesheet if everything looks ok
        self._client.datasets.update_samplesheet(
            project_id=self.project_id,
            dataset_id=self.id,
            samplesheet=samplesheet_contents
        )


class DataPortalDatasets(DataPortalAssets[DataPortalDataset]):
    """Collection of multiple DataPortalDataset objects."""
    asset_name = "dataset"
````
class DataPortalDataset(DataPortalAsset)

Datasets in the Data Portal are collections of files that have either been uploaded directly or produced as the output of an analysis pipeline or notebook.
```python
def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi)
```
Instantiate a dataset object.

Should be invoked from a top-level constructor, for example:

```python
from cirro import DataPortal

portal = DataPortal()
dataset = portal.get_dataset(
    project="id-or-name-of-project",
    dataset="id-or-name-of-dataset"
)
```
```python
id: str
```
Unique identifier for the dataset
```python
name: str
```
Editable name for the dataset
```python
description: str
```
Longer description of the dataset
```python
process_id: str
```
Unique ID of the process used to create the dataset
```python
process: ProcessDetail
```
Object representing the process used to create the dataset
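For example, the returned record can be used for a quick provenance check (a minimal sketch; the `name` field is assumed to be present on `ProcessDetail`):

```python
# Look up the process that produced this dataset
proc = dataset.process
print(proc.id, proc.name)  # 'name' is assumed to be a ProcessDetail field
```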
```python
project_id: str
```
ID of the project containing the dataset
```python
status: Status
```
Status of the dataset
```python
source_dataset_ids: List[str]
```
IDs of the datasets used as sources for this dataset (if any)
```python
source_datasets: List[DataPortalDataset]
```
Objects representing the datasets used as sources for this dataset (if any)
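For example, one level of provenance can be walked using only the properties documented above:

```python
# Print the immediate source datasets (empty for directly-uploaded data)
for src in dataset.source_datasets:
    print(src.id, src.name, src.status)
```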
```python
params: dict
```
Parameters used to generate the dataset
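Since the parameters are returned as a plain dictionary, they can be inspected directly, for example:

```python
# Review the analysis parameters that produced this dataset
for key, value in dataset.params.items():
    print(f"{key}: {value}")
```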
```python
info: dict
```
Extra information about the dataset
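```python
tags: List[Tag]
```

Tags applied to the dataset

```python
share: Optional[NamedItem]
```

Share associated with the dataset, if any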
```python
created_by: str
```
User who created the dataset
```python
created_at: datetime.datetime
```
Timestamp of dataset creation
```python
def get_file(self, relative_path: str) -> DataPortalFile
```
Get a file from the dataset using its relative path.
Arguments:
- relative_path (str): Relative path of file within the dataset
Returns:
DataPortalFile
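For example (a minimal sketch; the file path is hypothetical):

```python
# Fetch a single file by its path within the dataset
csv_file = dataset.get_file("results/summary.csv")

# Equivalent: the 'data/' prefix is tried automatically
# if the bare path is not found
csv_file = dataset.get_file("data/results/summary.csv")
```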
```python
def list_files(self) -> DataPortalFiles
```
Return the list of files which make up the dataset.
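A short sketch of iterating over the listing; `file_name` is the same attribute used internally by `update_samplesheet` below:

```python
# Print the name of every file in the dataset
for f in dataset.list_files():
    print(f.file_name)
```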
```python
def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile
```
Get the artifact of a particular type from the dataset
```python
def list_artifacts(self) -> List[DataPortalFile]
```
Return the list of artifacts associated with the dataset
An artifact may be something generated as part of the analysis or other process.
See cirro_api_client.v1.models.ArtifactType for the list of possible artifact types.
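Because artifacts are returned as DataPortalFile objects, they can be inspected the same way as regular files, for example:

```python
# List the artifacts attached to the dataset
for artifact_file in dataset.list_artifacts():
    print(artifact_file.file_name)
```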
```python
def download_files(self, download_location: str = None) -> None
```
Download all the files from the dataset to a local directory.
Arguments:
- download_location (str): Path to local directory
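For example (the local path is hypothetical):

```python
# Download every file in the dataset to a local folder
dataset.download_files(download_location="./dataset_files")
```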
```python
def run_analysis(
    self,
    name: str = None,
    description: str = "",
    process: Union[DataPortalProcess, str] = None,
    params=None,
    notifications_emails: List[str] = None,
    compute_environment: str = None,
    resume_dataset_id: str = None
) -> str
```
Runs an analysis on a dataset and returns the ID of the newly created dataset.
The process can be provided as either a DataPortalProcess object, or a string which corresponds to the name or ID of the process.
Arguments:
- name (str): Name of newly created dataset
- description (str): Description of newly created dataset
- process (DataPortalProcess or str): Process to run
- params (dict): Analysis parameters
- notifications_emails (List[str]): Notification email address(es)
- compute_environment (str): Name or ID of the compute environment to use; if not provided, the analysis will run in AWS
- resume_dataset_id (str): ID of a dataset to resume from, used to cache task execution; previous outputs are reused where possible to minimize duplicate work
Returns:
dataset_id (str): ID of newly created dataset
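An end-to-end sketch; the process reference and parameter keys are hypothetical and depend on the pipeline being run:

```python
# Launch an analysis on this dataset and capture the new dataset ID
new_dataset_id = dataset.run_analysis(
    name="My analysis run",
    description="Example analysis launched via the SDK",
    process="name-or-id-of-process",      # hypothetical process reference
    params={"example_param": "value"},    # keys depend on the chosen process
    notifications_emails=["user@example.com"]
)
print(new_dataset_id)
```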
```python
def update_samplesheet(self, contents: str = None, file_path: PathLike = None)
```
Updates the samplesheet metadata of a dataset. Provide either the contents (as a string) or a file path; in both cases the content must be CSV-formatted.
Arguments:
- contents (str): Samplesheet contents to update (should be a CSV string)
- file_path (PathLike): Path of file to update (should be a CSV file)
Example:

```python
from pathlib import Path

dataset.update_samplesheet(
    file_path=Path('~/samplesheet.csv')
)
```
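The same update can also be made from an in-memory string; the column names below are hypothetical and depend on the ingest process:

```python
# Update the samplesheet from a CSV string instead of a file
dataset.update_samplesheet(
    contents="sample,fastq_1\nsampleA,sampleA_R1.fastq.gz"
)
```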
class DataPortalDatasets(DataPortalAssets[DataPortalDataset])
Collection of multiple DataPortalDataset objects.
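A minimal usage sketch, assuming the collection was obtained from a project-level listing (the `list_datasets` accessor is an assumption, not defined in this module); `get_by_id` is the same lookup used by `DataPortalFiles` in `get_file` above:

```python
# Hypothetical: 'project' exposes the datasets it contains
datasets = project.list_datasets()        # assumed accessor
dataset = datasets.get_by_id("id-of-dataset")
```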