cirro.sdk.project

  1from functools import cache
  2from time import sleep
  3from typing import List, Union
  4
  5from cirro_api_client.v1.models import Project, UploadDatasetRequest, Dataset, Sample, Tag, Status
  6
  7from cirro.cirro_client import CirroApi
  8from cirro.file_utils import get_files_in_directory
  9from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
 10from cirro.sdk.dataset import DataPortalDataset, DataPortalDatasets
 11from cirro.sdk.exceptions import DataPortalAssetNotFound, DataPortalInputError
 12from cirro.sdk.helpers import parse_process_name_or_id
 13from cirro.sdk.process import DataPortalProcess
 14from cirro.sdk.reference import DataPortalReference, DataPortalReferences
 15from cirro.sdk.reference_type import DataPortalReferenceType, DataPortalReferenceTypes
 16from cirro.services.service_helpers import list_all_datasets
 17
 18
 19class DataPortalProject(DataPortalAsset):
 20    """
 21    Projects in the Data Portal contain collections of Datasets.
 22    Users are granted permissions at the project-level, allowing them
 23    to view and/or modify all the datasets in that collection.
 24    """
 25    def __init__(self, proj: Project, client: CirroApi):
 26        """
 27        Instantiate with helper method
 28
 29        ```python
 30        from cirro import DataPortal()
 31        portal = DataPortal()
 32        project = portal.get_project_by_name("Project Name")
 33        ```
 34
 35        """
 36        self._data = proj
 37        self._client = client
 38
 39    @property
 40    def id(self) -> str:
 41        """
 42        Unique identifier
 43        """
 44        return self._data.id
 45
 46    @property
 47    def name(self) -> str:
 48        """
 49        Readable name
 50        """
 51        return self._data.name
 52
 53    @property
 54    def description(self) -> str:
 55        """
 56        Longer description of the project
 57        """
 58        return self._data.description
 59
 60    @property
 61    def status(self) -> Status:
 62        """
 63        Status of the project
 64        """
 65        return self._data.status
 66
 67    def __str__(self):
 68        """Control how the Project is rendered as a string."""
 69
 70        return '\n'.join([
 71            f"{i.title()}: {self.__getattribute__(i)}"
 72            for i in ['name', 'id', 'description']
 73        ])
 74
 75    @cache
 76    def _get_datasets(self) -> List[Dataset]:
 77        return list_all_datasets(project_id=self.id,
 78                                 client=self._client)
 79
 80    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
 81        """List all the datasets available in the project."""
 82        if force_refresh:
 83            self._get_datasets.cache_clear()
 84
 85        return DataPortalDatasets(
 86            [
 87                DataPortalDataset(d, self._client)
 88                for d in self._get_datasets()
 89            ]
 90        )
 91
 92    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
 93        """Return the dataset with the specified name."""
 94        if force_refresh:
 95            self._get_datasets.cache_clear()
 96
 97        dataset = next((d for d in self._get_datasets() if d.name == name), None)
 98        if dataset is None:
 99            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
100        return self.get_dataset_by_id(dataset.id)
101
102    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
103        """Return the dataset with the specified id."""
104
105        dataset = self._client.datasets.get(project_id=self.id, dataset_id=_id)
106        if dataset is None:
107            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
108        return DataPortalDataset(dataset, self._client)
109
110    def list_references(self, reference_type: str = None) -> DataPortalReferences:
111        """
112        List the references available in a project.
113        Optionally filter to references of a particular type (identified by name)
114        """
115
116        # Get the complete list of references which are available
117        reference_types = DataPortalReferenceTypes(
118            [
119                DataPortalReferenceType(ref)
120                for ref in self._client.references.get_types()
121            ]
122        )
123
124        # If a particular name was specified
125        if reference_type is not None:
126            reference_types = reference_types.filter_by_pattern(reference_type)
127            if len(reference_types) == 0:
128                msg = f"Could not find any reference types with the name {reference_type}"
129                raise DataPortalAssetNotFound(msg)
130
131        return DataPortalReferences(
132            [
133                DataPortalReference(ref, project_id=self.id, client=self._client)
134                for ref in self._client.references.get_for_project(
135                    self.id
136                )
137                if reference_type is None or ref.type == reference_type
138            ]
139        )
140
141    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
142        """Return the reference of a particular type with the specified name."""
143
144        if name is None:
145            raise DataPortalInputError("Must specify the reference name")
146
147        return self.list_references(ref_type).get_by_name(name)
148
149    def upload_dataset(
150        self,
151        name: str = None,
152        description='',
153        process: Union[DataPortalProcess, str] = None,
154        upload_folder: str = None,
155        files: List[str] = None,
156        tags: List[str] = None,
157    ):
158        """
159        Upload a set of files to the Data Portal, creating a new dataset.
160
161        If the files parameter is not provided, it will upload all files in the upload folder
162
163        Args:
164            name (str): Name of newly created dataset
165            description (str): Description of newly created dataset
166            process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
167            upload_folder (str): Folder containing files to upload
168            files (List[str]): Optional subset of files to upload from the folder
169            tags (List[str]): Optional list of tags to apply to the dataset
170        """
171
172        if name is None:
173            raise DataPortalInputError("Must provide name for new dataset")
174        if process is None:
175            raise DataPortalInputError("Must provide the process which is used for ingest")
176        if upload_folder is None:
177            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")
178
179        # Parse the process provided by the user
180        process = parse_process_name_or_id(process, self._client)
181
182        # If no files were provided
183        if files is None:
184            # Get the list of files in the upload folder
185            files = get_files_in_directory(upload_folder)
186
187        if files is None or len(files) == 0:
188            raise RuntimeWarning("No files to upload, exiting")
189
190        # Normalize into Tag object
191        if tags is not None:
192            tags = [Tag(value=value) for value in tags]
193
194        # Make sure that the files match the expected pattern
195        self._client.processes.check_dataset_files(files, process.id, upload_folder)
196
197        # Create the ingest process request
198        dataset_create_request = UploadDatasetRequest(
199            process_id=process.id,
200            name=name,
201            description=description,
202            expected_files=files,
203            tags=tags,
204        )
205
206        # Get the response
207        create_response = self._client.datasets.create(project_id=self.id,
208                                                       upload_request=dataset_create_request)
209
210        # Upload the files
211        self._client.datasets.upload_files(
212            project_id=self.id,
213            dataset_id=create_response.id,
214            directory=upload_folder,
215            files=files
216        )
217
218        # Return the dataset which was created, which might take a second to update
219        max_attempts = 5
220        for attempt in range(max_attempts):
221            try:
222                return self.get_dataset_by_id(create_response.id)
223            except DataPortalAssetNotFound as e:
224                if attempt == max_attempts - 1:
225                    raise e
226                else:
227                    sleep(2)
228
229    def samples(self, max_items: int = 10000) -> List[Sample]:
230        """
231        Retrieves a list of samples associated with a project along with their metadata
232
233        Args:
234            max_items (int): Maximum number of records to get (default 10,000)
235        """
236        return self._client.metadata.get_project_samples(self.id, max_items)
237
238
class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Container holding a group of DataPortalProject objects."""
    asset_name = "project"
class DataPortalProject(cirro.sdk.asset.DataPortalAsset):
 20class DataPortalProject(DataPortalAsset):
 21    """
 22    Projects in the Data Portal contain collections of Datasets.
 23    Users are granted permissions at the project-level, allowing them
 24    to view and/or modify all the datasets in that collection.
 25    """
 26    def __init__(self, proj: Project, client: CirroApi):
 27        """
 28        Instantiate with helper method
 29
 30        ```python
 31        from cirro import DataPortal()
 32        portal = DataPortal()
 33        project = portal.get_project_by_name("Project Name")
 34        ```
 35
 36        """
 37        self._data = proj
 38        self._client = client
 39
 40    @property
 41    def id(self) -> str:
 42        """
 43        Unique identifier
 44        """
 45        return self._data.id
 46
 47    @property
 48    def name(self) -> str:
 49        """
 50        Readable name
 51        """
 52        return self._data.name
 53
 54    @property
 55    def description(self) -> str:
 56        """
 57        Longer description of the project
 58        """
 59        return self._data.description
 60
 61    @property
 62    def status(self) -> Status:
 63        """
 64        Status of the project
 65        """
 66        return self._data.status
 67
 68    def __str__(self):
 69        """Control how the Project is rendered as a string."""
 70
 71        return '\n'.join([
 72            f"{i.title()}: {self.__getattribute__(i)}"
 73            for i in ['name', 'id', 'description']
 74        ])
 75
 76    @cache
 77    def _get_datasets(self) -> List[Dataset]:
 78        return list_all_datasets(project_id=self.id,
 79                                 client=self._client)
 80
 81    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
 82        """List all the datasets available in the project."""
 83        if force_refresh:
 84            self._get_datasets.cache_clear()
 85
 86        return DataPortalDatasets(
 87            [
 88                DataPortalDataset(d, self._client)
 89                for d in self._get_datasets()
 90            ]
 91        )
 92
 93    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
 94        """Return the dataset with the specified name."""
 95        if force_refresh:
 96            self._get_datasets.cache_clear()
 97
 98        dataset = next((d for d in self._get_datasets() if d.name == name), None)
 99        if dataset is None:
100            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
101        return self.get_dataset_by_id(dataset.id)
102
103    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
104        """Return the dataset with the specified id."""
105
106        dataset = self._client.datasets.get(project_id=self.id, dataset_id=_id)
107        if dataset is None:
108            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
109        return DataPortalDataset(dataset, self._client)
110
111    def list_references(self, reference_type: str = None) -> DataPortalReferences:
112        """
113        List the references available in a project.
114        Optionally filter to references of a particular type (identified by name)
115        """
116
117        # Get the complete list of references which are available
118        reference_types = DataPortalReferenceTypes(
119            [
120                DataPortalReferenceType(ref)
121                for ref in self._client.references.get_types()
122            ]
123        )
124
125        # If a particular name was specified
126        if reference_type is not None:
127            reference_types = reference_types.filter_by_pattern(reference_type)
128            if len(reference_types) == 0:
129                msg = f"Could not find any reference types with the name {reference_type}"
130                raise DataPortalAssetNotFound(msg)
131
132        return DataPortalReferences(
133            [
134                DataPortalReference(ref, project_id=self.id, client=self._client)
135                for ref in self._client.references.get_for_project(
136                    self.id
137                )
138                if reference_type is None or ref.type == reference_type
139            ]
140        )
141
142    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
143        """Return the reference of a particular type with the specified name."""
144
145        if name is None:
146            raise DataPortalInputError("Must specify the reference name")
147
148        return self.list_references(ref_type).get_by_name(name)
149
150    def upload_dataset(
151        self,
152        name: str = None,
153        description='',
154        process: Union[DataPortalProcess, str] = None,
155        upload_folder: str = None,
156        files: List[str] = None,
157        tags: List[str] = None,
158    ):
159        """
160        Upload a set of files to the Data Portal, creating a new dataset.
161
162        If the files parameter is not provided, it will upload all files in the upload folder
163
164        Args:
165            name (str): Name of newly created dataset
166            description (str): Description of newly created dataset
167            process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
168            upload_folder (str): Folder containing files to upload
169            files (List[str]): Optional subset of files to upload from the folder
170            tags (List[str]): Optional list of tags to apply to the dataset
171        """
172
173        if name is None:
174            raise DataPortalInputError("Must provide name for new dataset")
175        if process is None:
176            raise DataPortalInputError("Must provide the process which is used for ingest")
177        if upload_folder is None:
178            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")
179
180        # Parse the process provided by the user
181        process = parse_process_name_or_id(process, self._client)
182
183        # If no files were provided
184        if files is None:
185            # Get the list of files in the upload folder
186            files = get_files_in_directory(upload_folder)
187
188        if files is None or len(files) == 0:
189            raise RuntimeWarning("No files to upload, exiting")
190
191        # Normalize into Tag object
192        if tags is not None:
193            tags = [Tag(value=value) for value in tags]
194
195        # Make sure that the files match the expected pattern
196        self._client.processes.check_dataset_files(files, process.id, upload_folder)
197
198        # Create the ingest process request
199        dataset_create_request = UploadDatasetRequest(
200            process_id=process.id,
201            name=name,
202            description=description,
203            expected_files=files,
204            tags=tags,
205        )
206
207        # Get the response
208        create_response = self._client.datasets.create(project_id=self.id,
209                                                       upload_request=dataset_create_request)
210
211        # Upload the files
212        self._client.datasets.upload_files(
213            project_id=self.id,
214            dataset_id=create_response.id,
215            directory=upload_folder,
216            files=files
217        )
218
219        # Return the dataset which was created, which might take a second to update
220        max_attempts = 5
221        for attempt in range(max_attempts):
222            try:
223                return self.get_dataset_by_id(create_response.id)
224            except DataPortalAssetNotFound as e:
225                if attempt == max_attempts - 1:
226                    raise e
227                else:
228                    sleep(2)
229
230    def samples(self, max_items: int = 10000) -> List[Sample]:
231        """
232        Retrieves a list of samples associated with a project along with their metadata
233
234        Args:
235            max_items (int): Maximum number of records to get (default 10,000)
236        """
237        return self._client.metadata.get_project_samples(self.id, max_items)

Projects in the Data Portal contain collections of Datasets. Users are granted permissions at the project-level, allowing them to view and/or modify all the datasets in that collection.

DataPortalProject( proj: cirro_api_client.v1.models.Project, client: cirro.CirroApi)
26    def __init__(self, proj: Project, client: CirroApi):
27        """
28        Instantiate with helper method
29
30        ```python
31        from cirro import DataPortal()
32        portal = DataPortal()
33        project = portal.get_project_by_name("Project Name")
34        ```
35
36        """
37        self._data = proj
38        self._client = client

Instantiate with helper method

from cirro import DataPortal
portal = DataPortal()
project = portal.get_project_by_name("Project Name")
id: str
40    @property
41    def id(self) -> str:
42        """
43        Unique identifier
44        """
45        return self._data.id

Unique identifier

name: str
47    @property
48    def name(self) -> str:
49        """
50        Readable name
51        """
52        return self._data.name

Readable name

description: str
54    @property
55    def description(self) -> str:
56        """
57        Longer description of the project
58        """
59        return self._data.description

Longer description of the project

status: cirro_api_client.v1.models.Status
61    @property
62    def status(self) -> Status:
63        """
64        Status of the project
65        """
66        return self._data.status

Status of the project

def list_datasets(self, force_refresh=False) -> cirro.sdk.dataset.DataPortalDatasets:
81    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
82        """List all the datasets available in the project."""
83        if force_refresh:
84            self._get_datasets.cache_clear()
85
86        return DataPortalDatasets(
87            [
88                DataPortalDataset(d, self._client)
89                for d in self._get_datasets()
90            ]
91        )

List all the datasets available in the project.

def get_dataset_by_name( self, name: str, force_refresh=False) -> cirro.DataPortalDataset:
 93    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
 94        """Return the dataset with the specified name."""
 95        if force_refresh:
 96            self._get_datasets.cache_clear()
 97
 98        dataset = next((d for d in self._get_datasets() if d.name == name), None)
 99        if dataset is None:
100            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
101        return self.get_dataset_by_id(dataset.id)

Return the dataset with the specified name.

def get_dataset_by_id(self, _id: str = None) -> cirro.DataPortalDataset:
103    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
104        """Return the dataset with the specified id."""
105
106        dataset = self._client.datasets.get(project_id=self.id, dataset_id=_id)
107        if dataset is None:
108            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
109        return DataPortalDataset(dataset, self._client)

Return the dataset with the specified id.

def list_references( self, reference_type: str = None) -> cirro.sdk.reference.DataPortalReferences:
111    def list_references(self, reference_type: str = None) -> DataPortalReferences:
112        """
113        List the references available in a project.
114        Optionally filter to references of a particular type (identified by name)
115        """
116
117        # Get the complete list of references which are available
118        reference_types = DataPortalReferenceTypes(
119            [
120                DataPortalReferenceType(ref)
121                for ref in self._client.references.get_types()
122            ]
123        )
124
125        # If a particular name was specified
126        if reference_type is not None:
127            reference_types = reference_types.filter_by_pattern(reference_type)
128            if len(reference_types) == 0:
129                msg = f"Could not find any reference types with the name {reference_type}"
130                raise DataPortalAssetNotFound(msg)
131
132        return DataPortalReferences(
133            [
134                DataPortalReference(ref, project_id=self.id, client=self._client)
135                for ref in self._client.references.get_for_project(
136                    self.id
137                )
138                if reference_type is None or ref.type == reference_type
139            ]
140        )

List the references available in a project. Optionally filter to references of a particular type (identified by name)

def get_reference_by_name( self, name: str = None, ref_type: str = None) -> cirro.DataPortalReference:
142    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
143        """Return the reference of a particular type with the specified name."""
144
145        if name is None:
146            raise DataPortalInputError("Must specify the reference name")
147
148        return self.list_references(ref_type).get_by_name(name)

Return the reference of a particular type with the specified name.

def upload_dataset( self, name: str = None, description='', process: Union[cirro.DataPortalProcess, str] = None, upload_folder: str = None, files: List[str] = None, tags: List[str] = None):
150    def upload_dataset(
151        self,
152        name: str = None,
153        description='',
154        process: Union[DataPortalProcess, str] = None,
155        upload_folder: str = None,
156        files: List[str] = None,
157        tags: List[str] = None,
158    ):
159        """
160        Upload a set of files to the Data Portal, creating a new dataset.
161
162        If the files parameter is not provided, it will upload all files in the upload folder
163
164        Args:
165            name (str): Name of newly created dataset
166            description (str): Description of newly created dataset
167            process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
168            upload_folder (str): Folder containing files to upload
169            files (List[str]): Optional subset of files to upload from the folder
170            tags (List[str]): Optional list of tags to apply to the dataset
171        """
172
173        if name is None:
174            raise DataPortalInputError("Must provide name for new dataset")
175        if process is None:
176            raise DataPortalInputError("Must provide the process which is used for ingest")
177        if upload_folder is None:
178            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")
179
180        # Parse the process provided by the user
181        process = parse_process_name_or_id(process, self._client)
182
183        # If no files were provided
184        if files is None:
185            # Get the list of files in the upload folder
186            files = get_files_in_directory(upload_folder)
187
188        if files is None or len(files) == 0:
189            raise RuntimeWarning("No files to upload, exiting")
190
191        # Normalize into Tag object
192        if tags is not None:
193            tags = [Tag(value=value) for value in tags]
194
195        # Make sure that the files match the expected pattern
196        self._client.processes.check_dataset_files(files, process.id, upload_folder)
197
198        # Create the ingest process request
199        dataset_create_request = UploadDatasetRequest(
200            process_id=process.id,
201            name=name,
202            description=description,
203            expected_files=files,
204            tags=tags,
205        )
206
207        # Get the response
208        create_response = self._client.datasets.create(project_id=self.id,
209                                                       upload_request=dataset_create_request)
210
211        # Upload the files
212        self._client.datasets.upload_files(
213            project_id=self.id,
214            dataset_id=create_response.id,
215            directory=upload_folder,
216            files=files
217        )
218
219        # Return the dataset which was created, which might take a second to update
220        max_attempts = 5
221        for attempt in range(max_attempts):
222            try:
223                return self.get_dataset_by_id(create_response.id)
224            except DataPortalAssetNotFound as e:
225                if attempt == max_attempts - 1:
226                    raise e
227                else:
228                    sleep(2)

Upload a set of files to the Data Portal, creating a new dataset.

If the files parameter is not provided, it will upload all files in the upload folder

Arguments:
  • name (str): Name of newly created dataset
  • description (str): Description of newly created dataset
  • process (str | DataPortalProcess): Process to run may be referenced by name, ID, or object
  • upload_folder (str): Folder containing files to upload
  • files (List[str]): Optional subset of files to upload from the folder
  • tags (List[str]): Optional list of tags to apply to the dataset
def samples( self, max_items: int = 10000) -> List[cirro_api_client.v1.models.Sample]:
230    def samples(self, max_items: int = 10000) -> List[Sample]:
231        """
232        Retrieves a list of samples associated with a project along with their metadata
233
234        Args:
235            max_items (int): Maximum number of records to get (default 10,000)
236        """
237        return self._client.metadata.get_project_samples(self.id, max_items)

Retrieves a list of samples associated with a project along with their metadata

Arguments:
  • max_items (int): Maximum number of records to get (default 10,000)
class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Container holding a group of DataPortalProject objects."""
    asset_name = "project"

Collection of DataPortalProject objects

asset_name = 'project'