cirro.sdk.project

from functools import cache
from time import sleep
from typing import List, Union

from cirro_api_client.v1.models import Project, UploadDatasetRequest, Dataset, Sample, Tag

from cirro.cirro_client import CirroApi
from cirro.file_utils import get_files_in_directory
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.dataset import DataPortalDataset, DataPortalDatasets
from cirro.sdk.exceptions import DataPortalAssetNotFound, DataPortalInputError
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess
from cirro.sdk.reference import DataPortalReference, DataPortalReferences
from cirro.sdk.reference_type import DataPortalReferenceType, DataPortalReferenceTypes
from cirro.services.service_helpers import list_all_datasets


class DataPortalProject(DataPortalAsset):
    """
    Projects in the Data Portal contain collections of Datasets.
    Users are granted permissions at the project level, allowing them
    to view and/or modify all the datasets in that collection.
    """
    def __init__(self, proj: Project, client: CirroApi):
        """
        Instantiate with the helper method:

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        project = portal.get_project_by_name("Project Name")
        ```

        """
        self._data = proj
        self._client = client

    @property
    def id(self) -> str:
        """
        Unique identifier
        """
        return self._data.id

    @property
    def name(self) -> str:
        """
        Readable name
        """
        return self._data.name

    @property
    def description(self) -> str:
        """
        Longer description of the project
        """
        return self._data.description

    def __str__(self):
        """Control how the Project is rendered as a string."""

        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description']
        ])

    @cache
    def _get_datasets(self) -> List[Dataset]:
        return list_all_datasets(project_id=self.id,
                                 client=self._client)

    def list_datasets(self, force_refresh=False) -> DataPortalDatasets:
        """List all the datasets available in the project."""
        if force_refresh:
            self._get_datasets.cache_clear()

        return DataPortalDatasets(
            [
                DataPortalDataset(d, self._client)
                for d in self._get_datasets()
            ]
        )

    def get_dataset_by_name(self, name: str, force_refresh=False) -> DataPortalDataset:
        """Return the dataset with the specified name."""
        if force_refresh:
            self._get_datasets.cache_clear()

        dataset = next((d for d in self._get_datasets() if d.name == name), None)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with name {name} not found')
        return self.get_dataset_by_id(dataset.id)

    def get_dataset_by_id(self, _id: str = None) -> DataPortalDataset:
        """Return the dataset with the specified id."""

        dataset = self._client.datasets.get(project_id=self.id, dataset_id=_id)
        if dataset is None:
            raise DataPortalAssetNotFound(f'Dataset with ID {_id} not found')
        return DataPortalDataset(dataset, self._client)

    def list_references(self, reference_type: str = None) -> DataPortalReferences:
        """
        List the references available in a project.
        Optionally filter to references of a particular type (identified by name).
        """

        # Get the complete list of references which are available
        reference_types = DataPortalReferenceTypes(
            [
                DataPortalReferenceType(ref)
                for ref in self._client.references.get_types()
            ]
        )

        # If a particular name was specified
        if reference_type is not None:
            reference_types = reference_types.filter_by_pattern(reference_type)
            if len(reference_types) == 0:
                msg = f"Could not find any reference types with the name {reference_type}"
                raise DataPortalAssetNotFound(msg)

        return DataPortalReferences(
            [
                DataPortalReference(ref, project_id=self.id, client=self._client)
                for ref in self._client.references.get_for_project(
                    self.id
                )
                if reference_type is None or ref.type == reference_type
            ]
        )

    def get_reference_by_name(self, name: str = None, ref_type: str = None) -> DataPortalReference:
        """Return the reference of a particular type with the specified name."""

        if name is None:
            raise DataPortalInputError("Must specify the reference name")

        return self.list_references(ref_type).get_by_name(name)

    def upload_dataset(
        self,
        name: str = None,
        description='',
        process: Union[DataPortalProcess, str] = None,
        upload_folder: str = None,
        files: List[str] = None,
        tags: List[str] = None,
    ):
        """
        Upload a set of files to the Data Portal, creating a new dataset.

        If the files parameter is not provided, all files in the upload folder will be uploaded.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (str | DataPortalProcess): Process to run; may be referenced by name, ID, or object
            upload_folder (str): Folder containing files to upload
            files (List[str]): Optional subset of files to upload from the folder
            tags (List[str]): Optional list of tags to apply to the dataset
        """

        if name is None:
            raise DataPortalInputError("Must provide name for new dataset")
        if process is None:
            raise DataPortalInputError("Must provide the process which is used for ingest")
        if upload_folder is None:
            raise DataPortalInputError("Must provide upload_folder -- folder containing files to upload")

        # Parse the process provided by the user
        process = parse_process_name_or_id(process, self._client)

        # If no files were provided
        if files is None:
            # Get the list of files in the upload folder
            files = get_files_in_directory(upload_folder)

        if files is None or len(files) == 0:
            raise RuntimeWarning("No files to upload, exiting")

        # Normalize into Tag objects
        if tags is not None:
            tags = [Tag(value=value) for value in tags]

        # Make sure that the files match the expected pattern
        self._client.processes.check_dataset_files(files, process.id, upload_folder)

        # Create the ingest process request
        dataset_create_request = UploadDatasetRequest(
            process_id=process.id,
            name=name,
            description=description,
            expected_files=files,
            tags=tags,
        )

        # Get the response
        create_response = self._client.datasets.create(project_id=self.id,
                                                       upload_request=dataset_create_request)

        # Upload the files
        self._client.datasets.upload_files(
            project_id=self.id,
            dataset_id=create_response.id,
            directory=upload_folder,
            files=files
        )

        # Return the dataset which was created, which might take a second to update
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return self.get_dataset_by_id(create_response.id)
            except DataPortalAssetNotFound as e:
                if attempt == max_attempts - 1:
                    raise e
                else:
                    sleep(2)

    def samples(self, max_items: int = 10000) -> List[Sample]:
        """
        Retrieves a list of samples associated with the project, along with their metadata.

        Args:
            max_items (int): Maximum number of records to get (default 10,000)
        """
        return self._client.metadata.get_project_samples(self.id, max_items)


class DataPortalProjects(DataPortalAssets[DataPortalProject]):
    """Collection of DataPortalProject objects"""
    asset_name = "project"

class DataPortalProject(cirro.sdk.asset.DataPortalAsset):

Projects in the Data Portal contain collections of Datasets. Users are granted permissions at the project level, allowing them to view and/or modify all the datasets in that collection.

DataPortalProject(proj: cirro_api_client.v1.models.Project, client: cirro.CirroApi)

Instantiate with the helper method:

from cirro import DataPortal
portal = DataPortal()
project = portal.get_project_by_name("Project Name")
id: str

Unique identifier

name: str

Readable name

description: str

Longer description of the project

def list_datasets(self, force_refresh=False) -> cirro.sdk.dataset.DataPortalDatasets:

List all the datasets available in the project.
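
For example, a minimal usage sketch (the project name below is a placeholder, and it assumes the returned dataset objects expose name and id properties, as DataPortalProject does):

```python
from cirro import DataPortal

portal = DataPortal()
project = portal.get_project_by_name("Project Name")  # placeholder project name

# Print a short summary of every dataset in the project
for dataset in project.list_datasets():
    print(dataset.name, dataset.id)

# Pass force_refresh=True to clear the cached listing and re-query the API
datasets = project.list_datasets(force_refresh=True)
```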

def get_dataset_by_name(self, name: str, force_refresh=False) -> cirro.DataPortalDataset:

Return the dataset with the specified name.
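
A short sketch, assuming project was obtained as shown above; the dataset name is a placeholder:

```python
# Look up a dataset by its display name
dataset = project.get_dataset_by_name("My Dataset")

# Use force_refresh=True to bypass the cached dataset listing
dataset = project.get_dataset_by_name("My Dataset", force_refresh=True)
```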

def get_dataset_by_id(self, _id: str = None) -> cirro.DataPortalDataset:

Return the dataset with the specified id.
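
For example, with a placeholder dataset ID:

```python
# Fetch a dataset directly when its unique identifier is already known
dataset = project.get_dataset_by_id("dataset-id-placeholder")
```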

def list_references(self, reference_type: str = None) -> cirro.sdk.reference.DataPortalReferences:

List the references available in a project. Optionally filter to references of a particular type (identified by name).
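
A minimal sketch; the reference type name is a placeholder and should match one of the type names known to the portal:

```python
# List every reference attached to the project
references = project.list_references()

# Or keep only references of a single type, identified by name (placeholder shown)
genomes = project.list_references("Reference Genome (FASTA)")
```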

def get_reference_by_name(self, name: str = None, ref_type: str = None) -> cirro.DataPortalReference:

Return the reference of a particular type with the specified name.
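
For example, with placeholder name and type values:

```python
# Fetch one reference by name, optionally narrowing the search to a single type
reference = project.get_reference_by_name("GRCh38", ref_type="Reference Genome (FASTA)")
```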

def upload_dataset(self, name: str = None, description='', process: Union[cirro.DataPortalProcess, str] = None, upload_folder: str = None, files: List[str] = None, tags: List[str] = None):

Upload a set of files to the Data Portal, creating a new dataset.

If the files parameter is not provided, all files in the upload folder will be uploaded (see the example after the argument list below).

Arguments:
  • name (str): Name of newly created dataset
  • description (str): Description of newly created dataset
  • process (str | DataPortalProcess): Process to run; may be referenced by name, ID, or object
  • upload_folder (str): Folder containing files to upload
  • files (List[str]): Optional subset of files to upload from the folder
  • tags (List[str]): Optional list of tags to apply to the dataset
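
A sketch of a typical call; the process name, folder path, file names, and tags below are illustrative placeholders:

```python
dataset = project.upload_dataset(
    name="New Dataset",
    description="Example upload",
    process="Files",                          # ingest process, by name or ID (placeholder)
    upload_folder="/path/to/local/folder",    # local folder holding the files
    files=["sample1.fastq.gz", "sample2.fastq.gz"],  # optional subset; omit to upload everything
    tags=["example"],                         # optional tags applied to the new dataset
)
print(dataset.id)
```
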
def samples(self, max_items: int = 10000) -> List[cirro_api_client.v1.models.Sample]:

Retrieves a list of samples associated with the project, along with their metadata.

Arguments:
  • max_items (int): Maximum number of records to get (default 10,000)
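
A brief sketch, assuming the returned Sample records expose name and metadata fields:

```python
# Retrieve sample-level metadata for the project, capped at 1,000 records
samples = project.samples(max_items=1000)
for sample in samples:
    print(sample.name, sample.metadata)
```
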
class DataPortalProjects(cirro.sdk.asset.DataPortalAssets[DataPortalProject]):

Collection of DataPortalProject objects

asset_name = 'project'