publisher.py (poetry-1.1.15) | : | publisher.py (poetry-1.2.0) | ||
---|---|---|---|---|
from __future__ import annotations | ||||
import logging | import logging | |||
from typing import Optional | from typing import TYPE_CHECKING | |||
from poetry.publishing.uploader import Uploader | ||||
from poetry.utils.authenticator import Authenticator | ||||
from poetry.utils._compat import Path | if TYPE_CHECKING: | |||
from poetry.utils.helpers import get_cert | from pathlib import Path | |||
from poetry.utils.helpers import get_client_cert | ||||
from poetry.utils.password_manager import PasswordManager | ||||
from .uploader import Uploader | from cleo.io.io import IO | |||
from poetry.poetry import Poetry | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | |||
class Publisher: | class Publisher: | |||
""" | """ | |||
Registers and publishes packages to remote repositories. | Registers and publishes packages to remote repositories. | |||
""" | """ | |||
def __init__(self, poetry, io): | def __init__(self, poetry: Poetry, io: IO) -> None: | |||
self._poetry = poetry | self._poetry = poetry | |||
self._package = poetry.package | self._package = poetry.package | |||
self._io = io | self._io = io | |||
self._uploader = Uploader(poetry, io) | self._uploader = Uploader(poetry, io) | |||
self._password_manager = PasswordManager(poetry.config) | self._authenticator = Authenticator(poetry.config, self._io) | |||
@property | @property | |||
def files(self): | def files(self) -> list[Path]: | |||
return self._uploader.files | return self._uploader.files | |||
def publish( | def publish( | |||
self, | self, | |||
repository_name, | repository_name: str | None, | |||
username, | username: str | None, | |||
password, | password: str | None, | |||
cert=None, | cert: Path | None = None, | |||
client_cert=None, | client_cert: Path | None = None, | |||
dry_run=False, | dry_run: bool = False, | |||
): # type: (Optional[str], Optional[str], Optional[str], Optional[Path], Op | skip_existing: bool = False, | |||
tional[Path], Optional[bool]) -> None | ) -> None: | |||
if not repository_name: | if not repository_name: | |||
url = "https://upload.pypi.org/legacy/" | url = "https://upload.pypi.org/legacy/" | |||
repository_name = "pypi" | repository_name = "pypi" | |||
else: | else: | |||
# Retrieving config information | # Retrieving config information | |||
url = self._poetry.config.get("repositories.{}.url".format(repositor y_name)) | url = self._poetry.config.get(f"repositories.{repository_name}.url") | |||
if url is None: | if url is None: | |||
raise RuntimeError( | raise RuntimeError(f"Repository {repository_name} is not defined | |||
"Repository {} is not defined".format(repository_name) | ") | |||
) | ||||
if not (username and password): | if not (username and password): | |||
# Check if we have a token first | # Check if we have a token first | |||
token = self._password_manager.get_pypi_token(repository_name) | token = self._authenticator.get_pypi_token(repository_name) | |||
if token: | if token: | |||
logger.debug("Found an API token for {}.".format(repository_name )) | logger.debug(f"Found an API token for {repository_name}.") | |||
username = "__token__" | username = "__token__" | |||
password = token | password = token | |||
else: | else: | |||
auth = self._password_manager.get_http_auth(repository_name) | auth = self._authenticator.get_http_auth(repository_name) | |||
if auth: | if auth: | |||
logger.debug( | logger.debug( | |||
"Found authentication information for {}.".format( | f"Found authentication information for {repository_name} | |||
repository_name | ." | |||
) | ||||
) | ) | |||
username = auth["username"] | username = auth.username | |||
password = auth["password"] | password = auth.password | |||
resolved_client_cert = client_cert or get_client_cert( | certificates = self._authenticator.get_certs_for_repository(repository_n | |||
self._poetry.config, repository_name | ame) | |||
) | resolved_cert = cert or certificates.cert or certificates.verify | |||
# Requesting missing credentials but only if there is not a client cert | resolved_client_cert = client_cert or certificates.client_cert | |||
defined. | ||||
if not resolved_client_cert: | ||||
if username is None: | ||||
username = self._io.ask("Username:") | ||||
# skip password input if no username is provided, assume unauthentic | ||||
ated | ||||
if username and password is None: | ||||
password = self._io.ask_hidden("Password:") | ||||
self._uploader.auth(username, password) | self._uploader.auth(username, password) | |||
if repository_name == "pypi": | ||||
repository_name = "PyPI" | ||||
self._io.write_line( | self._io.write_line( | |||
"Publishing <c1>{}</c1> (<c2>{}</c2>) " | f"Publishing <c1>{self._package.pretty_name}</c1>" | |||
"to <info>{}</info>".format( | f" (<c2>{self._package.pretty_version}</c2>) to" | |||
self._package.pretty_name, | f" <info>{repository_name}</info>" | |||
self._package.pretty_version, | ||||
"PyPI" if repository_name == "pypi" else repository_name, | ||||
) | ||||
) | ) | |||
self._uploader.upload( | self._uploader.upload( | |||
url, | url, | |||
cert=cert or get_cert(self._poetry.config, repository_name), | cert=resolved_cert, | |||
client_cert=resolved_client_cert, | client_cert=resolved_client_cert, | |||
dry_run=dry_run, | dry_run=dry_run, | |||
skip_existing=skip_existing, | ||||
) | ) | |||
End of changes. 20 change blocks. | ||||
49 lines changed or deleted | 43 lines changed or added |