cirro.models.process

from cirro.models.process.process import (
    CONFIG_APP_URL,
    ConfigAppStatus,
    PipelineDefinition,
    get_input_params,
)

# Public API of cirro.models.process, re-exported from cirro.models.process.process.
__all__ = [
    'PipelineDefinition',
    'ConfigAppStatus',
    'get_input_params',
    'CONFIG_APP_URL',
]
class PipelineDefinition:
 23class PipelineDefinition:
 24    """
 25    A pipeline definition on disk.
 26    """
 27
 28    def __init__(self, root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None):
 29        self.root_dir: str = path.expanduser(path.expandvars(root_dir))
 30        self.entrypoint: Optional[str] = entrypoint
 31        self.config_app_status: ConfigAppStatus = ConfigAppStatus.RECOMMENDED
 32
 33        if logger:
 34            self.logger: logging.Logger = logger
 35        else:
 36            log_formatter = logging.Formatter(
 37                '%(asctime)s %(levelname)-8s [PipelineDefinition] %(message)s'
 38            )
 39            self.logger = logging.getLogger()
 40            self.logger.setLevel(logging.INFO)
 41            console_handler = logging.StreamHandler()
 42            console_handler.setFormatter(log_formatter)
 43            self.logger.addHandler(console_handler)
 44
 45    @cached_property
 46    def parameter_schema(self) -> Resource:
 47        """
 48        Returns the parameter schema for the pipeline.
 49        """
 50        workflow_files = os.walk(self.root_dir, topdown=True)
 51        for dirpath, dirnames, filenames in workflow_files:
 52            # look for a nextflow_schema.json file at the root of the workflow directory
 53            is_nextflow = (
 54                ('nextflow_schema.json' in filenames)
 55                or ('main.nf' in filenames and 'nextflow.config' in filenames)
 56            )
 57            if is_nextflow:
 58                # lazy load nextflow dependencies
 59                from cirro.models.process.nextflow import get_nextflow_json_schema
 60                contents = get_nextflow_json_schema(self.root_dir, self.logger)
 61
 62                if 'nextflow_schema.json' in filenames:
 63                    self.config_app_status = ConfigAppStatus.OPTIONAL
 64
 65                break
 66            elif any(f.endswith('.wdl') for f in filenames):  # is WDL
 67                # generate schema from WDL workflow
 68                wdl_file = None
 69                if self.entrypoint:
 70                    self.logger.info(f"Using entrypoint WDL file: {self.entrypoint}")
 71                    # if an entrypoint is specified, look for that specific WDL file
 72                    wdl_file = next((f for f in filenames if f.endswith(self.entrypoint)), None)
 73                    if not wdl_file:
 74                        raise FileNotFoundError(f"Entrypoint WDL file '{self.entrypoint}' not found in {dirpath}")
 75                else:
 76                    # otherwise, just take the first WDL file found
 77                    wdl_file = next(f for f in filenames if f.endswith('.wdl'))
 78
 79                # lazy load wdl dependencies
 80                from cirro.models.process.wdl import get_wdl_json_schema
 81
 82                wdl_file = path.join(dirpath, wdl_file)
 83                contents = get_wdl_json_schema(wdl_file, self.logger)
 84                break
 85
 86        else:
 87            raise RuntimeError("Unrecognized workflow format. Please provide a valid Nextflow or WDL workflow.")
 88
 89        schema = Resource.from_contents(contents)
 90        return schema
 91
 92    @property
 93    def form_configuration(self) -> dict[str, Any]:
 94        """
 95        Returns the form configuration for the pipeline.
 96        """
 97        return {
 98            "form": self.parameter_schema.contents,
 99            "ui": {}
100        }
101
102    @property
103    def input_configuration(self) -> dict[str, str]:
104        """
105        Returns the input configuration for the pipeline.
106        """
107        schema = self.parameter_schema
108        contents = schema.contents
109
110        parameters = {
111            p["name"]: p["jsonPath"]
112            for p in get_input_params('$.dataset.params', contents, schema)
113        }
114        return parameters
115
116    def __repr__(self):
117        return f"PipelineDefinition(name={self.root_dir})"

A pipeline definition on disk.

PipelineDefinition( root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None)
def __init__(self, root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None):
    """
    :param root_dir: Workflow directory; ``~`` and environment variables are expanded.
    :param entrypoint: Optional name of the WDL file to use for WDL workflows.
    :param logger: Logger for progress messages. When omitted, this module's logger
        is used and, if it has no handlers yet, given a console handler at INFO level.
    """
    self.root_dir: str = path.expanduser(path.expandvars(root_dir))
    self.entrypoint: Optional[str] = entrypoint
    self.config_app_status: ConfigAppStatus = ConfigAppStatus.RECOMMENDED

    if logger is not None:
        self.logger: logging.Logger = logger
    else:
        # Use a dedicated logger and attach the console handler only once: adding a
        # handler to the root logger per instance would print every message once per
        # instance and would change the level of every logger in the process.
        self.logger = logging.getLogger(__name__)
        if not self.logger.handlers:
            log_formatter = logging.Formatter(
                '%(asctime)s %(levelname)-8s [PipelineDefinition] %(message)s'
            )
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(log_formatter)
            self.logger.addHandler(console_handler)
            self.logger.setLevel(logging.INFO)
root_dir: str
entrypoint: Optional[str]
config_app_status: ConfigAppStatus
parameter_schema: referencing._core.Resource
@cached_property
def parameter_schema(self) -> Resource:
    """
    Returns the parameter schema for the pipeline (computed once per instance).

    ``root_dir`` is walked top-down and the first directory that looks like a
    workflow decides the format:

    * Nextflow: the directory contains ``nextflow_schema.json``, or both
      ``main.nf`` and ``nextflow.config``. If ``nextflow_schema.json`` is present,
      ``config_app_status`` is set to ``ConfigAppStatus.OPTIONAL``.
    * WDL: the directory contains at least one ``.wdl`` file. The file named by
      ``entrypoint`` is used when given (exact name preferred, then a suffix
      match); otherwise the alphabetically first ``.wdl`` file.

    :raises FileNotFoundError: ``entrypoint`` is set but no file in the first
        directory containing WDL files matches it.
    :raises RuntimeError: no Nextflow or WDL workflow was found.
    """
    for dirpath, _dirnames, filenames in os.walk(self.root_dir, topdown=True):
        is_nextflow = (
            'nextflow_schema.json' in filenames
            or ('main.nf' in filenames and 'nextflow.config' in filenames)
        )
        if is_nextflow:
            # lazy load nextflow dependencies
            from cirro.models.process.nextflow import get_nextflow_json_schema

            # Pass the directory that actually matched. It equals root_dir when the
            # workflow sits at the top level (visited first), but for a nested
            # workflow root_dir would not contain the Nextflow files.
            contents = get_nextflow_json_schema(dirpath, self.logger)

            if 'nextflow_schema.json' in filenames:
                self.config_app_status = ConfigAppStatus.OPTIONAL
            break

        # os.walk lists files in filesystem order; sort so the chosen WDL file is
        # the same on every machine.
        wdl_files = sorted(f for f in filenames if f.endswith('.wdl'))
        if wdl_files:
            if self.entrypoint:
                self.logger.info(f"Using entrypoint WDL file: {self.entrypoint}")
                if self.entrypoint in filenames:
                    # An exact name wins over the ambiguous suffix match below.
                    wdl_file = self.entrypoint
                else:
                    # Suffix match kept for backward compatibility.
                    wdl_file = next((f for f in sorted(filenames) if f.endswith(self.entrypoint)), None)
                if wdl_file is None:
                    raise FileNotFoundError(f"Entrypoint WDL file '{self.entrypoint}' not found in {dirpath}")
            else:
                wdl_file = wdl_files[0]

            # lazy load wdl dependencies
            from cirro.models.process.wdl import get_wdl_json_schema

            contents = get_wdl_json_schema(path.join(dirpath, wdl_file), self.logger)
            break
    else:
        raise RuntimeError("Unrecognized workflow format. Please provide a valid Nextflow or WDL workflow.")

    return Resource.from_contents(contents)

Returns the parameter schema for the pipeline.

form_configuration: dict[str, typing.Any]
 92    @property
 93    def form_configuration(self) -> dict[str, Any]:
 94        """
 95        Returns the form configuration for the pipeline.
 96        """
 97        return {
 98            "form": self.parameter_schema.contents,
 99            "ui": {}
100        }

Returns the form configuration for the pipeline.

input_configuration: dict[str, str]
@property
def input_configuration(self) -> dict[str, str]:
    """
    Input configuration for the pipeline: maps the ``name`` of every parameter
    yielded by ``get_input_params`` (starting at ``$.dataset.params``) to that
    parameter's ``jsonPath``.
    """
    schema = self.parameter_schema
    mapping: dict[str, str] = {}
    for param in get_input_params('$.dataset.params', schema.contents, schema):
        mapping[param["name"]] = param["jsonPath"]
    return mapping

Returns the input configuration for the pipeline.

class ConfigAppStatus(enum.Enum):
class ConfigAppStatus(Enum):
    """Degree to which using the config app is recommended."""

    # The config app is recommended.
    RECOMMENDED = "recommended"
    # Using the config app is optional.
    OPTIONAL = "optional"

Enum to represent the status of the config app recommendation.

RECOMMENDED = <ConfigAppStatus.RECOMMENDED: 'recommended'>
OPTIONAL = <ConfigAppStatus.OPTIONAL: 'optional'>
def get_input_params( property_path: str, definition: dict[str, typing.Any], schema: referencing._core.Resource) -> Iterable[dict[str, Any]]:
def get_input_params(property_path: str, definition: dict[str, Any], schema: Resource) -> Iterable[dict[str, Any]]:
    """
    Yield one descriptor per leaf parameter of a JSON-schema definition.

    A definition containing ``$ref`` is first resolved against ``schema``. Definitions
    with ``type: object`` are recursed into through their ``properties``, extending
    ``property_path`` with each property name; anything else is treated as a leaf.

    :param property_path: Dotted path of ``definition`` (e.g. ``'$.dataset.params'``).
    :param definition: The (sub-)schema to inspect.
    :param schema: Root schema resource used to resolve ``$ref`` pointers.
    :return: Dicts with keys ``name`` (last path segment), ``type`` (default
        ``'string'``), ``default`` (default ``None``) and ``jsonPath``. File/directory
        parameters (WDL ``File``/``Directory`` types, optionally with ``?``, or
        ``file-path``/``directory-path`` formats) get ``'$.inputs[*].dataPath'``;
        all other parameters use ``property_path``.
    """
    resolved = definition
    if '$ref' in definition:
        # resolver_with_root registers the root schema under its $id, or under ""
        # when it has none, and resolves refs relative to that base URI. Local refs
        # such as "#/definitions/x" therefore work whether or not the schema has an $id.
        resolver = Registry().resolver_with_root(schema)
        resolved = resolver.lookup(definition['$ref']).contents

    if resolved.get('type') == 'object':
        # recursively get input params for nested objects
        for prop_name, prop_definition in resolved.get('properties', {}).items():
            yield from get_input_params(f"{property_path}.{prop_name}", prop_definition, schema)
        return

    wdl_type = resolved.get('wdlType')
    param_is_path = (
        (bool(wdl_type) and wdl_type.replace('?', '') in ('File', 'Directory'))
        or resolved.get('format') in ('file-path', 'directory-path')
    )
    # Path-like parameters are fed from the dataset's input files rather than params.
    json_path = "$.inputs[*].dataPath" if param_is_path else property_path

    yield {
        'name': property_path.split('.')[-1],
        'type': resolved.get('type', 'string'),
        'default': resolved.get('default', None),
        'jsonPath': json_path,
    }
# Web address of the Cirro pipeline-configurator tool (the "config app" referred to by ConfigAppStatus).
CONFIG_APP_URL = 'https://app.cirro.bio/tools/pipeline-configurator'