password_manager.py (poetry-1.1.15) | : | password_manager.py (poetry-1.2.0) | ||
---|---|---|---|---|
from __future__ import annotations | ||||
import dataclasses | ||||
import logging | import logging | |||
from contextlib import suppress | ||||
from typing import TYPE_CHECKING | ||||
if TYPE_CHECKING: | ||||
from poetry.config.config import Config | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | |||
class PasswordManagerError(Exception): | class PasswordManagerError(Exception): | |||
pass | pass | |||
class KeyRingError(Exception): | class PoetryKeyringError(Exception): | |||
pass | pass | |||
class KeyRing: | @dataclasses.dataclass | |||
def __init__(self, namespace): | class HTTPAuthCredential: | |||
username: str | None = dataclasses.field(default=None) | ||||
password: str | None = dataclasses.field(default=None) | ||||
class PoetryKeyring: | ||||
def __init__(self, namespace: str) -> None: | ||||
self._namespace = namespace | self._namespace = namespace | |||
self._is_available = True | self._is_available = True | |||
self._check() | self._check() | |||
def is_available(self): | def is_available(self) -> bool: | |||
return self._is_available | return self._is_available | |||
def get_password(self, name, username): | def get_credential( | |||
self, *names: str, username: str | None = None | ||||
) -> HTTPAuthCredential: | ||||
default = HTTPAuthCredential(username=username, password=None) | ||||
if not self.is_available(): | if not self.is_available(): | |||
return | return default | |||
import keyring | ||||
for name in names: | ||||
credential = keyring.get_credential(name, username) | ||||
if credential: | ||||
return HTTPAuthCredential( | ||||
username=credential.username, password=credential.password | ||||
) | ||||
return default | ||||
def get_password(self, name: str, username: str) -> str | None: | ||||
if not self.is_available(): | ||||
return None | ||||
import keyring | import keyring | |||
import keyring.errors | import keyring.errors | |||
name = self.get_entry_name(name) | name = self.get_entry_name(name) | |||
try: | try: | |||
return keyring.get_password(name, username) | return keyring.get_password(name, username) | |||
except (RuntimeError, keyring.errors.KeyringError): | except (RuntimeError, keyring.errors.KeyringError): | |||
raise KeyRingError( | raise PoetryKeyringError( | |||
"Unable to retrieve the password for {} from the key ring".forma | f"Unable to retrieve the password for {name} from the key ring" | |||
t(name) | ||||
) | ) | |||
def set_password(self, name, username, password): | def set_password(self, name: str, username: str, password: str) -> None: | |||
if not self.is_available(): | if not self.is_available(): | |||
return | return | |||
import keyring | import keyring | |||
import keyring.errors | import keyring.errors | |||
name = self.get_entry_name(name) | name = self.get_entry_name(name) | |||
try: | try: | |||
keyring.set_password(name, username, password) | keyring.set_password(name, username, password) | |||
except (RuntimeError, keyring.errors.KeyringError) as e: | except (RuntimeError, keyring.errors.KeyringError) as e: | |||
raise KeyRingError( | raise PoetryKeyringError( | |||
"Unable to store the password for {} in the key ring: {}".format | f"Unable to store the password for {name} in the key ring: {e}" | |||
( | ||||
name, str(e) | ||||
) | ||||
) | ) | |||
def delete_password(self, name, username): | def delete_password(self, name: str, username: str) -> None: | |||
if not self.is_available(): | if not self.is_available(): | |||
return | return | |||
import keyring | ||||
import keyring.errors | import keyring.errors | |||
name = self.get_entry_name(name) | name = self.get_entry_name(name) | |||
try: | try: | |||
keyring.delete_password(name, username) | keyring.delete_password(name, username) | |||
except (RuntimeError, keyring.errors.KeyringError): | except (RuntimeError, keyring.errors.KeyringError): | |||
raise KeyRingError( | raise PoetryKeyringError( | |||
"Unable to delete the password for {} from the key ring".format( | f"Unable to delete the password for {name} from the key ring" | |||
name) | ||||
) | ) | |||
def get_entry_name(self, name): | def get_entry_name(self, name: str) -> str: | |||
return "{}-{}".format(self._namespace, name) | return f"{self._namespace}-{name}" | |||
def _check(self): | def _check(self) -> None: | |||
try: | try: | |||
import keyring | import keyring | |||
except Exception as e: | except Exception as e: | |||
logger.debug("An error occurred while importing keyring: {}".format( str(e))) | logger.debug(f"An error occurred while importing keyring: {e!s}") | |||
self._is_available = False | self._is_available = False | |||
return | return | |||
backend = keyring.get_keyring() | backend = keyring.get_keyring() | |||
name = backend.name.split(" ")[0] | name = backend.name.split(" ")[0] | |||
if name == "fail": | if name in ("fail", "null"): | |||
logger.debug("No suitable keyring backend found") | logger.debug("No suitable keyring backend found") | |||
self._is_available = False | self._is_available = False | |||
elif "plaintext" in backend.name.lower(): | elif "plaintext" in backend.name.lower(): | |||
logger.debug("Only a plaintext keyring backend is available. Not usi ng it.") | logger.debug("Only a plaintext keyring backend is available. Not usi ng it.") | |||
self._is_available = False | self._is_available = False | |||
elif name == "chainer": | elif name == "chainer": | |||
try: | try: | |||
import keyring.backend | import keyring.backend | |||
backends = keyring.backend.get_all_keyring() | backends = keyring.backend.get_all_keyring() | |||
self._is_available = any( | self._is_available = any( | |||
[ | b.name.split(" ")[0] not in ["chainer", "fail", "null"] | |||
b.name.split(" ")[0] not in ["chainer", "fail"] | and "plaintext" not in b.name.lower() | |||
and "plaintext" not in b.name.lower() | for b in backends | |||
for b in backends | ||||
] | ||||
) | ) | |||
except Exception: | except Exception: | |||
self._is_available = False | self._is_available = False | |||
if not self._is_available: | if not self._is_available: | |||
logger.warning("No suitable keyring backends were found") | logger.debug("No suitable keyring backends were found") | |||
class PasswordManager: | class PasswordManager: | |||
def __init__(self, config): | def __init__(self, config: Config) -> None: | |||
self._config = config | self._config = config | |||
self._keyring = None | self._keyring: PoetryKeyring | None = None | |||
@property | @property | |||
def keyring(self): | def keyring(self) -> PoetryKeyring: | |||
if self._keyring is None: | if self._keyring is None: | |||
self._keyring = KeyRing("poetry-repository") | self._keyring = PoetryKeyring("poetry-repository") | |||
if not self._keyring.is_available(): | if not self._keyring.is_available(): | |||
logger.warning( | logger.debug( | |||
"Using a plaintext file to store and retrieve credentials" | "<warning>Keyring is not available, credentials will be stor | |||
ed and " | ||||
"retrieved from configuration files as plaintext.</>" | ||||
) | ) | |||
return self._keyring | return self._keyring | |||
def set_pypi_token(self, name, token): | @staticmethod | |||
def warn_plaintext_credentials_stored() -> None: | ||||
logger.warning("Using a plaintext file to store credentials") | ||||
def set_pypi_token(self, name: str, token: str) -> None: | ||||
if not self.keyring.is_available(): | if not self.keyring.is_available(): | |||
self._config.auth_config_source.add_property( | self.warn_plaintext_credentials_stored() | |||
"pypi-token.{}".format(name), token | self._config.auth_config_source.add_property(f"pypi-token.{name}", t | |||
) | oken) | |||
else: | else: | |||
self.keyring.set_password(name, "__token__", token) | self.keyring.set_password(name, "__token__", token) | |||
def get_pypi_token(self, name): | def get_pypi_token(self, repo_name: str) -> str | None: | |||
if not self.keyring.is_available(): | """Get PyPi token. | |||
return self._config.get("pypi-token.{}".format(name)) | ||||
return self.keyring.get_password(name, "__token__") | First checks the environment variables for a token, | |||
then the configured username/password and the | ||||
available keyring. | ||||
:param repo_name: Name of repository. | ||||
:return: Returns a token as a string if found, otherwise None. | ||||
""" | ||||
token: str | None = self._config.get(f"pypi-token.{repo_name}") | ||||
if token: | ||||
return token | ||||
def delete_pypi_token(self, name): | return self.keyring.get_password(repo_name, "__token__") | |||
def delete_pypi_token(self, name: str) -> None: | ||||
if not self.keyring.is_available(): | if not self.keyring.is_available(): | |||
return self._config.auth_config_source.remove_property( | return self._config.auth_config_source.remove_property(f"pypi-token. | |||
"pypi-token.{}".format(name) | {name}") | |||
) | ||||
self.keyring.delete_password(name, "__token__") | self.keyring.delete_password(name, "__token__") | |||
def get_http_auth(self, name): | def get_http_auth(self, name: str) -> dict[str, str | None] | None: | |||
auth = self._config.get("http-basic.{}".format(name)) | auth = self._config.get(f"http-basic.{name}") | |||
if not auth: | if not auth: | |||
username = self._config.get("http-basic.{}.username".format(name)) | username = self._config.get(f"http-basic.{name}.username") | |||
password = self._config.get("http-basic.{}.password".format(name)) | password = self._config.get(f"http-basic.{name}.password") | |||
if not username and not password: | if not username and not password: | |||
return None | return None | |||
else: | else: | |||
username, password = auth["username"], auth.get("password") | username, password = auth["username"], auth.get("password") | |||
if password is None: | if password is None: | |||
password = self.keyring.get_password(name, username) | password = self.keyring.get_password(name, username) | |||
return { | return { | |||
"username": username, | "username": username, | |||
"password": password, | "password": password, | |||
} | } | |||
def set_http_password(self, name, username, password): | def set_http_password(self, name: str, username: str, password: str) -> None : | |||
auth = {"username": username} | auth = {"username": username} | |||
if not self.keyring.is_available(): | if not self.keyring.is_available(): | |||
self.warn_plaintext_credentials_stored() | ||||
auth["password"] = password | auth["password"] = password | |||
else: | else: | |||
self.keyring.set_password(name, username, password) | self.keyring.set_password(name, username, password) | |||
self._config.auth_config_source.add_property("http-basic.{}".format(name ), auth) | self._config.auth_config_source.add_property(f"http-basic.{name}", auth) | |||
def delete_http_password(self, name): | def delete_http_password(self, name: str) -> None: | |||
auth = self.get_http_auth(name) | auth = self.get_http_auth(name) | |||
if not auth or "username" not in auth: | if not auth: | |||
return | return | |||
try: | username = auth.get("username") | |||
self.keyring.delete_password(name, auth["username"]) | if username is None: | |||
except KeyRingError: | return | |||
pass | ||||
with suppress(PoetryKeyringError): | ||||
self.keyring.delete_password(name, username) | ||||
self._config.auth_config_source.remove_property("http-basic.{}".format(n ame)) | self._config.auth_config_source.remove_property(f"http-basic.{name}") | |||
End of changes. 40 change blocks. | ||||
63 lines changed or deleted | 105 lines changed or added |