"""cirro.sdk.project"""
from functools import cache
from time import sleep
from typing import List, Optional, Union

from cirro_api_client.v1.models import Project, UploadDatasetRequest, Dataset, Sample, Tag, Status

from cirro.cirro_client import CirroApi
from cirro.file_utils import get_files_in_directory
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.dataset import DataPortalDataset, DataPortalDatasets
from cirro.sdk.exceptions import DataPortalAssetNotFound, DataPortalInputError
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess
from cirro.sdk.reference import DataPortalReference, DataPortalReferences
from cirro.sdk.reference_type import DataPortalReferenceType, DataPortalReferenceTypes
from cirro.services.service_helpers import list_all_datasets


class DataPortalProject(DataPortalAsset):
    """
    Projects in the Data Portal contain collections of Datasets.
    Users are granted permissions at the project-level, allowing them
    to view and/or modify all the datasets in that collection.
    """

    def __init__(self, proj: Project, client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        project = portal.get_project_by_name("Project Name")
        ```
        """
        self._data = proj
        self._client = client
        # Per-instance memo for _get_datasets(); None means "not fetched yet".
        # (functools.cache on the method would key the cache on `self` and keep
        # every instance alive for the process lifetime — ruff B019 — and
        # cache_clear() would clear ALL instances' caches, not just this one.)
        self._datasets_cache: Optional[List[Dataset]] = None

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the project"""
        return self._data.description

    @property
    def status(self) -> Status:
        """Status of the project"""
        return self._data.status

    def __str__(self):
        """Control how the Project is rendered as a string."""

        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description']
        ])

    def _get_datasets(self) -> List[Dataset]:
        """Return (and lazily cache, per instance) every dataset in the project."""
        if self._datasets_cache is None:
            self._datasets_cache = list_all_datasets(project_id=self.id,
                                                     client=self._client)
        return self._datasets_cache

    def _clear_dataset_cache(self) -> None:
        """Drop the cached dataset list so the next access refetches."""
        self._datasets_cache = None

    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
        """List all the datasets available in the project."""
        if force_refresh:
            self._clear_dataset_cache()

        return DataPortalDatasets(
            [
                DataPortalDataset(d, self._client)
                for d in self._get_datasets()
            ]
        )

    def get_dataset(self, name_or_id: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset matching the given ID or name.

        Tries to match by ID first, then by name.
        Raises an error if the name matches multiple datasets.
        """
        if force_refresh:
            self._clear_dataset_cache()

        # Try by ID first; any failure (not found, malformed ID, API error)
        # falls through to name matching below
        try:
            return self.get_dataset_by_id(name_or_id)
        except Exception:
            pass

        # Fall back to name matching
        matches = [d for d in self._get_datasets() if d.name == name_or_id]
        if not matches:
            raise DataPortalAssetNotFound(f'Dataset with name or ID "{name_or_id}" not found')
        if len(matches) > 1:
            raise DataPortalInputError(
                f'Multiple datasets found with the name "{name_or_id}" — use get_dataset_by_id instead'
            )
        # Re-fetch by ID so the full dataset detail record is returned
        return self.get_dataset_by_id(matches[0].id)

    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset with the specified name."""
        if force_refresh:
            self._clear_dataset_cache()

        dataset = next((d for d in self._get_datasets() if d.name == name), None)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
        return self.get_dataset_by_id(dataset.id)

    def get_dataset_by_id(self, _id: Optional[str] = None) -> DataPortalDataset:
        """Return the dataset with the specified id."""

        dataset = self._client.datasets.get(project_id=self.id, dataset_id=_id)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
        return DataPortalDataset(dataset, self._client)

    def list_references(self, reference_type: str = None) -> DataPortalReferences:
        """
        List the references available in a project.
        Optionally filter to references of a particular type (identified by name)
        """

        # Get the complete list of references which are available
        reference_types = DataPortalReferenceTypes(
            [
                DataPortalReferenceType(ref)
                for ref in self._client.references.get_types()
            ]
        )

        # If a particular name was specified, validate it before fetching
        # the project's references so an unknown type fails fast
        if reference_type is not None:
            reference_types = reference_types.filter_by_pattern(reference_type)
            if len(reference_types) == 0:
                msg = f"Could not find any reference types with the name {reference_type}"
                raise DataPortalAssetNotFound(msg)

        return DataPortalReferences(
            [
                DataPortalReference(ref, project_id=self.id, client=self._client)
                for ref in self._client.references.get_for_project(
                    self.id
                )
                if reference_type is None or ref.type_ == reference_type
            ]
        )

    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
        """Return the reference of a particular type with the specified name."""

        if name is None:
            raise DataPortalInputError("Must specify the reference name")

        return self.list_references(ref_type).get_by_name(name)

    def upload_dataset(
        self,
        name: str = None,
        description='',
        process: Union[DataPortalProcess, str] = None,
        upload_folder: str = None,
        files: List[str] = None,
        tags: List[str] = None,
    ):
        """
        Upload a set of files to the Data Portal, creating a new dataset.

        If the files parameter is not provided, it will upload all files in the upload folder

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (str | DataPortalProcess): Process to run; may be referenced by name, ID, or object
            upload_folder (str): Folder containing files to upload
            files (List[str]): Optional subset of files to upload from the folder
            tags (List[str]): Optional list of tags to apply to the dataset
        """

        if name is None:
            raise DataPortalInputError("Must provide name for new dataset")
        if process is None:
            raise DataPortalInputError("Must provide the process which is used for ingest")
        if upload_folder is None:
            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

        # Parse the process provided by the user
        process = parse_process_name_or_id(process, self._client)

        # If no files were provided, upload everything in the folder
        if files is None:
            files = get_files_in_directory(upload_folder)

        # NOTE(review): raises RuntimeWarning as an exception (not warnings.warn)
        # — confirm this is the intended contract for an empty folder
        if files is None or len(files) == 0:
            raise RuntimeWarning("No files to upload, exiting")

        # Normalize plain strings into Tag objects
        if tags is not None:
            tags = [Tag(value=value) for value in tags]

        # Make sure that the files match the expected pattern
        self._client.processes.check_dataset_files(files, process.id, upload_folder)

        # Create the ingest process request
        dataset_create_request = UploadDatasetRequest(
            process_id=process.id,
            name=name,
            description=description,
            expected_files=files,
            tags=tags,
        )

        # Get the response
        create_response = self._client.datasets.create(project_id=self.id,
                                                       upload_request=dataset_create_request)

        # Upload the files
        self._client.datasets.upload_files(
            project_id=self.id,
            dataset_id=create_response.id,
            directory=upload_folder,
            files=files
        )

        # Return the dataset which was created, which might take a second to update
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return self.get_dataset_by_id(create_response.id)
            except DataPortalAssetNotFound:
                if attempt == max_attempts - 1:
                    raise
                else:
                    sleep(2)

    def samples(self, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with a project along with their metadata

        Args:
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return self._client.metadata.get_project_samples(self.id, max_items)


class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Collection of DataPortalProject objects"""
    asset_name = "project"
class DataPortalProject(DataPortalAsset):
    """
    A project is the unit of organization (and permissioning) in the
    Data Portal: each one holds a collection of datasets, and a user's
    access rights are granted per project.
    """

    def __init__(self, proj: Project, client: CirroApi):
        """
        Typically obtained via the portal helper:

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        project = portal.get_project_by_name("Project Name")
        ```
        """
        self._data = proj
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the project"""
        return self._data.description

    @property
    def status(self) -> Status:
        """Status of the project"""
        return self._data.status

    def __str__(self):
        """Render the project as a multi-line name/id/description summary."""
        summary_lines = []
        for attr in ['name', 'id', 'description']:
            summary_lines.append(f"{attr.title()}: {self.__getattribute__(attr)}")
        return '\n'.join(summary_lines)

    @cache
    def _get_datasets(self) -> List[Dataset]:
        # Memoized fetch of the full dataset listing for this project
        return list_all_datasets(project_id=self.id, client=self._client)

    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
        """List all the datasets available in the project."""
        if force_refresh:
            self._get_datasets.cache_clear()
        wrapped = [DataPortalDataset(ds, self._client) for ds in self._get_datasets()]
        return DataPortalDatasets(wrapped)

    def get_dataset(self, name_or_id: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset matching the given ID or name.

        An ID lookup is attempted first; failing that, the name is matched
        against the dataset listing. A name shared by several datasets is
        an error.
        """
        if force_refresh:
            self._get_datasets.cache_clear()

        # Attempt the ID lookup; on any failure fall through to name matching
        try:
            return self.get_dataset_by_id(name_or_id)
        except Exception:
            pass

        matches = [ds for ds in self._get_datasets() if ds.name == name_or_id]
        if not matches:
            raise DataPortalAssetNotFound(f'Dataset with name or ID "{name_or_id}" not found')
        if len(matches) > 1:
            raise DataPortalInputError(
                f'Multiple datasets found with the name "{name_or_id}" — use get_dataset_by_id instead'
            )
        return self.get_dataset_by_id(matches[0].id)

    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset with the specified name."""
        if force_refresh:
            self._get_datasets.cache_clear()

        for ds in self._get_datasets():
            if ds.name == name:
                return self.get_dataset_by_id(ds.id)
        raise DataPortalAssetNotFound(f'Dataset with name {name} not found')

    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
        """Return the dataset with the specified id."""
        record = self._client.datasets.get(project_id=self.id, dataset_id=_id)
        if record is None:
            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
        return DataPortalDataset(record, self._client)

    def list_references(self, reference_type: str = None) -> DataPortalReferences:
        """
        List the references available in a project, optionally restricted
        to a single reference type (identified by name).
        """
        # Every reference type known to the system
        known_types = DataPortalReferenceTypes(
            [DataPortalReferenceType(t) for t in self._client.references.get_types()]
        )

        # Validate the requested type name, if one was given
        if reference_type is not None:
            known_types = known_types.filter_by_pattern(reference_type)
            if len(known_types) == 0:
                msg = f"Could not find any reference types with the name {reference_type}"
                raise DataPortalAssetNotFound(msg)

        return DataPortalReferences(
            [
                DataPortalReference(r, project_id=self.id, client=self._client)
                for r in self._client.references.get_for_project(self.id)
                if reference_type is None or r.type_ == reference_type
            ]
        )

    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
        """Return the reference of a particular type with the specified name."""
        if name is None:
            raise DataPortalInputError("Must specify the reference name")
        return self.list_references(ref_type).get_by_name(name)

    def upload_dataset(
        self,
        name: str = None,
        description='',
        process: Union[DataPortalProcess, str] = None,
        upload_folder: str = None,
        files: List[str] = None,
        tags: List[str] = None,
    ):
        """
        Create a new dataset by uploading a set of files.

        When ``files`` is omitted, every file found under ``upload_folder``
        is uploaded.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (str | DataPortalProcess): Ingest process (name, ID, or object)
            upload_folder (str): Folder containing files to upload
            files (List[str]): Optional subset of files to upload from the folder
            tags (List[str]): Optional list of tags to apply to the dataset
        """
        if name is None:
            raise DataPortalInputError("Must provide name for new dataset")
        if process is None:
            raise DataPortalInputError("Must provide the process which is used for ingest")
        if upload_folder is None:
            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

        # Resolve the process argument to a concrete process record
        process = parse_process_name_or_id(process, self._client)

        # Default to every file in the folder
        if files is None:
            files = get_files_in_directory(upload_folder)

        if files is None or len(files) == 0:
            raise RuntimeWarning("No files to upload, exiting")

        # Wrap plain strings as Tag objects
        if tags is not None:
            tags = [Tag(value=value) for value in tags]

        # Validate the file list against the process's expected pattern
        self._client.processes.check_dataset_files(files, process.id, upload_folder)

        # Build and submit the ingest request
        ingest_request = UploadDatasetRequest(
            process_id=process.id,
            name=name,
            description=description,
            expected_files=files,
            tags=tags,
        )
        create_response = self._client.datasets.create(project_id=self.id,
                                                       upload_request=ingest_request)

        # Push the files themselves
        self._client.datasets.upload_files(
            project_id=self.id,
            dataset_id=create_response.id,
            directory=upload_folder,
            files=files
        )

        # The new dataset may take a moment to become visible; poll briefly
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return self.get_dataset_by_id(create_response.id)
            except DataPortalAssetNotFound as e:
                if attempt == max_attempts - 1:
                    raise e
                else:
                    sleep(2)

    def samples(self, max_items: int = 10000) -> List[Sample]:
        """
        Retrieve the samples associated with the project, including metadata.

        Args:
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return self._client.metadata.get_project_samples(self.id, max_items)
Projects in the Data Portal contain collections of Datasets. Users are granted permissions at the project-level, allowing them to view and/or modify all the datasets in that collection.
def __init__(self, proj: Project, client: CirroApi):
    """
    Store the underlying project record and API client.

    Normally constructed via the portal helper:

    ```python
    from cirro import DataPortal
    portal = DataPortal()
    project = portal.get_project_by_name("Project Name")
    ```
    """
    self._client = client
    self._data = proj
Instantiate with helper method
from cirro import DataPortal
portal = DataPortal()
project = portal.get_project_by_name("Project Name")
@property
def description(self) -> str:
    """Longer, free-text description of the project."""
    return self._data.description
Longer description of the project
@property
def status(self) -> Status:
    """Current status of the project."""
    return self._data.status
Status of the project
def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
    """Return every dataset in the project as a DataPortalDatasets collection."""
    if force_refresh:
        self._get_datasets.cache_clear()
    wrapped = [DataPortalDataset(record, self._client) for record in self._get_datasets()]
    return DataPortalDatasets(wrapped)
List all the datasets available in the project.
def get_dataset(self, name_or_id: str, force_refresh=False) -> DataPortalDataset:
    """Return the dataset matching the given ID or name.

    An ID lookup is attempted first; failing that, the name is matched
    against the dataset listing. An ambiguous name is an error.
    """
    if force_refresh:
        self._get_datasets.cache_clear()

    # Attempt the ID lookup; on any failure fall through to name matching
    try:
        return self.get_dataset_by_id(name_or_id)
    except Exception:
        pass

    matches = [ds for ds in self._get_datasets() if ds.name == name_or_id]
    if not matches:
        raise DataPortalAssetNotFound(f'Dataset with name or ID "{name_or_id}" not found')
    if len(matches) > 1:
        raise DataPortalInputError(
            f'Multiple datasets found with the name "{name_or_id}" — use get_dataset_by_id instead'
        )
    return self.get_dataset_by_id(matches[0].id)
Return the dataset matching the given ID or name.
Tries to match by ID first, then by name. Raises an error if the name matches multiple datasets.
def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
    """Return the dataset with the specified name."""
    if force_refresh:
        self._get_datasets.cache_clear()

    for record in self._get_datasets():
        if record.name == name:
            return self.get_dataset_by_id(record.id)
    raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
Return the dataset with the specified name.
def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
    """Return the dataset with the specified id."""
    record = self._client.datasets.get(project_id=self.id, dataset_id=_id)
    if record is None:
        raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
    return DataPortalDataset(record, self._client)
Return the dataset with the specified id.
def list_references(self, reference_type: str = None) -> DataPortalReferences:
    """
    List the references available in a project, optionally restricted
    to a single reference type (identified by name).
    """
    # Every reference type known to the system
    known_types = DataPortalReferenceTypes(
        [DataPortalReferenceType(t) for t in self._client.references.get_types()]
    )

    # Validate the requested type name before fetching project references
    if reference_type is not None:
        known_types = known_types.filter_by_pattern(reference_type)
        if len(known_types) == 0:
            msg = f"Could not find any reference types with the name {reference_type}"
            raise DataPortalAssetNotFound(msg)

    return DataPortalReferences(
        [
            DataPortalReference(r, project_id=self.id, client=self._client)
            for r in self._client.references.get_for_project(self.id)
            if reference_type is None or r.type_ == reference_type
        ]
    )
List the references available in a project. Optionally filter to references of a particular type (identified by name)
def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
    """Look up a reference by name, optionally scoped to one reference type."""
    if name is None:
        raise DataPortalInputError("Must specify the reference name")
    return self.list_references(ref_type).get_by_name(name)
Return the reference of a particular type with the specified name.
def upload_dataset(
    self,
    name: str = None,
    description='',
    process: Union[DataPortalProcess, str] = None,
    upload_folder: str = None,
    files: List[str] = None,
    tags: List[str] = None,
):
    """
    Create a new dataset by uploading a set of files.

    When ``files`` is omitted, every file found under ``upload_folder``
    is uploaded.

    Args:
        name (str): Name of newly created dataset
        description (str): Description of newly created dataset
        process (str | DataPortalProcess): Ingest process (name, ID, or object)
        upload_folder (str): Folder containing files to upload
        files (List[str]): Optional subset of files to upload from the folder
        tags (List[str]): Optional list of tags to apply to the dataset
    """
    if name is None:
        raise DataPortalInputError("Must provide name for new dataset")
    if process is None:
        raise DataPortalInputError("Must provide the process which is used for ingest")
    if upload_folder is None:
        raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

    # Resolve the process argument to a concrete process record
    process = parse_process_name_or_id(process, self._client)

    # Default to every file in the folder
    if files is None:
        files = get_files_in_directory(upload_folder)

    if files is None or len(files) == 0:
        raise RuntimeWarning("No files to upload, exiting")

    # Wrap plain strings as Tag objects
    if tags is not None:
        tags = [Tag(value=value) for value in tags]

    # Validate the file list against the process's expected pattern
    self._client.processes.check_dataset_files(files, process.id, upload_folder)

    # Build and submit the ingest request
    ingest_request = UploadDatasetRequest(
        process_id=process.id,
        name=name,
        description=description,
        expected_files=files,
        tags=tags,
    )
    create_response = self._client.datasets.create(project_id=self.id,
                                                   upload_request=ingest_request)

    # Push the files themselves
    self._client.datasets.upload_files(
        project_id=self.id,
        dataset_id=create_response.id,
        directory=upload_folder,
        files=files
    )

    # The new dataset may take a moment to become visible; poll briefly
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            return self.get_dataset_by_id(create_response.id)
        except DataPortalAssetNotFound as e:
            if attempt == max_attempts - 1:
                raise e
            else:
                sleep(2)
Upload a set of files to the Data Portal, creating a new dataset.
If the files parameter is not provided, it will upload all files in the upload folder
Arguments:
- name (str): Name of newly created dataset
- description (str): Description of newly created dataset
- process (str | DataPortalProcess): Process to run; may be referenced by name, ID, or object
- upload_folder (str): Folder containing files to upload
- files (List[str]): Optional subset of files to upload from the folder
- tags (List[str]): Optional list of tags to apply to the dataset
def samples(self, max_items: int = 10000) -> List[Sample]:
    """
    Retrieve the samples associated with the project, including metadata.

    Args:
        max_items (int): Maximum number of records to get (default 10,000)
    """
    return self._client.metadata.get_project_samples(self.id, max_items)
Retrieves a list of samples associated with a project along with their metadata
Arguments:
- max_items (int): Maximum number of records to get (default 10,000)
class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Typed collection of DataPortalProject objects."""
    asset_name = "project"
Collection of DataPortalProject objects