cirro.sdk.helpers

 1from typing import Union
 2
 3from cirro_api_client.v1.errors import UnexpectedStatus
 4from cirro_api_client.v1.models import ProcessDetail
 5
 6from cirro.cirro_client import CirroApi
 7from cirro.sdk.exceptions import DataPortalInputError
 8from cirro.sdk.process import DataPortalProcess
 9
10
11def parse_process_name_or_id(process: Union[DataPortalProcess, str], client: CirroApi):
12    """
13    If the process is a string, try to parse it as a process name or ID.
14    """
15
16    # If the process object is already a DataPortalProcess object
17    if isinstance(process, DataPortalProcess):
18        return process
19
20    # Otherwise, it should be a string
21    if not isinstance(process, str):
22        raise DataPortalInputError(f"Process name or ID should be a string: '{process}'")
23
24    # Try to get the process by ID
25    try:
26        process_by_id = client.processes.get(process)
27        if isinstance(process_by_id, ProcessDetail):
28            return DataPortalProcess(process_by_id, client)
29
30    # Catch the error if no process is found
31    except UnexpectedStatus:
32        pass
33
34    # If that didn't work, try to parse it as a name
35    try:
36        process_by_name = client.processes.find_by_name(process)
37        if isinstance(process_by_name, ProcessDetail):
38            return DataPortalProcess(process_by_name, client)
39
40    # Catch the error if no process is found
41    except UnexpectedStatus:
42        pass
43
44    # If that didn't work, raise an error indicating that the process couldn't be parsed
45    raise DataPortalInputError(f"Could not parse process name or id: '{process}'")
def parse_process_name_or_id( process: Union[cirro.DataPortalProcess, str], client: cirro.CirroApi):
12def parse_process_name_or_id(process: Union[DataPortalProcess, str], client: CirroApi):
13    """
14    If the process is a string, try to parse it as a process name or ID.
15    """
16
17    # If the process object is already a DataPortalProcess object
18    if isinstance(process, DataPortalProcess):
19        return process
20
21    # Otherwise, it should be a string
22    if not isinstance(process, str):
23        raise DataPortalInputError(f"Process name or ID should be a string: '{process}'")
24
25    # Try to get the process by ID
26    try:
27        process_by_id = client.processes.get(process)
28        if isinstance(process_by_id, ProcessDetail):
29            return DataPortalProcess(process_by_id, client)
30
31    # Catch the error if no process is found
32    except UnexpectedStatus:
33        pass
34
35    # If that didn't work, try to parse it as a name
36    try:
37        process_by_name = client.processes.find_by_name(process)
38        if isinstance(process_by_name, ProcessDetail):
39            return DataPortalProcess(process_by_name, client)
40
41    # Catch the error if no process is found
42    except UnexpectedStatus:
43        pass
44
45    # If that didn't work, raise an error indicating that the process couldn't be parsed
46    raise DataPortalInputError(f"Could not parse process name or id: '{process}'")

If the process is a string, try to parse it as a process name or ID.