cirro.auth
1from io import StringIO 2from typing import Optional 3 4from cirro.auth.access_token import AccessTokenAuth 5from cirro.auth.base import AuthInfo 6from cirro.auth.client_creds import ClientCredentialsAuth 7from cirro.auth.device_code import DeviceCodeAuth 8 9__all__ = [ 10 'get_auth_info_from_config', 11 'AuthInfo', 12 'DeviceCodeAuth', 13 'AccessTokenAuth', 14 'ClientCredentialsAuth' 15] 16 17from cirro.config import AppConfig 18 19 20def get_auth_info_from_config(app_config: AppConfig, auth_io: Optional[StringIO] = None): 21 """ 22 Generates the AuthInfo object from the user's saved configuration 23 """ 24 user_config = app_config.user_config 25 if not user_config or not user_config.auth_method: 26 return DeviceCodeAuth(region=app_config.region, 27 client_id=app_config.client_id, 28 auth_endpoint=app_config.auth_endpoint, 29 auth_io=auth_io) 30 31 auth_methods = [ 32 DeviceCodeAuth, 33 ClientCredentialsAuth 34 ] 35 matched_auth_method = next((m for m in auth_methods if m.__name__ == user_config.auth_method), None) 36 if not matched_auth_method: 37 # Backwards compatibility 38 if user_config.auth_method == 'ClientAuth': 39 matched_auth_method = DeviceCodeAuth 40 else: 41 raise RuntimeError(f'{user_config.auth_method} not found, please re-run configuration') 42 43 auth_config = user_config.auth_method_config 44 45 if matched_auth_method == DeviceCodeAuth: 46 return DeviceCodeAuth(region=app_config.region, 47 client_id=app_config.client_id, 48 auth_endpoint=app_config.auth_endpoint, 49 enable_cache=auth_config.get('enable_cache') == 'True', 50 auth_io=auth_io) 51 52 if matched_auth_method == ClientCredentialsAuth: 53 return ClientCredentialsAuth( 54 auth_config.get('client_id'), 55 auth_config.get('client_secret'), 56 auth_endpoint=app_config.auth_endpoint 57 ) 58 59 return None
def
get_auth_info_from_config( app_config: cirro.config.AppConfig, auth_io: Optional[_io.StringIO] = None):
21def get_auth_info_from_config(app_config: AppConfig, auth_io: Optional[StringIO] = None): 22 """ 23 Generates the AuthInfo object from the user's saved configuration 24 """ 25 user_config = app_config.user_config 26 if not user_config or not user_config.auth_method: 27 return DeviceCodeAuth(region=app_config.region, 28 client_id=app_config.client_id, 29 auth_endpoint=app_config.auth_endpoint, 30 auth_io=auth_io) 31 32 auth_methods = [ 33 DeviceCodeAuth, 34 ClientCredentialsAuth 35 ] 36 matched_auth_method = next((m for m in auth_methods if m.__name__ == user_config.auth_method), None) 37 if not matched_auth_method: 38 # Backwards compatibility 39 if user_config.auth_method == 'ClientAuth': 40 matched_auth_method = DeviceCodeAuth 41 else: 42 raise RuntimeError(f'{user_config.auth_method} not found, please re-run configuration') 43 44 auth_config = user_config.auth_method_config 45 46 if matched_auth_method == DeviceCodeAuth: 47 return DeviceCodeAuth(region=app_config.region, 48 client_id=app_config.client_id, 49 auth_endpoint=app_config.auth_endpoint, 50 enable_cache=auth_config.get('enable_cache') == 'True', 51 auth_io=auth_io) 52 53 if matched_auth_method == ClientCredentialsAuth: 54 return ClientCredentialsAuth( 55 auth_config.get('client_id'), 56 auth_config.get('client_secret'), 57 auth_endpoint=app_config.auth_endpoint 58 ) 59 60 return None
Generates the AuthInfo object from the user's saved configuration
class
AuthInfo(abc.ABC):
7class AuthInfo(ABC): 8 @abstractmethod 9 def get_current_user(self) -> str: 10 raise NotImplementedError() 11 12 @abstractmethod 13 def get_auth_method(self) -> AuthMethod: 14 raise NotImplementedError()
Helper class that provides a standard way to create an ABC using inheritance.
92class DeviceCodeAuth(AuthInfo): 93 """ 94 Authenticates to Cirro by asking 95 the user to enter a verification code on the portal website 96 97 :param client_id: The client ID for the OAuth application 98 :param region: The AWS region where the Cognito user pool is located 99 :param auth_endpoint: The endpoint for the OAuth authorization server 100 :param enable_cache: Optionally enable cache to avoid re-authentication 101 :param auth_io: Optionally provide a StringIO object for the authentication link 102 :param await_completion: 103 If True, block until the user completes the authorization. 104 If auth_io is provided, the authorization message will be written to that buffer. 105 If auth_io is not provided, the authorization message will be printed. 106 If False, the object will be instantiated without fully completing the authorization. 107 The authorization message can be accessed using the .auth_message property. 108 Then, the await_completion() method must be run to complete the process. 109 110 Implements the OAuth device code flow 111 This is the preferred way to authenticate 112 """ 113 def __init__( 114 self, 115 client_id: str, 116 region: str, 117 auth_endpoint: str, 118 enable_cache=False, 119 auth_io: Optional[StringIO] = None, 120 await_completion=True 121 ): 122 self.client_id = client_id 123 self.auth_endpoint = auth_endpoint 124 self.region = region 125 self._token_info: Optional[OAuthTokenResponse] = None 126 self._persistence: Optional[BasePersistence] = None 127 self._flow: Optional[DeviceTokenResponse] = None 128 self._token_path = Path(Constants.home, f'{client_id}.token.dat').expanduser() 129 130 if enable_cache: 131 self._persistence = _build_token_persistence(str(self._token_path), fallback_to_plaintext=True) 132 self._token_info = self._load_token_info() 133 134 # Check saved token for change in endpoint 135 if self._token_info and self._token_info.get('client_id') != client_id: 136 logger.debug('Different client ID found, clearing saved token info') 137 self._clear_token_info() 138 139 # Check saved token for refresh token expiry 140 if self._token_info and self._token_info.get('refresh_expires_in'): 141 refresh_expiry_threshold = datetime.fromtimestamp(self._token_info.get('refresh_expires_in'))\ 142 - timedelta(hours=12) 143 if refresh_expiry_threshold < datetime.now(): 144 logger.debug('Refresh token expiry is too soon, re-authenticating') 145 self._clear_token_info() 146 147 if not self._token_info: 148 if await_completion: 149 self._token_info = _authenticate(client_id=client_id, auth_endpoint=auth_endpoint, auth_io=auth_io) 150 else: 151 self._flow = _initialize_auth_flow(client_id=client_id, auth_endpoint=auth_endpoint) 152 153 if self._token_info: 154 self._save_token_info() 155 self._update_token_metadata() 156 self._get_token_lock = threading.Lock() 157 158 @property 159 def auth_message(self): 160 """ 161 If the DeviceCodeAuth was instantiated with await_completion=False, 162 then the authorization message will be populated by this property. 163 """ 164 if self._flow is None: 165 raise ValueError("The DeviceTokenResponse is not available") 166 else: 167 return self._flow["message"] 168 169 @property 170 def auth_message_markdown(self): 171 """ 172 Markdown syntax for the authorization message, so that links are rendered appropriately. 173 """ 174 return " ".join([ 175 ( 176 f"[{part}]({part})" 177 if part.startswith("http") 178 else part 179 ) 180 for part in self.auth_message.split(" ") 181 ]) 182 183 def await_completion(self): 184 """Block until the user completes the authorization process.""" 185 self._token_info = _await_completion( 186 client_id=self.client_id, 187 auth_endpoint=self.auth_endpoint, 188 flow=self._flow 189 ) 190 self._save_token_info() 191 self._update_token_metadata() 192 self._get_token_lock = threading.Lock() 193 194 def get_auth_method(self) -> AuthMethod: 195 return RefreshableTokenAuth(token_getter=lambda: self._get_token()['access_token']) 196 197 def get_current_user(self) -> str: 198 return self._username 199 200 def _get_token(self): 201 with self._get_token_lock: 202 # Refresh access token using refresh token 203 if datetime.now() > self.access_token_expiry: 204 self._refresh_access_token() 205 206 return self._token_info 207 208 def _refresh_access_token(self): 209 try: 210 cognito = boto3.client('cognito-idp', region_name=self.region) 211 resp = cognito.initiate_auth( 212 ClientId=self.client_id, 213 AuthFlow='REFRESH_TOKEN_AUTH', 214 AuthParameters={ 215 'REFRESH_TOKEN': self._token_info['refresh_token'] 216 } 217 ) 218 logger.debug('Successfully refreshed token') 219 except ClientError as err: 220 logger.warning(err) 221 self._clear_token_info() 222 raise RuntimeError('Failed to refresh token, please reauthenticate') 223 224 auth_result = resp['AuthenticationResult'] 225 self._token_info['access_token'] = auth_result['AccessToken'] 226 self._token_info['id_token'] = auth_result['IdToken'] 227 self._save_token_info() 228 self._update_token_metadata() 229 230 def _update_token_metadata(self): 231 decoded_access_token = jwt.decode(self._token_info['access_token'], 232 options={"verify_signature": False}) 233 self.access_token_expiry = datetime.fromtimestamp(decoded_access_token['exp']) 234 self._username = decoded_access_token['username'] 235 236 def _load_token_info(self) -> Optional[OAuthTokenResponse]: 237 if not self._persistence or not self._token_path.exists(): 238 return None 239 240 token_info = json.loads(self._persistence.load()) 241 if 'access_token' not in token_info: 242 return None 243 244 return token_info 245 246 def _save_token_info(self): 247 if not self._persistence: 248 return 249 250 self._persistence.save(json.dumps(self._token_info)) 251 252 def _clear_token_info(self): 253 if not self._persistence: 254 return 255 256 Path(self._persistence.get_location()).unlink(missing_ok=True) 257 self._token_info = None
Authenticates to Cirro by asking the user to enter a verification code on the portal website
Parameters
- client_id: The client ID for the OAuth application
- region: The AWS region where the Cognito user pool is located
- auth_endpoint: The endpoint for the OAuth authorization server
- enable_cache: Optionally enable cache to avoid re-authentication
- auth_io: Optionally provide a StringIO object for the authentication link
- await_completion: If True, block until the user completes the authorization. If auth_io is provided, the authorization message will be written to that buffer. If auth_io is not provided, the authorization message will be printed. If False, the object will be instantiated without fully completing the authorization. The authorization message can be accessed using the .auth_message property. Then, the await_completion() method must be run to complete the process.
Implements the OAuth device code flow This is the preferred way to authenticate
DeviceCodeAuth( client_id: str, region: str, auth_endpoint: str, enable_cache=False, auth_io: Optional[_io.StringIO] = None, await_completion=True)
113 def __init__( 114 self, 115 client_id: str, 116 region: str, 117 auth_endpoint: str, 118 enable_cache=False, 119 auth_io: Optional[StringIO] = None, 120 await_completion=True 121 ): 122 self.client_id = client_id 123 self.auth_endpoint = auth_endpoint 124 self.region = region 125 self._token_info: Optional[OAuthTokenResponse] = None 126 self._persistence: Optional[BasePersistence] = None 127 self._flow: Optional[DeviceTokenResponse] = None 128 self._token_path = Path(Constants.home, f'{client_id}.token.dat').expanduser() 129 130 if enable_cache: 131 self._persistence = _build_token_persistence(str(self._token_path), fallback_to_plaintext=True) 132 self._token_info = self._load_token_info() 133 134 # Check saved token for change in endpoint 135 if self._token_info and self._token_info.get('client_id') != client_id: 136 logger.debug('Different client ID found, clearing saved token info') 137 self._clear_token_info() 138 139 # Check saved token for refresh token expiry 140 if self._token_info and self._token_info.get('refresh_expires_in'): 141 refresh_expiry_threshold = datetime.fromtimestamp(self._token_info.get('refresh_expires_in'))\ 142 - timedelta(hours=12) 143 if refresh_expiry_threshold < datetime.now(): 144 logger.debug('Refresh token expiry is too soon, re-authenticating') 145 self._clear_token_info() 146 147 if not self._token_info: 148 if await_completion: 149 self._token_info = _authenticate(client_id=client_id, auth_endpoint=auth_endpoint, auth_io=auth_io) 150 else: 151 self._flow = _initialize_auth_flow(client_id=client_id, auth_endpoint=auth_endpoint) 152 153 if self._token_info: 154 self._save_token_info() 155 self._update_token_metadata() 156 self._get_token_lock = threading.Lock()
auth_message
158 @property 159 def auth_message(self): 160 """ 161 If the DeviceCodeAuth was instantiated with await_completion=False, 162 then the authorization message will be populated by this property. 163 """ 164 if self._flow is None: 165 raise ValueError("The DeviceTokenResponse is not available") 166 else: 167 return self._flow["message"]
If the DeviceCodeAuth was instantiated with await_completion=False, then the authorization message will be populated by this property.
auth_message_markdown
169 @property 170 def auth_message_markdown(self): 171 """ 172 Markdown syntax for the authorization message, so that links are rendered appropriately. 173 """ 174 return " ".join([ 175 ( 176 f"[{part}]({part})" 177 if part.startswith("http") 178 else part 179 ) 180 for part in self.auth_message.split(" ") 181 ])
Markdown syntax for the authorization message, so that links are rendered appropriately.
def
await_completion(self):
183 def await_completion(self): 184 """Block until the user completes the authorization process.""" 185 self._token_info = _await_completion( 186 client_id=self.client_id, 187 auth_endpoint=self.auth_endpoint, 188 flow=self._flow 189 ) 190 self._save_token_info() 191 self._update_token_metadata() 192 self._get_token_lock = threading.Lock()
Block until the user completes the authorization process.
11class AccessTokenAuth(AuthInfo): 12 """ 13 Authenticates to Cirro with a static access token 14 15 :param token: Access token 16 """ 17 18 def __init__(self, token: str): 19 self._token = token 20 self._username = None 21 self._access_token_expiry = None 22 self._update_token_metadata() 23 24 def get_current_user(self) -> str: 25 return self._username 26 27 def get_auth_method(self) -> AuthMethod: 28 return TokenAuth(token=self._token) 29 30 def _update_token_metadata(self): 31 decoded_access_token = jwt.decode(self._token, 32 options={"verify_signature": False}) 33 self._access_token_expiry = datetime.fromtimestamp(decoded_access_token['exp']) 34 self._username = decoded_access_token['username']
Authenticates to Cirro with a static access token
Parameters
- token: Access token
19class ClientCredentialsAuth(AuthInfo): 20 """ 21 Authenticates to Cirro with OAuth client credentials 22 23 Args: 24 client_id (str): Client ID 25 client_secret (str): Client Secret 26 auth_endpoint (str): Auth Endpoint 27 28 ```python 29 import os 30 from cirro import CirroApi 31 from cirro.auth.client_creds import ClientCredentialsAuth 32 from cirro.config import AppConfig 33 34 client_id = os.getenv('CIRRO_CLIENT_ID') 35 client_secret = os.getenv('CIRRO_CLIENT_SECRET') 36 37 config = AppConfig(base_url="app.cirro.bio") 38 auth_info = ClientCredentialsAuth(client_id, client_secret, auth_endpoint=config.auth_endpoint) 39 cirro = CirroApi(auth_info=auth_info) 40 ``` 41 """ 42 43 def __init__( 44 self, 45 client_id: str, 46 client_secret: str, 47 auth_endpoint: str 48 ): 49 self._client_id = client_id 50 self._client_secret = client_secret 51 self._auth_endpoint = auth_endpoint 52 self._token_info: Optional[OAuthTokenResponse] = None 53 self._token_expiry = None 54 self._username = None 55 self._get_token_lock = threading.Lock() 56 57 def get_current_user(self) -> str: 58 return self._username 59 60 def get_auth_method(self) -> AuthMethod: 61 return RefreshableTokenAuth(token_getter=lambda: self._get_token()["access_token"]) 62 63 def _get_token(self) -> OAuthTokenResponse: 64 with self._get_token_lock: 65 # Refresh access token if expired 66 if not self._token_expiry or time.time() > self._token_expiry: 67 self._refresh_token() 68 69 return self._token_info 70 71 def _refresh_token(self): 72 logger.debug("Refreshing token") 73 basic_auth = base64.b64encode( 74 f"{self._client_id}:{self._client_secret}".encode() 75 ).decode() 76 77 headers = { 78 "Authorization": f"Basic {basic_auth}", 79 "Content-Type": "application/x-www-form-urlencoded", 80 } 81 82 data = { 83 "grant_type": "client_credentials", 84 } 85 86 response = requests.post( 87 f"{self._auth_endpoint}/token", 88 headers=headers, 89 data=data, 90 ) 91 token_info: OAuthTokenResponse = response.json() 92 93 self._token_info = token_info 94 95 if "access_token" not in token_info: 96 raise RuntimeError(f"Error authenticating {token_info}") 97 98 self._update_token_metadata() 99 100 def _update_token_metadata(self): 101 decoded_access_token = jwt.decode(self._token_info["access_token"], 102 options={"verify_signature": False}) 103 expires_in = self._token_info.get("expires_in", 3600) 104 self._token_expiry = time.time() + expires_in - 30 105 self._username = decoded_access_token["appUsername"]
Authenticates to Cirro with OAuth client credentials
Arguments:
- client_id (str): Client ID
- client_secret (str): Client Secret
- auth_endpoint (str): Auth Endpoint
import os
from cirro import CirroApi
from cirro.auth.client_creds import ClientCredentialsAuth
from cirro.config import AppConfig
client_id = os.getenv('CIRRO_CLIENT_ID')
client_secret = os.getenv('CIRRO_CLIENT_SECRET')
config = AppConfig(base_url="app.cirro.bio")
auth_info = ClientCredentialsAuth(client_id, client_secret, auth_endpoint=config.auth_endpoint)
cirro = CirroApi(auth_info=auth_info)
ClientCredentialsAuth(client_id: str, client_secret: str, auth_endpoint: str)
43 def __init__( 44 self, 45 client_id: str, 46 client_secret: str, 47 auth_endpoint: str 48 ): 49 self._client_id = client_id 50 self._client_secret = client_secret 51 self._auth_endpoint = auth_endpoint 52 self._token_info: Optional[OAuthTokenResponse] = None 53 self._token_expiry = None 54 self._username = None 55 self._get_token_lock = threading.Lock()