Source code for globus_sdk.tokenstorage.v2.base

from __future__ import annotations

import abc
import contextlib
import os
import pathlib
import re
import sys
import typing as t

import globus_sdk
from globus_sdk._types import UUIDLike

from .token_data import TokenStorageData

if t.TYPE_CHECKING:
    from globus_sdk.globus_app import GlobusAppConfig


[docs] class TokenStorage(metaclass=abc.ABCMeta): """ The interface for interacting with a store of :class:`TokenStorageData` objects. Implementations must partition their token data objects by ``namespace``. Within a namespace, token data must be indexed by ``resource_server``. :param namespace: A unique string for partitioning token data (Default: "DEFAULT"). :ivar globus_sdk.IDTokenDecoder | None id_token_decoder: A decoder to use when decoding ``id_token`` JWTs from Globus Auth. By default, a new decoder is used each time decoding is performed. """ def __init__(self, namespace: str = "DEFAULT") -> None: self.namespace = namespace self.id_token_decoder: globus_sdk.IDTokenDecoder | None = None
[docs] @abc.abstractmethod def store_token_data_by_resource_server( self, token_data_by_resource_server: t.Mapping[str, TokenStorageData] ) -> None: """ Store token data for one or more resource server in the current namespace. :param token_data_by_resource_server: mapping of resource server to token data. """
[docs] @abc.abstractmethod def get_token_data_by_resource_server(self) -> dict[str, TokenStorageData]: """ Retrieve all token data stored in the current namespace. :returns: a dict of ``TokenStorageData`` objects indexed by their resource server. """
[docs] def get_token_data(self, resource_server: str) -> TokenStorageData | None: """ Retrieve token data for a particular resource server in the current namespace. :param resource_server: The resource_server string to get token data for. :returns: token data if found or else None. """ return self.get_token_data_by_resource_server().get(resource_server)
[docs] @abc.abstractmethod def remove_token_data(self, resource_server: str) -> bool: """ Remove token data for a resource server in the current namespace. :param resource_server: The resource server string to remove token data for. :returns: True if token data was deleted, False if none was found to delete. """
[docs] def store_token_response( self, token_response: globus_sdk.OAuthTokenResponse ) -> None: """ Store token data from an :class:`OAuthTokenResponse` in the current namespace. :param token_response: A token response object from an authentication flow. """ token_data_by_resource_server = {} identity_id = self._extract_identity_id(token_response) for resource_server, token_dict in token_response.by_resource_server.items(): token_data_by_resource_server[resource_server] = TokenStorageData( resource_server=token_dict["resource_server"], identity_id=identity_id, scope=token_dict["scope"], access_token=token_dict["access_token"], refresh_token=token_dict.get("refresh_token"), expires_at_seconds=token_dict["expires_at_seconds"], token_type=token_dict.get("token_type"), ) self.store_token_data_by_resource_server(token_data_by_resource_server)
def _extract_identity_id( self, token_response: globus_sdk.OAuthTokenResponse ) -> str | None: """ Get identity_id from id_token if available. .. note:: This method is private, but is used in ValidatingTokenStorage to override the extraction of ``identity_id`` information. Generalizing customization of ``identity_id`` extraction will require implementation of a user-facing mechanism for controlling calls to ``decode_id_token()``. """ # dependent token responses cannot contain an `id_token` field, as the # top-level data is an array if isinstance(token_response, globus_sdk.OAuthDependentTokenResponse): return None if id_token := token_response.get("id_token"): if self.id_token_decoder: decoded_id_token = self.id_token_decoder.decode(id_token) else: decoded_id_token = token_response.decode_id_token() return decoded_id_token["sub"] # type: ignore[no-any-return] else: return None
class FileTokenStorage(TokenStorage, metaclass=abc.ABCMeta): """ A base class for token storages which store tokens in a local file. Common functionality for file-based token storages like file creation and class instantiation for a GlobusApp is defined here. :cvar file_format: The file format suffix associated with files of this type. This is used when constructing the file path for a GlobusApp. :param filepath: The path to a file where token data should be stored. :param namespace: A unique string for partitioning token data (Default: "DEFAULT"). """ # File suffix associated with files of this type (e.g., "csv") file_format: str = "_UNSET_" # must be overridden by subclasses def __init__( self, filepath: pathlib.Path | str, *, namespace: str = "DEFAULT" ) -> None: """ :param filepath: the name of the file to write to and read from :param namespace: A user-supplied namespace for partitioning token data """ self.filepath = str(filepath) try: self._ensure_containing_dir_exists() except OSError as e: msg = ( "Encountered an error while initializing the token storage file " f"'{self.filepath}'" ) raise ValueError(msg) from e super().__init__(namespace=namespace) def __init_subclass__(cls, **kwargs: t.Any): if cls.file_format == "_UNSET_": raise TypeError(f"{cls.__name__} must set a 'file_format' class attribute") @classmethod def for_globus_app( cls, client_id: UUIDLike, app_name: str, config: GlobusAppConfig, namespace: str, ) -> TokenStorage: """ Initialize a TokenStorage instance for a GlobusApp, using the supplied info to determine the file location. :param client_id: The client ID of the Globus App. :param app_name: The name of the Globus App. :param config: The GlobusAppConfig object for the Globus App. :param namespace: A user-supplied namespace for partitioning token data. """ filepath = _default_globus_app_filepath(client_id, app_name, config.environment) return cls(filepath=f"{filepath}.{cls.file_format}", namespace=namespace) def _ensure_containing_dir_exists(self) -> None: """ Ensure that the directory containing the given filepath exists. """ os.makedirs(os.path.dirname(self.filepath), exist_ok=True) def file_exists(self) -> bool: """ Check if the file used by this file storage adapter exists. """ return os.path.exists(self.filepath) @contextlib.contextmanager def user_only_umask(self) -> t.Iterator[None]: """ A context manager to deny rwx to Group and World, x to User This does not create a file, but ensures that if a file is created while in the context manager, its permissions will be correct on unix systems. .. note:: On Windows, this has no effect. To control the permissions on files used for token storage, use ``%LOCALAPPDATA%`` or ``%APPDATA%``. These directories should only be accessible to the current user. """ old_umask = os.umask(0o177) try: yield finally: os.umask(old_umask) def _default_globus_app_filepath( client_id: UUIDLike, app_name: str, environment: str ) -> str: r""" Construct a default TokenStorage filepath for a GlobusApp. For flexibility, the filepath will omit the file format suffix. On Windows, this will be: ``~\AppData\Local\globus\app\{client_id}\{app_name}\tokens`` On Linux and macOS, we use: ``~/.globus/app/{client_id}/{app_name}/tokens`` """ environment_prefix = f"{environment}-" if environment == "production": environment_prefix = "" filename = f"{environment_prefix}tokens" app_name = _slugify_app_name(app_name) if sys.platform == "win32": # try to get the app data dir, preferring the local appdata datadir = os.getenv("LOCALAPPDATA", os.getenv("APPDATA")) if not datadir: home = os.path.expanduser("~") datadir = os.path.join(home, "AppData", "Local") return os.path.join( datadir, "globus", "app", str(client_id), app_name, filename ) else: return os.path.expanduser( f"~/.globus/app/{str(client_id)}/{app_name}/{filename}" ) # https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file#naming-conventions _RESERVED_FS_CHARS = re.compile(r'[<>:"/\\|?*]') # https://stackoverflow.com/a/31976060 _RESERVED_FS_NAMES = re.compile(r"con|prn|aux|nul|com\d|lpt\d") def _slugify_app_name(app_name: str) -> str: """ Slugify a globus app name for use in a file path. * Reserved filesystem characters are replaced with a '+'. ('a?' -> 'a+') * Periods and Spaces are replaced with a '-'. ('a. b' -> 'a--b') * Control characters are removed. ('a\0b' -> 'ab') * The string is lowercased. ('AB' -> 'ab') :raises: GlobusSDKUsageError if the app name is empty after slugification. :raises: GlobusSDKUsageError if the app name is a reserved filesystem name (after slugification). """ app_name = _RESERVED_FS_CHARS.sub("+", app_name) app_name = app_name.replace(".", "-").replace(" ", "-") app_name = "".join(c for c in app_name if c.isprintable()) app_name = app_name.lower() if _RESERVED_FS_NAMES.fullmatch(app_name): msg = ( f'App name results in a reserved filename ("{app_name}"). ' "Please choose a different name." ) raise globus_sdk.GlobusSDKUsageError(msg) if not app_name: msg = "App name results in the empty string. Please choose a different name." raise globus_sdk.GlobusSDKUsageError(msg) return app_name