cirro.models.process
class
PipelineDefinition:
class PipelineDefinition:
    """
    A pipeline definition on disk.

    Wraps a workflow directory and derives, on demand, the parameter schema,
    the form configuration and the input configuration from the files found
    under that directory.
    """

    def __init__(self, root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None):
        """
        Create a pipeline definition rooted at ``root_dir``.

        Args:
            root_dir: Workflow directory; environment variables and ``~`` are expanded.
            entrypoint: Optional WDL file name used when picking the WDL file
                in ``parameter_schema``.
            logger: Logger to use. When omitted, the root logger is set to
                INFO and given a console handler.
        """
        self.root_dir: str = path.expanduser(path.expandvars(root_dir))
        self.entrypoint: Optional[str] = entrypoint
        # Starts as RECOMMENDED; parameter_schema switches it to OPTIONAL
        # when a nextflow_schema.json file is found.
        self.config_app_status: ConfigAppStatus = ConfigAppStatus.RECOMMENDED

        if logger:
            self.logger: logging.Logger = logger
        else:
            self.logger = logging.getLogger()
            self.logger.setLevel(logging.INFO)

            # The fallback is the shared root logger, so attach the console
            # handler only once; otherwise every new instance would add another
            # handler and each message would be printed multiple times.
            handler_name = 'PipelineDefinition.console'
            if not any(h.name == handler_name for h in self.logger.handlers):
                log_formatter = logging.Formatter(
                    '%(asctime)s %(levelname)-8s [PipelineDefinition] %(message)s'
                )
                console_handler = logging.StreamHandler()
                console_handler.set_name(handler_name)
                console_handler.setFormatter(log_formatter)
                self.logger.addHandler(console_handler)

    @cached_property
    def parameter_schema(self) -> Resource:
        """
        Returns the parameter schema for the pipeline.

        Walks ``root_dir`` top-down and stops at the first directory that
        either looks like a Nextflow workflow (contains ``nextflow_schema.json``,
        or both ``main.nf`` and ``nextflow.config``) or contains a ``.wdl`` file.
        The result is cached, so the walk and the update of
        ``config_app_status`` happen only on first access.

        Raises:
            FileNotFoundError: An entrypoint is set but no matching file exists
                in the first directory that contains WDL files.
            RuntimeError: No directory under ``root_dir`` matched either format.
        """
        for dirpath, _, filenames in os.walk(self.root_dir, topdown=True):
            is_nextflow = (
                ('nextflow_schema.json' in filenames)
                or ('main.nf' in filenames and 'nextflow.config' in filenames)
            )
            if is_nextflow:
                # lazy load nextflow dependencies
                from cirro.models.process.nextflow import get_nextflow_json_schema
                # NOTE(review): the schema is generated from root_dir even when
                # the match was found in a subdirectory - confirm this is intended.
                contents = get_nextflow_json_schema(self.root_dir, self.logger)

                if 'nextflow_schema.json' in filenames:
                    self.config_app_status = ConfigAppStatus.OPTIONAL

                break
            elif any(f.endswith('.wdl') for f in filenames):  # is WDL
                if self.entrypoint:
                    self.logger.info(f"Using entrypoint WDL file: {self.entrypoint}")
                    if self.entrypoint in filenames:
                        # Prefer the exact file name so that e.g. 'main.wdl'
                        # cannot accidentally select 'submain.wdl'.
                        wdl_file = self.entrypoint
                    else:
                        # Fall back to the historical suffix match.
                        wdl_file = next((f for f in filenames if f.endswith(self.entrypoint)), None)
                    if not wdl_file:
                        raise FileNotFoundError(f"Entrypoint WDL file '{self.entrypoint}' not found in {dirpath}")
                else:
                    # otherwise, just take the first WDL file found
                    wdl_file = next(f for f in filenames if f.endswith('.wdl'))

                # lazy load wdl dependencies
                from cirro.models.process.wdl import get_wdl_json_schema

                wdl_file = path.join(dirpath, wdl_file)
                contents = get_wdl_json_schema(wdl_file, self.logger)
                break

        else:
            # The walk finished without hitting a break: nothing recognisable found.
            raise RuntimeError("Unrecognized workflow format. Please provide a valid Nextflow or WDL workflow.")

        schema = Resource.from_contents(contents)
        return schema

    @property
    def form_configuration(self) -> dict[str, Any]:
        """
        Returns the form configuration for the pipeline.

        The parameter schema contents are placed under ``"form"``; ``"ui"``
        is always an empty dict.
        """
        form = self.parameter_schema.contents
        return {"form": form, "ui": {}}

    @property
    def input_configuration(self) -> dict[str, str]:
        """
        Returns the input configuration for the pipeline.

        Maps each parameter name reported by ``get_input_params`` to its JSON
        path. Later entries overwrite earlier ones that share the same name.
        """
        resource = self.parameter_schema
        mapping = {}
        for param in get_input_params('$.dataset.params', resource.contents, resource):
            mapping[param["name"]] = param["jsonPath"]
        return mapping

    def __repr__(self):
        return f"PipelineDefinition(name={self.root_dir})"
A pipeline definition on disk.
PipelineDefinition( root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None)
def __init__(self, root_dir: str, entrypoint: Optional[str] = None, logger: Optional[logging.Logger] = None):
    """
    Create a pipeline definition rooted at ``root_dir``.

    Args:
        root_dir: Workflow directory; environment variables and ``~`` are expanded.
        entrypoint: Optional entrypoint file name, stored as given.
        logger: Logger to use. When omitted, the root logger is set to
            INFO and given a console handler.
    """
    self.root_dir: str = path.expanduser(path.expandvars(root_dir))
    self.entrypoint: Optional[str] = entrypoint
    self.config_app_status: ConfigAppStatus = ConfigAppStatus.RECOMMENDED

    if logger:
        self.logger: logging.Logger = logger
    else:
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

        # The fallback is the shared root logger, so attach the console
        # handler only once; otherwise every new instance would add another
        # handler and each message would be printed multiple times.
        handler_name = 'PipelineDefinition.console'
        if not any(h.name == handler_name for h in self.logger.handlers):
            log_formatter = logging.Formatter(
                '%(asctime)s %(levelname)-8s [PipelineDefinition] %(message)s'
            )
            console_handler = logging.StreamHandler()
            console_handler.set_name(handler_name)
            console_handler.setFormatter(log_formatter)
            self.logger.addHandler(console_handler)
config_app_status: ConfigAppStatus
parameter_schema: referencing._core.Resource
@cached_property
def parameter_schema(self) -> Resource:
    """
    Returns the parameter schema for the pipeline.

    Walks ``root_dir`` top-down and stops at the first directory that
    either looks like a Nextflow workflow (contains ``nextflow_schema.json``,
    or both ``main.nf`` and ``nextflow.config``) or contains a ``.wdl`` file.
    The result is cached, so the walk and the update of
    ``config_app_status`` happen only on first access.

    Raises:
        FileNotFoundError: An entrypoint is set but no matching file exists
            in the first directory that contains WDL files.
        RuntimeError: No directory under ``root_dir`` matched either format.
    """
    for dirpath, _, filenames in os.walk(self.root_dir, topdown=True):
        is_nextflow = (
            ('nextflow_schema.json' in filenames)
            or ('main.nf' in filenames and 'nextflow.config' in filenames)
        )
        if is_nextflow:
            # lazy load nextflow dependencies
            from cirro.models.process.nextflow import get_nextflow_json_schema
            # NOTE(review): the schema is generated from root_dir even when
            # the match was found in a subdirectory - confirm this is intended.
            contents = get_nextflow_json_schema(self.root_dir, self.logger)

            if 'nextflow_schema.json' in filenames:
                self.config_app_status = ConfigAppStatus.OPTIONAL

            break
        elif any(f.endswith('.wdl') for f in filenames):  # is WDL
            if self.entrypoint:
                self.logger.info(f"Using entrypoint WDL file: {self.entrypoint}")
                if self.entrypoint in filenames:
                    # Prefer the exact file name so that e.g. 'main.wdl'
                    # cannot accidentally select 'submain.wdl'.
                    wdl_file = self.entrypoint
                else:
                    # Fall back to the historical suffix match.
                    wdl_file = next((f for f in filenames if f.endswith(self.entrypoint)), None)
                if not wdl_file:
                    raise FileNotFoundError(f"Entrypoint WDL file '{self.entrypoint}' not found in {dirpath}")
            else:
                # otherwise, just take the first WDL file found
                wdl_file = next(f for f in filenames if f.endswith('.wdl'))

            # lazy load wdl dependencies
            from cirro.models.process.wdl import get_wdl_json_schema

            wdl_file = path.join(dirpath, wdl_file)
            contents = get_wdl_json_schema(wdl_file, self.logger)
            break

    else:
        # The walk finished without hitting a break: nothing recognisable found.
        raise RuntimeError("Unrecognized workflow format. Please provide a valid Nextflow or WDL workflow.")

    schema = Resource.from_contents(contents)
    return schema
Returns the parameter schema for the pipeline.
form_configuration: dict[str, typing.Any]
@property
def form_configuration(self) -> dict[str, Any]:
    """
    Returns the form configuration for the pipeline.

    The parameter schema contents are placed under ``"form"``; ``"ui"``
    is always an empty dict.
    """
    form = self.parameter_schema.contents
    return {"form": form, "ui": {}}
Returns the form configuration for the pipeline.
input_configuration: dict[str, str]
@property
def input_configuration(self) -> dict[str, str]:
    """
    Returns the input configuration for the pipeline.

    Maps each parameter name reported by ``get_input_params`` to its JSON
    path. Later entries overwrite earlier ones that share the same name.
    """
    resource = self.parameter_schema
    mapping = {}
    for param in get_input_params('$.dataset.params', resource.contents, resource):
        mapping[param["name"]] = param["jsonPath"]
    return mapping
Returns the input configuration for the pipeline.
class
ConfigAppStatus(enum.Enum):
class ConfigAppStatus(Enum):
    """
    Enum to represent the status of the config app recommendation.

    Each member's value is its lowercase string form.
    """

    # Initial status assigned to a PipelineDefinition.
    RECOMMENDED = "recommended"

    # Assigned by PipelineDefinition.parameter_schema when a
    # nextflow_schema.json file is present.
    OPTIONAL = "optional"
Enum to represent the status of the config app recommendation.
RECOMMENDED =
<ConfigAppStatus.RECOMMENDED: 'recommended'>
OPTIONAL =
<ConfigAppStatus.OPTIONAL: 'optional'>
def
get_input_params( property_path: str, definition: dict[str, typing.Any], schema: referencing._core.Resource) -> Iterable[dict[str, Any]]:
def get_input_params(property_path: str, definition: dict[str, Any], schema: Resource) -> Iterable[dict[str, Any]]:
    """
    Yield one descriptor per leaf parameter found under ``definition``.

    A ``$ref`` in ``definition`` is first resolved against ``schema``. Object
    definitions are descended into recursively, extending ``property_path``
    with each property name. Every other definition yields a dict with the
    keys ``name`` (last dot-separated segment of ``property_path``), ``type``
    (defaults to ``'string'``), ``default`` (defaults to ``None``) and
    ``jsonPath``.

    Args:
        property_path: Dotted JSON path of ``definition``.
        definition: The schema fragment to inspect.
        schema: Resource used to resolve ``$ref`` pointers.
    """
    node = definition
    if '$ref' in definition:
        # Resolve the reference relative to the schema's own id.
        resolver = (schema @ Registry()).resolver()
        target = f"{schema.id()}{definition['$ref']}"
        node = resolver.lookup(target).contents

    if node.get('type') != 'object':
        wdl_type = node.get('wdlType')
        fmt = node.get('format')
        # File/directory parameters (a trailing '?' on the WDL type is ignored)
        # are pointed at the input data path instead of their own path.
        is_path = (
            (wdl_type and wdl_type.replace('?', '') in ('File', 'Directory'))
            or (fmt and fmt in ('file-path', 'directory-path'))
        )
        yield {
            'name': property_path.split('.')[-1],
            'type': node.get('type', 'string'),
            'default': node.get('default', None),
            'jsonPath': "$.inputs[*].dataPath" if is_path else property_path,
        }
        return

    # Nested object: recurse into each declared property.
    for child_name, child_def in node.get('properties', {}).items():
        yield from get_input_params(f"{property_path}.{child_name}", child_def, schema)
CONFIG_APP_URL =
'https://app.cirro.bio/tools/pipeline-configurator'