cirro.sdk.helpers
1from typing import Union 2 3from cirro_api_client.v1.errors import UnexpectedStatus 4from cirro_api_client.v1.models import ProcessDetail 5 6from cirro.cirro_client import CirroApi 7from cirro.sdk.exceptions import DataPortalInputError 8from cirro.sdk.process import DataPortalProcess 9 10 11def parse_process_name_or_id(process: Union[DataPortalProcess, str], client: CirroApi): 12 """ 13 If the process is a string, try to parse it as a process name or ID. 14 """ 15 16 # If the process object is already a DataPortalProcess object 17 if isinstance(process, DataPortalProcess): 18 return process 19 20 # Otherwise, it should be a string 21 if not isinstance(process, str): 22 raise DataPortalInputError(f"Process name or ID should be a string: '{process}'") 23 24 # Try to get the process by ID 25 try: 26 process_by_id = client.processes.get(process) 27 if isinstance(process_by_id, ProcessDetail): 28 return DataPortalProcess(process_by_id, client) 29 30 # Catch the error if no process is found 31 except UnexpectedStatus: 32 pass 33 34 # If that didn't work, try to parse it as a name 35 try: 36 process_by_name = client.processes.find_by_name(process) 37 if isinstance(process_by_name, ProcessDetail): 38 return DataPortalProcess(process_by_name, client) 39 40 # Catch the error if no process is found 41 except UnexpectedStatus: 42 pass 43 44 # If that didn't work, raise an error indicating that the process couldn't be parsed 45 raise DataPortalInputError(f"Could not parse process name or id: '{process}'")
def
parse_process_name_or_id( process: Union[cirro.DataPortalProcess, str], client: cirro.CirroApi):
12def parse_process_name_or_id(process: Union[DataPortalProcess, str], client: CirroApi): 13 """ 14 If the process is a string, try to parse it as a process name or ID. 15 """ 16 17 # If the process object is already a DataPortalProcess object 18 if isinstance(process, DataPortalProcess): 19 return process 20 21 # Otherwise, it should be a string 22 if not isinstance(process, str): 23 raise DataPortalInputError(f"Process name or ID should be a string: '{process}'") 24 25 # Try to get the process by ID 26 try: 27 process_by_id = client.processes.get(process) 28 if isinstance(process_by_id, ProcessDetail): 29 return DataPortalProcess(process_by_id, client) 30 31 # Catch the error if no process is found 32 except UnexpectedStatus: 33 pass 34 35 # If that didn't work, try to parse it as a name 36 try: 37 process_by_name = client.processes.find_by_name(process) 38 if isinstance(process_by_name, ProcessDetail): 39 return DataPortalProcess(process_by_name, client) 40 41 # Catch the error if no process is found 42 except UnexpectedStatus: 43 pass 44 45 # If that didn't work, raise an error indicating that the process couldn't be parsed 46 raise DataPortalInputError(f"Could not parse process name or id: '{process}'")
If the process is a string, try to parse it as a process name or ID.