cirro.sdk.process

 1from typing import List
 2
 3from cirro_api_client.v1.models import Process, Executor
 4
 5from cirro.cirro_client import CirroApi
 6from cirro.models.form_specification import ParameterSpecification
 7from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
 8
 9
class DataPortalProcess(DataPortalAsset):
    """Helper functions for interacting with analysis processes.

    Wraps a `Process` record together with the `CirroApi` client that is
    used to look up further details about it.
    """
    _data: Process

    def __init__(self, process: Process, client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        process = portal.get_process_by_name("Process Name")
        ```
        """
        self._data = process
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of process"""
        return self._data.description

    @property
    def child_process_ids(self) -> List[str]:
        """List of processes which can be run on the output of this process"""
        return self._data.child_process_ids

    @property
    def executor(self) -> Executor:
        """INGEST, CROMWELL, or NEXTFLOW"""
        return self._data.executor

    @property
    def documentation_url(self) -> str:
        """Documentation URL"""
        return self._data.documentation_url

    @property
    def file_requirements_message(self) -> str:
        """Description of files required for INGEST processes"""
        return self._data.file_requirements_message

    def __str__(self) -> str:
        """Return the name, id and description as `Label: value` lines."""
        # getattr() is the idiomatic way to read an attribute by name and
        # resolves the properties defined above.
        return '\n'.join([
            f"{field.title()}: {getattr(self, field)}"
            for field in ('name', 'id', 'description')
        ])

    def get_parameter_spec(self) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process.

        The lookup is delegated to the client's process service, keyed by
        this process's `id`.
        """
        return self._client.processes.get_parameter_spec(self.id)
74
class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
    """A collection holding `DataPortalProcess` objects."""

    asset_name = 'process'
class DataPortalProcess(cirro.sdk.asset.DataPortalAsset):
class DataPortalProcess(DataPortalAsset):
    """Helper functions for interacting with analysis processes.

    Wraps a `Process` record together with the `CirroApi` client that is
    used to look up further details about it.
    """
    _data: Process

    def __init__(self, process: Process, client: CirroApi):
        """
        Instantiate with helper method

        ```python
        from cirro import DataPortal
        portal = DataPortal()
        process = portal.get_process_by_name("Process Name")
        ```
        """
        self._data = process
        self._client = client

    @property
    def id(self) -> str:
        """Unique identifier"""
        return self._data.id

    @property
    def name(self) -> str:
        """Readable name"""
        return self._data.name

    @property
    def description(self) -> str:
        """Longer description of process"""
        return self._data.description

    @property
    def child_process_ids(self) -> List[str]:
        """List of processes which can be run on the output of this process"""
        return self._data.child_process_ids

    @property
    def executor(self) -> Executor:
        """INGEST, CROMWELL, or NEXTFLOW"""
        return self._data.executor

    @property
    def documentation_url(self) -> str:
        """Documentation URL"""
        return self._data.documentation_url

    @property
    def file_requirements_message(self) -> str:
        """Description of files required for INGEST processes"""
        return self._data.file_requirements_message

    def __str__(self) -> str:
        """Return the name, id and description as `Label: value` lines."""
        # getattr() is the idiomatic way to read an attribute by name and
        # resolves the properties defined above.
        return '\n'.join([
            f"{field.title()}: {getattr(self, field)}"
            for field in ('name', 'id', 'description')
        ])

    def get_parameter_spec(self) -> ParameterSpecification:
        """
        Gets a specification used to describe the parameters used in the process.

        The lookup is delegated to the client's process service, keyed by
        this process's `id`.
        """
        return self._client.processes.get_parameter_spec(self.id)

Helper functions for interacting with analysis processes.

DataPortalProcess(process: cirro_api_client.v1.models.Process, client: cirro.CirroApi)
15    def __init__(self, process: Process, client: CirroApi):
16        """
17        Instantiate with helper method
18
19        ```python
20        from cirro import DataPortal()
21        portal = DataPortal()
22        process = portal.get_process_by_name("Process Name")
23        ```
24        """
25        self._data = process
26        self._client = client

Instantiate with helper method

from cirro import DataPortal
portal = DataPortal()
process = portal.get_process_by_name("Process Name")
id: str
28    @property
29    def id(self) -> str:
30        """Unique identifier"""
31        return self._data.id

Unique identifier

name: str
33    @property
34    def name(self) -> str:
35        """Readable name"""
36        return self._data.name

Readable name

description: str
38    @property
39    def description(self) -> str:
40        """Longer description of process"""
41        return self._data.description

Longer description of process

child_process_ids: List[str]
43    @property
44    def child_process_ids(self) -> List[str]:
45        """List of processes which can be run on the output of this process"""
46        return self._data.child_process_ids

List of processes which can be run on the output of this process

executor: cirro_api_client.v1.models.Executor
48    @property
49    def executor(self) -> Executor:
50        """INGEST, CROMWELL, or NEXTFLOW"""
51        return self._data.executor

INGEST, CROMWELL, or NEXTFLOW

documentation_url: str
53    @property
54    def documentation_url(self) -> str:
55        """Documentation URL"""
56        return self._data.documentation_url

Documentation URL

file_requirements_message: str
58    @property
59    def file_requirements_message(self) -> str:
60        """Description of files required for INGEST processes"""
61        return self._data.file_requirements_message

Description of files required for INGEST processes

def get_parameter_spec(self) -> cirro.models.form_specification.ParameterSpecification:
69    def get_parameter_spec(self) -> ParameterSpecification:
70        """
71        Gets a specification used to describe the parameters used in the process.
72        """
73        return self._client.processes.get_parameter_spec(self.id)

Gets a specification used to describe the parameters used in the process.

class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
    """A collection holding `DataPortalProcess` objects."""

    asset_name = 'process'

Collection of DataPortalProcess objects.

asset_name = 'process'
Inherited Members
cirro.sdk.asset.DataPortalAssets
DataPortalAssets
description
get_by_name
get_by_id
filter_by_pattern
builtins.list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort