Source code for globus_sdk.tokenstorage.v2.sqlite

from __future__ import annotations

import json
import pathlib
import sqlite3
import textwrap
import typing as t

from globus_sdk import __version__, exc

from .base import FileTokenStorage
from .token_data import TokenStorageData


[docs] class SQLiteTokenStorage(FileTokenStorage): """ A token storage which stores token data on disk in a SQLite database. See :class:`TokenStorage` for common interface details. :param filepath: The path on disk to a SQLite database file. :param connect_params: A dictionary of parameters to pass to ``sqlite3.connect()``. :param namespace: A unique string for partitioning token data (Default: "DEFAULT"). :raises GlobusSDKUsageError: If the filepath is ":memory:". This usage-mode is not supported in this class; use :class:`MemoryTokenStorage` instead if in-memory token storage is desired. """ file_format = "db" def __init__( self, filepath: pathlib.Path | str, *, connect_params: dict[str, t.Any] | None = None, namespace: str = "DEFAULT", ) -> None: if filepath == ":memory:": raise exc.GlobusSDKUsageError( "SQLiteTokenStorage cannot be used with a ':memory:' database. " "If you want to store tokens in memory, use MemoryTokenStorage instead." ) super().__init__(filepath, namespace=namespace) self._connection = self._init_and_connect(connect_params) def _init_and_connect( self, connect_params: dict[str, t.Any] | None, ) -> sqlite3.Connection: connect_params = connect_params or {} if not self.file_exists(): with self.user_only_umask(): conn: sqlite3.Connection = sqlite3.connect( self.filepath, **connect_params ) conn.executescript( textwrap.dedent( """ CREATE TABLE token_storage ( namespace VARCHAR NOT NULL, resource_server VARCHAR NOT NULL, token_data_json VARCHAR NOT NULL, PRIMARY KEY (namespace, resource_server) ); CREATE TABLE sdk_storage_adapter_internal ( attribute VARCHAR NOT NULL, value VARCHAR NOT NULL, PRIMARY KEY (attribute) ); """ ) ) # mark the version which was used to create the DB # also mark the "database schema version" in case we ever need to handle # graceful upgrades conn.executemany( "INSERT INTO sdk_storage_adapter_internal(attribute, value) " "VALUES (?, ?)", [ ("globus-sdk.version", __version__), # schema_version=1 indicates a schema built with the original # SQLiteAdapter # schema_version=2 indicates one built with SQLiteTokenStorage # # a schema_version of 1 therefore indicates that there should be # a 'config_storage' table present ("globus-sdk.database_schema_version", "2"), ], ) conn.commit() else: conn = sqlite3.connect(self.filepath, **connect_params) return conn
[docs] def close(self) -> None: """ Close the underlying database connection. """ self._connection.close()
def store_token_data_by_resource_server( self, token_data_by_resource_server: t.Mapping[str, TokenStorageData] ) -> None: """ Store token data for one or more resource servers in the current namespace. Token data is JSON-serialized before being inserted into the database. :param token_data_by_resource_server: A mapping of resource servers to token data. """ pairs = [] for resource_server, token_data in token_data_by_resource_server.items(): pairs.append((resource_server, token_data.to_dict())) self._connection.executemany( "REPLACE INTO token_storage(namespace, resource_server, token_data_json) " "VALUES(?, ?, ?)", [ (self.namespace, rs_name, json.dumps(token_data_dict)) for (rs_name, token_data_dict) in pairs ], ) self._connection.commit() def get_token_data_by_resource_server(self) -> dict[str, TokenStorageData]: """ Lookup all token data under the current namespace from the database. :returns: A dict of ``TokenStorageData`` objects indexed by their resource server. """ ret: dict[str, TokenStorageData] = {} for row in self._connection.execute( "SELECT resource_server, token_data_json " "FROM token_storage WHERE namespace=?", (self.namespace,), ): resource_server, token_data_json = row token_data_dict = json.loads(token_data_json) ret[resource_server] = TokenStorageData.from_dict(token_data_dict) return ret def remove_token_data(self, resource_server: str) -> bool: """ Given a resource server to target, delete token data for that resource server from the database (limited to the current namespace). You can use this as part of a logout command implementation, loading token data as a dict, and then deleting the data for each resource server. :param resource_server: The name of the resource server to remove from the DB :returns: True if token data was deleted, False if none was found to delete. """ rowcount = self._connection.execute( "DELETE FROM token_storage WHERE namespace=? AND resource_server=?", (self.namespace, resource_server), ).rowcount self._connection.commit() return rowcount != 0
[docs] def iter_namespaces(self) -> t.Iterator[str]: """Iterate over all distinct namespaces in the SQLite database.""" seen: set[str] = set() for row in self._connection.execute( "SELECT DISTINCT namespace FROM token_storage;" ): namespace = row[0] seen.add(namespace) yield namespace