cirro.sdk.project
from functools import cache
from time import sleep
from typing import List, Optional, Union

from cirro_api_client.v1.models import Project, UploadDatasetRequest, Dataset, Sample, Tag, Status

from cirro.cirro_client import CirroApi
from cirro.file_utils import get_files_in_directory
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.dataset import DataPortalDataset, DataPortalDatasets
from cirro.sdk.exceptions import DataPortalAssetNotFound, DataPortalInputError
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess
from cirro.sdk.reference import DataPortalReference, DataPortalReferences
from cirro.sdk.reference_type import DataPortalReferenceType, DataPortalReferenceTypes
from cirro.services.service_helpers import list_all_datasets


class DataPortalProject(DataPortalAsset):
    """
    Projects in the Data Portal contain collections of Datasets.
    Users are granted permissions at the project-level, allowing them
    to view and/or modify all the datasets in that collection.
    """

    def __init__(self, proj: Project, client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        project = portal.get_project_by_name("Project Name")
        ```

        """
        self._data = proj
        self._client = client
        # Per-instance memo for _get_datasets().
        # The previous implementation used @functools.cache on the method,
        # which keys the cache on `self`: it keeps every instance alive for
        # the lifetime of the class (ruff B019), and cache_clear() wiped the
        # cache of *every* project rather than just this one.
        self._datasets_cache: Optional[List[Dataset]] = None

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the project"""
        return self._data.description

    @property
    def status(self) -> Status:
        """Status of the project"""
        return self._data.status

    def __str__(self):
        """Control how the Project is rendered as a string."""

        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description']
        ])

    def _get_datasets(self) -> List[Dataset]:
        """Fetch the project's datasets, memoized on this instance."""
        if self._datasets_cache is None:
            self._datasets_cache = list_all_datasets(project_id=self.id,
                                                     client=self._client)
        return self._datasets_cache

    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
        """List all the datasets available in the project."""
        if force_refresh:
            # Drop this instance's memo so the next access re-fetches
            self._datasets_cache = None

        return DataPortalDatasets(
            [
                DataPortalDataset(d, self._client)
                for d in self._get_datasets()
            ]
        )

    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset with the specified name."""
        if force_refresh:
            self._datasets_cache = None

        dataset = next((d for d in self._get_datasets() if d.name == name), None)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
        # Re-fetch by ID so the caller receives the full dataset record
        return self.get_dataset_by_id(dataset.id)

    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
        """Return the dataset with the specified id."""

        dataset = self._client.datasets.get(project_id=self.id,
                                            dataset_id=_id)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
        return DataPortalDataset(dataset, self._client)

    def list_references(self, reference_type: str = None) -> DataPortalReferences:
        """
        List the references available in a project.
        Optionally filter to references of a particular type (identified by name)
        """

        # Get the complete list of references which are available
        reference_types = DataPortalReferenceTypes(
            [
                DataPortalReferenceType(ref)
                for ref in self._client.references.get_types()
            ]
        )

        # If a particular name was specified, make sure it matches a known type
        if reference_type is not None:
            reference_types = reference_types.filter_by_pattern(reference_type)
            if len(reference_types) == 0:
                msg = f"Could not find any reference types with the name {reference_type}"
                raise DataPortalAssetNotFound(msg)

        return DataPortalReferences(
            [
                DataPortalReference(ref, project_id=self.id, client=self._client)
                for ref in self._client.references.get_for_project(
                    self.id
                )
                if reference_type is None or ref.type == reference_type
            ]
        )

    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
        """Return the reference of a particular type with the specified name."""

        if name is None:
            raise DataPortalInputError("Must specify the reference name")

        return self.list_references(ref_type).get_by_name(name)

    def upload_dataset(
        self,
        name: str = None,
        description='',
        process: Union[DataPortalProcess, str] = None,
        upload_folder: str = None,
        files: List[str] = None,
        tags: List[str] = None,
    ):
        """
        Upload a set of files to the Data Portal, creating a new dataset.

        If the files parameter is not provided, it will upload all files in the upload folder

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
            upload_folder (str): Folder containing files to upload
            files (List[str]): Optional subset of files to upload from the folder
            tags (List[str]): Optional list of tags to apply to the dataset
        """

        if name is None:
            raise DataPortalInputError("Must provide name for new dataset")
        if process is None:
            raise DataPortalInputError("Must provide the process which is used for ingest")
        if upload_folder is None:
            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

        # Parse the process provided by the user
        process = parse_process_name_or_id(process, self._client)

        # If no files were provided, upload everything in the folder
        if files is None:
            files = get_files_in_directory(upload_folder)

        # NOTE(review): raising a Warning subclass is unusual; kept for
        # backward compatibility with callers that catch RuntimeWarning
        if files is None or len(files) == 0:
            raise RuntimeWarning("No files to upload, exiting")

        # Normalize into Tag objects
        if tags is not None:
            tags = [Tag(value=value) for value in tags]

        # Make sure that the files match the expected pattern
        self._client.processes.check_dataset_files(files, process.id, upload_folder)

        # Create the ingest process request
        dataset_create_request = UploadDatasetRequest(
            process_id=process.id,
            name=name,
            description=description,
            expected_files=files,
            tags=tags,
        )

        # Get the response
        create_response = self._client.datasets.create(project_id=self.id,
                                                       upload_request=dataset_create_request)

        # Upload the files
        self._client.datasets.upload_files(
            project_id=self.id,
            dataset_id=create_response.id,
            directory=upload_folder,
            files=files
        )

        # Return the dataset which was created, which might take a second to update
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return self.get_dataset_by_id(create_response.id)
            except DataPortalAssetNotFound:
                if attempt == max_attempts - 1:
                    raise
                sleep(2)

    def samples(self, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with a project along with their metadata

        Args:
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return self._client.metadata.get_project_samples(self.id, max_items)


class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Collection of DataPortalProject objects"""
    asset_name = "project"
class DataPortalProject(DataPortalAsset):
    """
    Projects in the Data Portal contain collections of Datasets.
    Users are granted permissions at the project-level, allowing them
    to view and/or modify all the datasets in that collection.
    """

    def __init__(self, proj: Project, client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        project = portal.get_project_by_name("Project Name")
        ```
        """
        self._data = proj
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the project"""
        return self._data.description

    @property
    def status(self) -> Status:
        """Status of the project"""
        return self._data.status

    def __str__(self):
        """Render the project as a multi-line name/id/description string."""
        parts = []
        for field in ('name', 'id', 'description'):
            parts.append(f"{field.title()}: {getattr(self, field)}")
        return '\n'.join(parts)

    @cache
    def _get_datasets(self) -> List[Dataset]:
        # Cached per call signature; cleared via cache_clear() on force_refresh
        return list_all_datasets(project_id=self.id, client=self._client)

    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
        """List all the datasets available in the project."""
        if force_refresh:
            self._get_datasets.cache_clear()
        wrapped = [DataPortalDataset(ds, self._client) for ds in self._get_datasets()]
        return DataPortalDatasets(wrapped)

    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset with the specified name."""
        if force_refresh:
            self._get_datasets.cache_clear()
        for ds in self._get_datasets():
            if ds.name == name:
                return self.get_dataset_by_id(ds.id)
        raise DataPortalAssetNotFound(f'Dataset with name {name} not found')

    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
        """Return the dataset with the specified id."""
        record = self._client.datasets.get(project_id=self.id, dataset_id=_id)
        if record is None:
            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
        return DataPortalDataset(record, self._client)

    def list_references(self, reference_type: str = None) -> DataPortalReferences:
        """
        List the references available in a project.
        Optionally filter to references of a particular type (identified by name)
        """
        # All reference types known to the system
        available = DataPortalReferenceTypes(
            [DataPortalReferenceType(t) for t in self._client.references.get_types()]
        )

        # When a type name was given, it must match at least one known type
        if reference_type is not None:
            available = available.filter_by_pattern(reference_type)
            if len(available) == 0:
                msg = f"Could not find any reference types with the name {reference_type}"
                raise DataPortalAssetNotFound(msg)

        project_refs = self._client.references.get_for_project(self.id)
        matching = [
            DataPortalReference(r, project_id=self.id, client=self._client)
            for r in project_refs
            if reference_type is None or r.type == reference_type
        ]
        return DataPortalReferences(matching)

    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
        """Return the reference of a particular type with the specified name."""
        if name is None:
            raise DataPortalInputError("Must specify the reference name")
        references = self.list_references(ref_type)
        return references.get_by_name(name)

    def upload_dataset(
        self,
        name: str = None,
        description='',
        process: Union[DataPortalProcess, str] = None,
        upload_folder: str = None,
        files: List[str] = None,
        tags: List[str] = None,
    ):
        """
        Upload a set of files to the Data Portal, creating a new dataset.

        If the files parameter is not provided, it will upload all files in the upload folder

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
            upload_folder (str): Folder containing files to upload
            files (List[str]): Optional subset of files to upload from the folder
            tags (List[str]): Optional list of tags to apply to the dataset
        """
        # Validate the required arguments up front
        if name is None:
            raise DataPortalInputError("Must provide name for new dataset")
        if process is None:
            raise DataPortalInputError("Must provide the process which is used for ingest")
        if upload_folder is None:
            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

        # Resolve the process reference (name, ID, or object)
        process = parse_process_name_or_id(process, self._client)

        # Default to every file under the upload folder
        if files is None:
            files = get_files_in_directory(upload_folder)
        if files is None or len(files) == 0:
            raise RuntimeWarning("No files to upload, exiting")

        # Wrap plain strings as Tag objects
        if tags is not None:
            tags = [Tag(value=v) for v in tags]

        # Validate the file list against the process's expected pattern
        self._client.processes.check_dataset_files(files, process.id, upload_folder)

        request = UploadDatasetRequest(
            process_id=process.id,
            name=name,
            description=description,
            expected_files=files,
            tags=tags,
        )
        response = self._client.datasets.create(project_id=self.id, upload_request=request)

        self._client.datasets.upload_files(
            project_id=self.id,
            dataset_id=response.id,
            directory=upload_folder,
            files=files
        )

        # The new dataset may take a moment to become visible; retry briefly
        attempts_left = 5
        while True:
            try:
                return self.get_dataset_by_id(response.id)
            except DataPortalAssetNotFound:
                attempts_left -= 1
                if attempts_left == 0:
                    raise
                sleep(2)

    def samples(self, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with a project along with their metadata

        Args:
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return self._client.metadata.get_project_samples(self.id, max_items)
Projects in the Data Portal contain collections of Datasets. Users are granted permissions at the project-level, allowing them to view and/or modify all the datasets in that collection.
def __init__(self, proj: Project, client: CirroApi):
    """
    Instantiate with helper method

    ```python
    from cirro import DataPortal
    portal = DataPortal()
    project = portal.get_project_by_name("Project Name")
    ```
    """
    # Keep the API record and the client handle for later calls
    self._client = client
    self._data = proj
Instantiate with helper method
from cirro import DataPortal
portal = DataPortal()
project = portal.get_project_by_name("Project Name")
@property
def description(self) -> str:
    """Longer description of the project"""
    return self._data.description
Longer description of the project
@property
def status(self) -> Status:
    """Status of the project"""
    return self._data.status
Status of the project
def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
    """List all the datasets available in the project."""
    if force_refresh:
        self._get_datasets.cache_clear()
    wrapped = [DataPortalDataset(ds, self._client) for ds in self._get_datasets()]
    return DataPortalDatasets(wrapped)
List all the datasets available in the project.
def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
    """Return the dataset with the specified name."""
    if force_refresh:
        self._get_datasets.cache_clear()
    for ds in self._get_datasets():
        if ds.name == name:
            # Re-fetch by ID so the caller receives the full record
            return self.get_dataset_by_id(ds.id)
    raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
Return the dataset with the specified name.
def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
    """Return the dataset with the specified id."""
    record = self._client.datasets.get(project_id=self.id, dataset_id=_id)
    if record is None:
        raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
    return DataPortalDataset(record, self._client)
Return the dataset with the specified id.
def list_references(self, reference_type: str = None) -> DataPortalReferences:
    """
    List the references available in a project.
    Optionally filter to references of a particular type (identified by name)
    """
    # All reference types known to the system
    available = DataPortalReferenceTypes(
        [DataPortalReferenceType(t) for t in self._client.references.get_types()]
    )

    # When a type name was given, it must match at least one known type
    if reference_type is not None:
        available = available.filter_by_pattern(reference_type)
        if len(available) == 0:
            msg = f"Could not find any reference types with the name {reference_type}"
            raise DataPortalAssetNotFound(msg)

    project_refs = self._client.references.get_for_project(self.id)
    matching = [
        DataPortalReference(r, project_id=self.id, client=self._client)
        for r in project_refs
        if reference_type is None or r.type == reference_type
    ]
    return DataPortalReferences(matching)
List the references available in a project. Optionally filter to references of a particular type (identified by name)
def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
    """Return the reference of a particular type with the specified name."""
    if name is None:
        raise DataPortalInputError("Must specify the reference name")
    references = self.list_references(ref_type)
    return references.get_by_name(name)
Return the reference of a particular type with the specified name.
def upload_dataset(
    self,
    name: str = None,
    description='',
    process: Union[DataPortalProcess, str] = None,
    upload_folder: str = None,
    files: List[str] = None,
    tags: List[str] = None,
):
    """
    Upload a set of files to the Data Portal, creating a new dataset.

    If the files parameter is not provided, it will upload all files in the upload folder

    Args:
        name (str): Name of newly created dataset
        description (str): Description of newly created dataset
        process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
        upload_folder (str): Folder containing files to upload
        files (List[str]): Optional subset of files to upload from the folder
        tags (List[str]): Optional list of tags to apply to the dataset
    """
    # Validate the required arguments up front
    if name is None:
        raise DataPortalInputError("Must provide name for new dataset")
    if process is None:
        raise DataPortalInputError("Must provide the process which is used for ingest")
    if upload_folder is None:
        raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

    # Resolve the process reference (name, ID, or object)
    process = parse_process_name_or_id(process, self._client)

    # Default to every file under the upload folder
    if files is None:
        files = get_files_in_directory(upload_folder)
    if files is None or len(files) == 0:
        raise RuntimeWarning("No files to upload, exiting")

    # Wrap plain strings as Tag objects
    if tags is not None:
        tags = [Tag(value=v) for v in tags]

    # Validate the file list against the process's expected pattern
    self._client.processes.check_dataset_files(files, process.id, upload_folder)

    request = UploadDatasetRequest(
        process_id=process.id,
        name=name,
        description=description,
        expected_files=files,
        tags=tags,
    )
    response = self._client.datasets.create(project_id=self.id, upload_request=request)

    self._client.datasets.upload_files(
        project_id=self.id,
        dataset_id=response.id,
        directory=upload_folder,
        files=files
    )

    # The new dataset may take a moment to become visible; retry briefly
    attempts_left = 5
    while True:
        try:
            return self.get_dataset_by_id(response.id)
        except DataPortalAssetNotFound:
            attempts_left -= 1
            if attempts_left == 0:
                raise
            sleep(2)
Upload a set of files to the Data Portal, creating a new dataset.
If the files parameter is not provided, it will upload all files in the upload folder
Arguments:
- name (str): Name of newly created dataset
- description (str): Description of newly created dataset
- process (str | DataPortalProcess): Process to run; may be referenced by name, ID, or object
- upload_folder (str): Folder containing files to upload
- files (List[str]): Optional subset of files to upload from the folder
- tags (List[str]): Optional list of tags to apply to the dataset
def samples(self, max_items: int = 10000) -> List[Sample]:
    """
    Retrieves a list of samples associated with a project along with their metadata

    Args:
        max_items (int): Maximum number of records to get (default 10,000)
    """
    project_id = self.id
    return self._client.metadata.get_project_samples(project_id, max_items)
Retrieves a list of samples associated with a project along with their metadata
Arguments:
- max_items (int): Maximum number of records to get (default 10,000)
class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Collection of DataPortalProject objects"""

    # Singular asset label used by the shared DataPortalAssets machinery
    asset_name = "project"
Collection of DataPortalProject objects