"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/publishing/uploader.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

uploader.py  (poetry-1.1.15):uploader.py  (poetry-1.2.0)
from __future__ import annotations
import hashlib import hashlib
import io import io
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import requests import requests
from poetry.core.masonry.metadata import Metadata
from poetry.core.masonry.utils.helpers import escape_name
from poetry.core.masonry.utils.helpers import escape_version
from poetry.core.utils.helpers import normalize_version
from requests import adapters from requests import adapters
from requests.exceptions import ConnectionError from requests.exceptions import ConnectionError
from requests.exceptions import HTTPError from requests.exceptions import HTTPError
from requests.packages.urllib3 import util
from requests_toolbelt import user_agent from requests_toolbelt import user_agent
from requests_toolbelt.multipart import MultipartEncoder from requests_toolbelt.multipart import MultipartEncoder
from requests_toolbelt.multipart import MultipartEncoderMonitor from requests_toolbelt.multipart import MultipartEncoderMonitor
from urllib3 import util
from poetry.__version__ import __version__ from poetry.__version__ import __version__
from poetry.core.masonry.metadata import Metadata from poetry.utils.constants import REQUESTS_TIMEOUT
from poetry.core.masonry.utils.helpers import escape_name
from poetry.core.masonry.utils.helpers import escape_version
from poetry.utils._compat import Path
from poetry.utils.helpers import normalize_version
from poetry.utils.patterns import wheel_file_re from poetry.utils.patterns import wheel_file_re
from cleo.io.io import IO
from poetry.poetry import Poetry
_has_blake2 = hasattr(hashlib, "blake2b") _has_blake2 = hasattr(hashlib, "blake2b")
class UploadError(Exception): class UploadError(Exception):
def __init__(self, error): # type: (Union[ConnectionError, HTTPError]) -> N one def __init__(self, error: ConnectionError | HTTPError | str) -> None:
if isinstance(error, HTTPError): if isinstance(error, HTTPError):
message = "HTTP Error {}: {}".format( message = (
error.response.status_code, error.response.reason f"HTTP Error {error.response.status_code}: {error.response.reaso
n} |"
f" {error.response.content!r}"
) )
elif isinstance(error, ConnectionError): elif isinstance(error, ConnectionError):
message = ( message = (
"Connection Error: We were unable to connect to the repository, " "Connection Error: We were unable to connect to the repository, "
"ensure the url is correct and can be reached." "ensure the url is correct and can be reached."
) )
else: else:
message = str(error) message = str(error)
super(UploadError, self).__init__(message) super().__init__(message)
class Uploader: class Uploader:
def __init__(self, poetry, io): def __init__(self, poetry: Poetry, io: IO) -> None:
self._poetry = poetry self._poetry = poetry
self._package = poetry.package self._package = poetry.package
self._io = io self._io = io
self._username = None self._username: str | None = None
self._password = None self._password: str | None = None
@property @property
def user_agent(self): def user_agent(self) -> str:
return user_agent("poetry", __version__) agent: str = user_agent("poetry", __version__)
return agent
@property @property
def adapter(self): def adapter(self) -> adapters.HTTPAdapter:
retry = util.Retry( retry = util.Retry(
connect=5, connect=5,
total=10, total=10,
method_whitelist=["GET"], allowed_methods=["GET"],
status_forcelist=[500, 501, 502, 503], status_forcelist=[500, 501, 502, 503],
) )
return adapters.HTTPAdapter(max_retries=retry) return adapters.HTTPAdapter(max_retries=retry)
@property @property
def files(self): # type: () -> List[Path] def files(self) -> list[Path]:
dist = self._poetry.file.parent / "dist" dist = self._poetry.file.parent / "dist"
version = normalize_version(self._package.version.text) version = normalize_version(self._package.version.text)
wheels = list( wheels = list(
dist.glob( dist.glob(
"{}-{}-*.whl".format( f"{escape_name(self._package.pretty_name)}-{escape_version(versi
escape_name(self._package.pretty_name), escape_version(versi on)}"
on) "-*.whl"
) )
) )
tars = list( tars = list(dist.glob(f"{self._package.pretty_name}-{version}.tar.gz"))
dist.glob("{}-{}.tar.gz".format(self._package.pretty_name, version))
return sorted(wheels + tars) return sorted(wheels + tars)
def auth(self, username, password): def auth(self, username: str | None, password: str | None) -> None:
self._username = username self._username = username
self._password = password self._password = password
def make_session(self): # type: () -> requests.Session def make_session(self) -> requests.Session:
session = requests.session() session = requests.session()
if self.is_authenticated(): auth = self.get_auth()
session.auth = (self._username, self._password) if auth is not None:
session.auth = auth
session.headers["User-Agent"] = self.user_agent session.headers["User-Agent"] = self.user_agent
for scheme in ("http://", "https://"): for scheme in ("http://", "https://"):
session.mount(scheme, self.adapter) session.mount(scheme, self.adapter)
return session return session
def is_authenticated(self): def get_auth(self) -> tuple[str, str] | None:
return self._username is not None and self._password is not None if self._username is None or self._password is None:
return None
return (self._username, self._password)
def upload( def upload(
self, url, cert=None, client_cert=None, dry_run=False self,
): # type: (str, Optional[Path], Optional[Path], bool) -> None url: str,
cert: Path | bool = True,
client_cert: Path | None = None,
dry_run: bool = False,
skip_existing: bool = False,
) -> None:
session = self.make_session() session = self.make_session()
if cert: session.verify = str(cert) if isinstance(cert, Path) else cert
session.verify = str(cert)
if client_cert: if client_cert:
session.cert = str(client_cert) session.cert = str(client_cert)
try: try:
self._upload(session, url, dry_run) self._upload(session, url, dry_run, skip_existing)
finally: finally:
session.close() session.close()
def post_data(self, file): # type: (Path) -> Dict[str, Any] def post_data(self, file: Path) -> dict[str, Any]:
meta = Metadata.from_package(self._package) meta = Metadata.from_package(self._package)
file_type = self._get_type(file) file_type = self._get_type(file)
if _has_blake2: if _has_blake2:
blake2_256_hash = hashlib.blake2b(digest_size=256 // 8) blake2_256_hash = hashlib.blake2b(digest_size=256 // 8)
md5_hash = hashlib.md5() md5_hash = hashlib.md5()
sha256_hash = hashlib.sha256() sha256_hash = hashlib.sha256()
with file.open("rb") as fp: with file.open("rb") as fp:
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""): for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
md5_hash.update(content) md5_hash.update(content)
sha256_hash.update(content) sha256_hash.update(content)
if _has_blake2: if _has_blake2:
blake2_256_hash.update(content) blake2_256_hash.update(content)
md5_digest = md5_hash.hexdigest() md5_digest = md5_hash.hexdigest()
sha2_digest = sha256_hash.hexdigest() sha2_digest = sha256_hash.hexdigest()
blake2_256_digest: str | None = None
if _has_blake2: if _has_blake2:
blake2_256_digest = blake2_256_hash.hexdigest() blake2_256_digest = blake2_256_hash.hexdigest()
blake2_256_digest = None
py_version: str | None = None
if file_type == "bdist_wheel": if file_type == "bdist_wheel":
wheel_info = wheel_file_re.match(file.name) wheel_info = wheel_file_re.match(file.name)
py_version = wheel_info.group("pyver") if wheel_info is not None:
else: py_version = wheel_info.group("pyver")
py_version = None
data = { data = {
# identify release # identify release
"name": meta.name, "name": meta.name,
"version": meta.version, "version": meta.version,
# file content # file content
"filetype": file_type, "filetype": file_type,
"pyversion": py_version, "pyversion": py_version,
# additional meta-data # additional meta-data
"metadata_version": meta.metadata_version, "metadata_version": meta.metadata_version,
skipping to change at line 199 skipping to change at line 210
# Metadata 2.1 # Metadata 2.1
if meta.description_content_type: if meta.description_content_type:
data["description_content_type"] = meta.description_content_type data["description_content_type"] = meta.description_content_type
# TODO: Provides extra # TODO: Provides extra
return data return data
def _upload( def _upload(
self, session, url, dry_run=False self,
): # type: (requests.Session, str, Optional[bool]) -> None session: requests.Session,
try: url: str,
self._do_upload(session, url, dry_run) dry_run: bool = False,
except HTTPError as e: skip_existing: bool = False,
if ( ) -> None:
e.response.status_code == 400
and "was ever registered" in e.response.text
self._register(session, url)
except HTTPError as e:
raise UploadError(e)
raise UploadError(e)
def _do_upload(
self, session, url, dry_run=False
): # type: (requests.Session, str, Optional[bool]) -> None
for file in self.files: for file in self.files:
# TODO: Check existence # TODO: Check existence
resp = self._upload_file(session, url, file, dry_run) self._upload_file(session, url, file, dry_run, skip_existing)
if not dry_run:
def _upload_file( def _upload_file(
self, session, url, file, dry_run=False self,
): # type: (requests.Session, str, Path, Optional[bool]) -> requests.Respon session: requests.Session,
se url: str,
file: Path,
dry_run: bool = False,
skip_existing: bool = False,
) -> None:
from cleo.ui.progress_bar import ProgressBar
data = self.post_data(file) data = self.post_data(file)
data.update( data.update(
{ {
# action # action
":action": "file_upload", ":action": "file_upload",
"protocol_version": "1", "protocol_version": "1",
} }
) )
data_to_send = self._prepare_data(data) data_to_send: list[tuple[str, Any]] = self._prepare_data(data)
with file.open("rb") as fp: with file.open("rb") as fp:
data_to_send.append( data_to_send.append(
("content", (file.name, fp, "application/octet-stream")) ("content", (file.name, fp, "application/octet-stream"))
) )
encoder = MultipartEncoder(data_to_send) encoder = MultipartEncoder(data_to_send)
bar = self._io.progress_bar(encoder.len) bar = ProgressBar(self._io, max=encoder.len)
bar.set_format( bar.set_format(f" - Uploading <c1>{file.name}</c1> <b>%percent%%</b>
" - Uploading <c1>{0}</c1> <b>%percent%%</b>".format(file.name) ")
monitor = MultipartEncoderMonitor( monitor = MultipartEncoderMonitor(
encoder, lambda monitor: bar.set_progress(monitor.bytes_read) encoder, lambda monitor: bar.set_progress(monitor.bytes_read)
) )
bar.start() bar.start()
resp = None resp = None
try: try:
if not dry_run: if not dry_run:
resp = session.post( resp = session.post(
url, url,
data=monitor, data=monitor,
allow_redirects=False, allow_redirects=False,
headers={"Content-Type": monitor.content_type}, headers={"Content-Type": monitor.content_type},
) )
if dry_run or resp.ok: if resp is None or 200 <= resp.status_code < 300:
bar.set_format( bar.set_format(
" - Uploading <c1>{0}</c1> <fg=green>%percent%%</>".form f" - Uploading <c1>{file.name}</c1> <fg=green>%percent%%
at( </>"
) )
bar.finish() bar.finish()
elif resp.status_code == 301:
if self._io.output.is_decorated():
f" - Uploading <c1>{file.name}</c1> <error>FAILED</>
raise UploadError(
"Redirects are not supported. "
"Is the URL missing a trailing slash?"
elif resp.status_code == 400 and "was ever registered" in resp.t
self._register(session, url)
elif skip_existing and self._is_file_exists_error(resp):
f" - Uploading <c1>{file.name}</c1> <warning>File exists
" Skipping</>"
except (requests.ConnectionError, requests.HTTPError) as e: except (requests.ConnectionError, requests.HTTPError) as e:
if self._io.output.supports_ansi(): if self._io.output.is_decorated():
self._io.overwrite( self._io.overwrite(
" - Uploading <c1>{0}</c1> <error>{1}</>".format( f" - Uploading <c1>{file.name}</c1> <error>FAILED</>"
file.name, "FAILED"
) )
raise UploadError(e) raise UploadError(e)
finally: finally:
self._io.write_line("") self._io.write_line("")
return resp def _register(self, session: requests.Session, url: str) -> requests.Respons
def _register(
self, session, url
): # type: (requests.Session, str) -> requests.Response
""" """
Register a package to a repository. Register a package to a repository.
""" """
dist = self._poetry.file.parent / "dist" dist = self._poetry.file.parent / "dist"
file = dist / "{}-{}.tar.gz".format( file = (
self._package.name, normalize_version(self._package.version.text) dist
/ f"{self._package.name}-{normalize_version(self._package.version.te
xt)}.tar.gz" # noqa: E501
) )
if not file.exists(): if not file.exists():
raise RuntimeError('"{0}" does not exist.'.format(file.name)) raise RuntimeError(f'"{file.name}" does not exist.')
data = self.post_data(file) data = self.post_data(file)
data.update({":action": "submit", "protocol_version": "1"}) data.update({":action": "submit", "protocol_version": "1"})
data_to_send = self._prepare_data(data) data_to_send = self._prepare_data(data)
encoder = MultipartEncoder(data_to_send) encoder = MultipartEncoder(data_to_send)
resp = session.post( resp = session.post(
url, url,
data=encoder, data=encoder,
allow_redirects=False, allow_redirects=False,
headers={"Content-Type": encoder.content_type}, headers={"Content-Type": encoder.content_type},
) )
resp.raise_for_status() resp.raise_for_status()
return resp return resp
def _prepare_data(self, data): def _prepare_data(self, data: dict[str, Any]) -> list[tuple[str, str]]:
data_to_send = [] data_to_send = []
for key, value in data.items(): for key, value in data.items():
if not isinstance(value, (list, tuple)): if not isinstance(value, (list, tuple)):
data_to_send.append((key, value)) data_to_send.append((key, value))
else: else:
for item in value: for item in value:
data_to_send.append((key, item)) data_to_send.append((key, item))
return data_to_send return data_to_send
def _get_type(self, file): def _get_type(self, file: Path) -> str:
exts = file.suffixes exts = file.suffixes
if exts[-1] == ".whl": if exts[-1] == ".whl":
return "bdist_wheel" return "bdist_wheel"
elif len(exts) >= 2 and "".join(exts[-2:]) == ".tar.gz": elif len(exts) >= 2 and "".join(exts[-2:]) == ".tar.gz":
return "sdist" return "sdist"
raise ValueError("Unknown distribution format {}".format("".join(exts))) raise ValueError("Unknown distribution format " + "".join(exts))
def _is_file_exists_error(self, response: requests.Response) -> bool:
# based on https://github.com/pypa/twine/blob/a6dd69c79f7b5abfb79022092a
5d3776a499e31b/twine/commands/upload.py#L32 # noqa: E501
status = response.status_code
reason = response.reason.lower()
text = response.text.lower()
reason_and_text = reason + text
return (
# pypiserver (https://pypi.org/project/pypiserver)
status == 409
# PyPI / TestPyPI / GCP Artifact Registry
or (status == 400 and "already exist" in reason_and_text)
# Nexus Repository OSS (https://www.sonatype.com/nexus-repository-os
or (status == 400 and "updating asset" in reason_and_text)
# Artifactory (https://jfrog.com/artifactory/)
or (status == 403 and "overwrite artifact" in reason_and_text)
# Gitlab Enterprise Edition (https://about.gitlab.com)
or (status == 400 and "already been taken" in reason_and_text)
 End of changes. 49 change blocks. 
96 lines changed or deleted 117 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)