cirro.sdk.dataset

import datetime
from pathlib import Path
from typing import Union, List, Optional

from cirro_api_client.v1.api.processes import validate_file_requirements
from cirro_api_client.v1.models import Dataset, DatasetDetail, RunAnalysisRequest, ProcessDetail, Status, \
    RunAnalysisRequestParams, Tag, ArtifactType, NamedItem, Executor, ValidateFileRequirementsRequest

from cirro.cirro_client import CirroApi
from cirro.models.assets import DatasetAssets
from cirro.models.file import PathLike
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.exceptions import DataPortalAssetNotFound
from cirro.sdk.exceptions import DataPortalInputError
from cirro.sdk.file import DataPortalFile, DataPortalFiles
from cirro.sdk.helpers import parse_process_name_or_id
from cirro.sdk.process import DataPortalProcess


class DataPortalDataset(DataPortalAsset):
    """
    Datasets in the Data Portal are collections of files which have
    either been uploaded directly, or which have been output by
    an analysis pipeline or notebook.
    """

    def __init__(self, dataset: Union[Dataset, DatasetDetail], client: CirroApi):
        """
        Instantiate a dataset object

        Should be invoked from a top-level constructor, for example:

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        dataset = portal.get_dataset(
            project="id-or-name-of-project",
            dataset="id-or-name-of-dataset"
        )
        ```

        """
        assert dataset.project_id is not None, "Must provide dataset with project_id attribute"
        self._data = dataset
        self._assets: Optional[DatasetAssets] = None
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier for the dataset"""
        return self._data.id

    @property
    def name(self) -> str:
        """Editable name for the dataset"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of the dataset"""
        return self._data.description

    @property
    def process_id(self) -> str:
        """Unique ID of process used to create the dataset"""
        return self._data.process_id

    @property
    def process(self) -> ProcessDetail:
        """
        Object representing the process used to create the dataset
        """
        return self._client.processes.get(self.process_id)

    @property
    def project_id(self) -> str:
        """ID of the project containing the dataset"""
        return self._data.project_id

    @property
    def status(self) -> Status:
        """
        Status of the dataset
        """
        return self._data.status

    @property
    def source_dataset_ids(self) -> List[str]:
        """IDs of the datasets used as sources for this dataset (if any)"""
        return self._data.source_dataset_ids

    @property
    def source_datasets(self) -> List['DataPortalDataset']:
        """
        Objects representing the datasets used as sources for this dataset (if any)
        """
        return [
            DataPortalDataset(
                dataset=self._client.datasets.get(project_id=self.project_id, dataset_id=dataset_id),
                client=self._client
            )
            for dataset_id in self.source_dataset_ids
        ]

    @property
    def params(self) -> dict:
        """
        Parameters used to generate the dataset
        """
        return self._get_detail().params.to_dict()

    @property
    def info(self) -> dict:
        """
        Extra information about the dataset
        """
        return self._get_detail().info.to_dict()

    @property
    def tags(self) -> List[Tag]:
        """
        Tags applied to the dataset
        """
        return self._data.tags

    @property
    def share(self) -> Optional[NamedItem]:
        """
        Share associated with the dataset, if any.
        """
        return self._get_detail().share

    @property
    def created_by(self) -> str:
        """User who created the dataset"""
        return self._data.created_by

    @property
    def created_at(self) -> datetime.datetime:
        """Timestamp of dataset creation"""
        return self._data.created_at

    def _get_detail(self):
        if not isinstance(self._data, DatasetDetail):
            self._data = self._client.datasets.get(project_id=self.project_id, dataset_id=self.id)
        return self._data

    def _get_assets(self):
        if not self._assets:
            self._assets = self._client.datasets.get_assets_listing(
                project_id=self.project_id,
                dataset_id=self.id
            )
        return self._assets

    def __str__(self):
        return '\n'.join([
            f"{i.title()}: {self.__getattribute__(i)}"
            for i in ['name', 'id', 'description', 'status']
        ])

    def get_file(self, relative_path: str) -> DataPortalFile:
        """
        Get a file from the dataset using its relative path.

        Args:
            relative_path (str): Relative path of file within the dataset

        Returns:
            DataPortalFile: The file object matching the given path
        """

        # Get the list of files in this dataset
        files = self.list_files()

        # Try getting the file using the relative path provided by the user
        try:
            return files.get_by_id(relative_path)
        except DataPortalAssetNotFound:
            # Try getting the file with the 'data/' prefix prepended
            try:
                return files.get_by_id("data/" + relative_path)
            except DataPortalAssetNotFound:
                # If not found, raise the exception using the string provided
                # by the user, not the data/ prepended version (which may be
                # confusing to the user)
                msg = f"No file found with path '{relative_path}'."
                raise DataPortalAssetNotFound(msg)

    def list_files(self) -> DataPortalFiles:
        """
        Return the list of files which make up the dataset.
        """
        files = self._get_assets().files
        return DataPortalFiles(
            [
                DataPortalFile(file=file, client=self._client)
                for file in files
            ]
        )

    def get_artifact(self, artifact_type: ArtifactType) -> DataPortalFile:
        """
        Get the artifact of a particular type from the dataset
        """
        artifacts = self._get_assets().artifacts
        artifact = next((a for a in artifacts if a.artifact_type == artifact_type), None)
        if artifact is None:
            raise DataPortalAssetNotFound(f"No artifact found with type '{artifact_type}'")
        return DataPortalFile(file=artifact.file, client=self._client)

    def list_artifacts(self) -> List[DataPortalFile]:
        """
        Return the list of artifacts associated with the dataset

        An artifact may be something generated as part of the analysis or other process.
        See `cirro_api_client.v1.models.ArtifactType` for the list of possible artifact types.
        """
        artifacts = self._get_assets().artifacts
        return DataPortalFiles(
            [
                DataPortalFile(file=artifact.file, client=self._client)
                for artifact in artifacts
            ]
        )

    def download_files(self, download_location: str = None) -> None:
        """
        Download all the files from the dataset to a local directory.

        Args:
            download_location (str): Path to local directory
        """

        # Alias for internal method
        self.list_files().download(download_location)

    def run_analysis(
            self,
            name: str = None,
            description: str = "",
            process: Union[DataPortalProcess, str] = None,
            params=None,
            notifications_emails: List[str] = None,
            compute_environment: str = None,
            resume_dataset_id: str = None
    ) -> str:
        """
        Runs an analysis on a dataset, returns the ID of the newly created dataset.

        The process can be provided as either a DataPortalProcess object,
        or a string which corresponds to the name or ID of the process.

        Args:
            name (str): Name of newly created dataset
            description (str): Description of newly created dataset
            process (DataPortalProcess or str): Process to run
            params (dict): Analysis parameters
            notifications_emails (List[str]): Notification email address(es)
            compute_environment (str): Name or ID of compute environment to use;
             if not provided, the analysis will run in AWS
            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
             It will attempt to re-use the previous output to minimize duplicate work

        Returns:
            dataset_id (str): ID of newly created dataset
        """
        if name is None:
            raise DataPortalInputError("Must specify 'name' for run_analysis")
        if process is None:
            raise DataPortalInputError("Must specify 'process' for run_analysis")
        if notifications_emails is None:
            notifications_emails = []
        if params is None:
            params = {}

        # If the process is a string, try to parse it as a process name or ID
        process = parse_process_name_or_id(process, self._client)

        if compute_environment:
            compute_environments = self._client.compute_environments.list_environments_for_project(
                project_id=self.project_id
            )
            # Keep the user-provided name/ID so the error message below can reference it
            matched_environment = next(
                (env for env in compute_environments
                 if env.name == compute_environment or env.id == compute_environment),
                None
            )
            if matched_environment is None:
                raise DataPortalInputError(f"Compute environment '{compute_environment}' not found")
            compute_environment = matched_environment

        resp = self._client.execution.run_analysis(
            project_id=self.project_id,
            request=RunAnalysisRequest(
                name=name,
                description=description,
                process_id=process.id,
                source_dataset_ids=[self.id],
                params=RunAnalysisRequestParams.from_dict(params),
                notification_emails=notifications_emails,
                resume_dataset_id=resume_dataset_id,
                compute_environment_id=compute_environment.id if compute_environment else None
            )
        )
        return resp.id

    def update_samplesheet(self,
                           contents: str = None,
                           file_path: PathLike = None):
        """
        Updates the samplesheet metadata of a dataset.
        Provide either the contents (as a string) or a file path.
        In either case, the samplesheet must be in CSV format.

        Args:
            contents (str): Samplesheet contents to update (should be a CSV string)
            file_path (PathLike): Path of file to update (should be a CSV file)

        Example:
        ```python
        dataset.update_samplesheet(
            file_path=Path('~/samplesheet.csv')
        )
        ```
        """

        if contents is None and file_path is None:
            raise DataPortalInputError("Must specify either 'contents' or 'file_path' when updating samplesheet")

        if self.process.executor != Executor.INGEST:
            raise DataPortalInputError("Cannot update a samplesheet on a non-ingest dataset")

        samplesheet_contents = contents
        if file_path is not None:
            samplesheet_contents = Path(file_path).expanduser().read_text()

        # Validate samplesheet
        file_names = [f.file_name for f in self.list_files()]
        request = ValidateFileRequirementsRequest(
            file_names=file_names,
            sample_sheet=samplesheet_contents,
        )
        requirements = validate_file_requirements.sync(process_id=self.process_id,
                                                       body=request,
                                                       client=self._client.api_client)
        if error_msg := requirements.error_msg:
            raise DataPortalInputError(error_msg)

        # Update the samplesheet if everything looks ok
        self._client.datasets.update_samplesheet(
            project_id=self.project_id,
            dataset_id=self.id,
            samplesheet=samplesheet_contents
        )

class DataPortalDatasets(DataPortalAssets[DataPortalDataset]):
    """Collection of multiple DataPortalDataset objects."""
    asset_name = "dataset"
class DataPortalDataset(cirro.sdk.asset.DataPortalAsset):

Datasets in the Data Portal are collections of files which have either been uploaded directly, or which have been output by an analysis pipeline or notebook.

DataPortalDataset(dataset: Union[cirro_api_client.v1.models.Dataset, cirro_api_client.v1.models.DatasetDetail], client: cirro.CirroApi)

Instantiate a dataset object

Should be invoked from a top-level constructor, for example:

from cirro import DataPortal
portal = DataPortal()
dataset = portal.get_dataset(
    project="id-or-name-of-project",
    dataset="id-or-name-of-dataset"
)
id: str

Unique identifier for the dataset

name: str

Editable name for the dataset

description: str

Longer description of the dataset

process_id: str

Unique ID of process used to create the dataset

process: cirro_api_client.v1.models.ProcessDetail

Object representing the process used to create the dataset

project_id: str

ID of the project containing the dataset

status: cirro_api_client.v1.models.Status

Status of the dataset

source_dataset_ids: List[str]

IDs of the datasets used as sources for this dataset (if any)

source_datasets: List[DataPortalDataset]

Objects representing the datasets used as sources for this dataset (if any)
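
For example, to walk one level up the provenance chain (both attributes used here are defined on this class):

```python
# Print the name and ID of each dataset this one was derived from
for parent in dataset.source_datasets:
    print(parent.name, parent.id)
```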

params: dict

Parameters used to generate the dataset

info: dict

Extra information about the dataset

tags: List[cirro_api_client.v1.models.Tag]

Tags applied to the dataset

share: Optional[cirro_api_client.v1.models.NamedItem]

Share associated with the dataset, if any.

created_by: str

User who created the dataset

created_at: datetime.datetime

Timestamp of dataset creation

def get_file(self, relative_path: str) -> cirro.sdk.file.DataPortalFile:

Get a file from the dataset using its relative path.

Arguments:
  • relative_path (str): Relative path of file within the dataset
Returns:

DataPortalFile: The file object matching the given path
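
For example (the relative path shown here is a placeholder; `dataset` is a `DataPortalDataset`):

```python
# Look up a file by its path within the dataset;
# the 'data/' prefix may be included or omitted
file = dataset.get_file("samplesheet.csv")
same_file = dataset.get_file("data/samplesheet.csv")
```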

def list_files(self) -> cirro.sdk.file.DataPortalFiles:

Return the list of files which make up the dataset.
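
For example, to print the path of every file in the dataset (`file_name` is the same attribute used by `update_samplesheet` below):

```python
# Iterate over the DataPortalFile objects that make up the dataset
for f in dataset.list_files():
    print(f.file_name)
```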

def get_artifact(self, artifact_type: cirro_api_client.v1.models.ArtifactType) -> cirro.sdk.file.DataPortalFile:

Get the artifact of a particular type from the dataset
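
A minimal sketch; the enum member shown is illustrative only and should be checked against `cirro_api_client.v1.models.ArtifactType`:

```python
from cirro_api_client.v1.models import ArtifactType

# Raises DataPortalAssetNotFound if no artifact of this type exists
# (the ArtifactType member below is an assumed example)
artifact_file = dataset.get_artifact(ArtifactType.METADATA)
```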

def list_artifacts(self) -> List[cirro.sdk.file.DataPortalFile]:

Return the list of artifacts associated with the dataset

An artifact may be something generated as part of the analysis or other process. See cirro_api_client.v1.models.ArtifactType for the list of possible artifact types.
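
For example:

```python
# Each artifact is wrapped as a DataPortalFile backed by its underlying file
for artifact_file in dataset.list_artifacts():
    print(artifact_file.file_name)
```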

def download_files(self, download_location: str = None) -> None:

Download all the files from the dataset to a local directory.

Arguments:
  • download_location (str): Path to local directory
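
For example (the destination directory is a placeholder):

```python
# Download every file in the dataset into a local directory
dataset.download_files(download_location="./dataset_files")
```
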
def run_analysis(self, name: str = None, description: str = '', process: Union[cirro.DataPortalProcess, str] = None, params=None, notifications_emails: List[str] = None, compute_environment: str = None, resume_dataset_id: str = None) -> str:

Runs an analysis on a dataset, returns the ID of the newly created dataset.

The process can be provided as either a DataPortalProcess object, or a string which corresponds to the name or ID of the process.

Arguments:
  • name (str): Name of newly created dataset
  • description (str): Description of newly created dataset
  • process (DataPortalProcess or str): Process to run
  • params (dict): Analysis parameters
  • notifications_emails (List[str]): Notification email address(es)
  • compute_environment (str): Name or ID of compute environment to use; if not provided, the analysis will run in AWS
  • resume_dataset_id (str): ID of dataset to resume from, used for caching task execution. It will attempt to re-use the previous output to minimize duplicate work
Returns:

dataset_id (str): ID of newly created dataset
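
A sketch of a typical call; the process name, parameter keys, and email address are placeholders rather than values defined by this module:

```python
new_dataset_id = dataset.run_analysis(
    name="Example analysis",
    description="Launched from the SDK",
    # A DataPortalProcess object or a process name/ID string
    process="name-or-id-of-process",
    # Parameter keys depend on the process being run
    params={"param_a": "value"},
    notifications_emails=["user@example.com"],
)
```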

def update_samplesheet(self, contents: str = None, file_path: PathLike = None):

Updates the samplesheet metadata of a dataset. Provide either the contents (as a string) or a file path; in either case, the samplesheet must be in CSV format.

Arguments:
  • contents (str): Samplesheet contents to update (should be a CSV string)
  • file_path (PathLike): Path of file to update (should be a CSV file)

Example:

dataset.update_samplesheet(
    file_path=Path('~/samplesheet.csv')
)
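
The samplesheet may also be passed inline as a CSV string; the column and sample names here are placeholders:

```python
dataset.update_samplesheet(
    contents="sample,status\nsampleA,ok"
)
```
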
class DataPortalDatasets(cirro.sdk.asset.DataPortalAssets[DataPortalDataset]):

Collection of multiple DataPortalDataset objects.

asset_name = 'dataset'