"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/repositories/pypi_repository.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

--- poetry-1.1.15/poetry/repositories/pypi_repository.py
+++ poetry-1.2.0/poetry/repositories/pypi_repository.py
+from __future__ import annotations
+
 import logging
-import os

 from collections import defaultdict
-from typing import Dict
-from typing import List
-from typing import Union
+from typing import TYPE_CHECKING
+from typing import Any

 import requests

-from cachecontrol import CacheControl
-from cachecontrol.caches.file_cache import FileCache
 from cachecontrol.controller import logger as cache_control_logger
-from cachy import CacheManager
 from html5lib.html5parser import parse
-from poetry.core.packages import Dependency
-from poetry.core.packages import Package
-from poetry.core.packages import dependency_from_pep_508
+from poetry.core.packages.package import Package
 from poetry.core.packages.utils.link import Link
-from poetry.core.semver import VersionConstraint
-from poetry.core.semver import VersionRange
-from poetry.core.semver import parse_constraint
-from poetry.core.semver.exceptions import ParseVersionError
-from poetry.core.version.markers import parse_marker
-from poetry.locations import REPOSITORY_CACHE_DIR
-from poetry.utils._compat import Path
+from poetry.core.semver.version import Version
+from poetry.core.version.exceptions import InvalidVersion
+
+from poetry.repositories.exceptions import PackageNotFound
+from poetry.repositories.http import HTTPRepository
 from poetry.utils._compat import to_str
-from poetry.utils.helpers import download_file
-from poetry.utils.helpers import temporary_directory
-from poetry.utils.patterns import wheel_file_re
-
-from ..inspection.info import PackageInfo
-from .exceptions import PackageNotFound
-from .remote_repository import RemoteRepository
-
-try:
-    import urllib.parse as urlparse
-except ImportError:
-    import urlparse

 cache_control_logger.setLevel(logging.ERROR)

 logger = logging.getLogger(__name__)

-class PyPiRepository(RemoteRepository):
+if TYPE_CHECKING:
+    from packaging.utils import NormalizedName

-    CACHE_VERSION = parse_constraint("1.0.0")
+    from poetry.core.semver.version_constraint import VersionConstraint

-    def __init__(self, url="https://pypi.org/", disable_cache=False, fallback=True):
-        super(PyPiRepository, self).__init__(url.rstrip("/") + "/simple/")
+
+class PyPiRepository(HTTPRepository):
+    def __init__(
+        self,
+        url: str = "https://pypi.org/",
+        disable_cache: bool = False,
+        fallback: bool = True,
+    ) -> None:
+        super().__init__(
+            "PyPI", url.rstrip("/") + "/simple/", disable_cache=disable_cache
+        )

         self._base_url = url
-        self._disable_cache = disable_cache
         self._fallback = fallback
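
In 1.2.0 the constructor no longer wires up its own CacheManager/CacheControl state; it passes a repository name and the disable_cache flag to the new HTTPRepository base class, which owns caching and the HTTP session. A minimal usage sketch, assuming poetry 1.2.0 is importable as a library:

    from poetry.repositories.pypi_repository import PyPiRepository

    repo = PyPiRepository()                        # default index, https://pypi.org/
    uncached = PyPiRepository(disable_cache=True)  # caching handled by HTTPRepository
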
-
-        release_cache_dir = REPOSITORY_CACHE_DIR / "pypi"
-        self._cache = CacheManager(
-            {
-                "default": "releases",
-                "serializer": "json",
-                "stores": {
-                    "releases": {"driver": "file", "path": str(release_cache_dir)},
-                    "packages": {"driver": "dict"},
-                },
-            }
-        )
-
-        self._cache_control_cache = FileCache(str(release_cache_dir / "_http"))
-        self._name = "PyPI"
-
-    @property
-    def session(self):
-        return CacheControl(requests.session(), cache=self._cache_control_cache)
-
-    def find_packages(self, dependency):  # type: (Dependency) -> List[Package]
-        """
-        Find packages on the remote server.
-        """
-        constraint = dependency.constraint
-        if constraint is None:
-            constraint = "*"
-
-        if not isinstance(constraint, VersionConstraint):
-            constraint = parse_constraint(constraint)
-
-        allow_prereleases = dependency.allows_prereleases()
-        if isinstance(constraint, VersionRange):
-            if (
-                constraint.max is not None
-                and constraint.max.is_prerelease()
-                or constraint.min is not None
-                and constraint.min.is_prerelease()
-            ):
-                allow_prereleases = True
-
-        try:
-            info = self.get_package_info(dependency.name)
-        except PackageNotFound:
-            self._log(
-                "No packages found for {} {}".format(dependency.name, str(constraint)),
-                level="debug",
-            )
-            return []
-
-        packages = []
-        ignored_pre_release_packages = []
-
-        for version, release in info["releases"].items():
-            if not release:
-                # Bad release
-                self._log(
-                    "No release information found for {}-{}, skipping".format(
-                        dependency.name, version
-                    ),
-                    level="debug",
-                )
-                continue
-
-            try:
-                package = Package(info["info"]["name"], version)
-            except ParseVersionError:
-                self._log(
-                    'Unable to parse version "{}" for the {} package, skipping'.format(
-                        version, dependency.name
-                    ),
-                    level="debug",
-                )
-                continue
-
-            if package.is_prerelease() and not allow_prereleases:
-                if constraint.is_any():
-                    # we need this when all versions of the package are pre-releases
-                    ignored_pre_release_packages.append(package)
-
-                continue
-
-            if not constraint or (constraint and constraint.allows(package.version)):
-                packages.append(package)
-
-        self._log(
-            "{} packages found for {} {}".format(
-                len(packages), dependency.name, str(constraint)
-            ),
-            level="debug",
-        )
-
-        return packages or ignored_pre_release_packages
-
-    def package(
-        self,
-        name,  # type: str
-        version,  # type: str
-        extras=None,  # type: (Union[list, None])
-    ):  # type: (...) -> Package
-        return self.get_release_info(name, version).to_package(name=name, extras=extras)
-
-    def search(self, query):
+    def search(self, query: str) -> list[Package]:
         results = []

         search = {"q": query}

-        response = requests.session().get(self._base_url + "search", params=search)
+        response = requests.session().get(
+            self._base_url + "search", params=search, timeout=REQUESTS_TIMEOUT
+        )
         content = parse(response.content, namespaceHTMLElements=False)
         for result in content.findall(".//*[@class='package-snippet']"):
-            name = result.find("h3/*[@class='package-snippet__name']").text
-            version = result.find("h3/*[@class='package-snippet__version']").text
+            name_element = result.find("h3/*[@class='package-snippet__name']")
+            version_element = result.find("h3/*[@class='package-snippet__version']")

-            if not name or not version:
+            if (
+                name_element is None
+                or version_element is None
+                or not name_element.text
+                or not version_element.text
+            ):
                 continue

-            description = result.find("p[@class='package-snippet__description']").text
-            if not description:
-                description = ""
+            name = name_element.text
+            version = version_element.text
+
+            description_element = result.find(
+                "p[@class='package-snippet__description']"
+            )
+            description = (
+                description_element.text
+                if description_element is not None and description_element.text
+                else ""
+            )

             try:
-                result = Package(name, version, description)
-                result.description = to_str(description.strip())
-                results.append(result)
-            except ParseVersionError:
+                package = Package(name, version)
+                package.description = to_str(description.strip())
+                results.append(package)
+            except InvalidVersion:
                 self._log(
-                    'Unable to parse version "{}" for the {} package, skipping'.format(
-                        version, name
-                    ),
+                    f'Unable to parse version "{version}" for the {name} package,'
+                    " skipping",
                     level="debug",
                 )

         return results
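
The reworked search() also hardens the HTML scraping: result.find() returns None when a package-snippet element is missing, and the old code would raise AttributeError on .text; the new code checks the elements first and adds a request timeout. A usage sketch (network access required; results depend on the live PyPI search page):

    from poetry.repositories.pypi_repository import PyPiRepository

    repo = PyPiRepository()
    for package in repo.search("requests"):
        print(package.name, package.version, package.description)
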
-    def get_package_info(self, name):  # type: (str) -> dict
+    def get_package_info(self, name: NormalizedName) -> dict[str, Any]:
         """
         Return the package information given its name.

         The information is returned from the cache if it exists
         or retrieved from the remote server.
         """
         if self._disable_cache:
             return self._get_package_info(name)

-        return self._cache.store("packages").remember_forever(
+        package_info: dict[str, Any] = self._cache.store("packages").remember_forever(
             name, lambda: self._get_package_info(name)
         )
+        return package_info
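
The _get_package_info helper this wraps fetches PyPI's JSON API; the same document can be retrieved with plain requests. A standalone sketch of the endpoint and the two keys the repository reads ("info" and "releases"; network access required):

    import requests

    data = requests.get("https://pypi.org/pypi/poetry/json", timeout=15).json()
    print(data["info"]["name"], data["info"]["version"])
    print(len(data["releases"]), "release version strings")
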
-    def _get_package_info(self, name):  # type: (str) -> dict
-        data = self._get("pypi/{}/json".format(name))
-        if data is None:
-            raise PackageNotFound("Package [{}] not found.".format(name))
-
-        return data
-
-    def get_release_info(self, name, version):  # type: (str, str) -> PackageInfo
+    def _find_packages(
+        self, name: NormalizedName, constraint: VersionConstraint
+    ) -> list[Package]:
         """
-        Return the release information given a package name and a version.
-
-        The information is returned from the cache if it exists
-        or retrieved from the remote server.
+        Find packages on the remote server.
         """
-        if self._disable_cache:
-            return PackageInfo.load(self._get_release_info(name, version))
-
-        cached = self._cache.remember_forever(
-            "{}:{}".format(name, version), lambda: self._get_release_info(name, version)
-        )
-
-        cache_version = cached.get("_cache_version", "0.0.0")
-        if parse_constraint(cache_version) != self.CACHE_VERSION:
-            # The cache must be updated
+        try:
+            info = self.get_package_info(name)
+        except PackageNotFound:
             self._log(
-                "The cache for {} {} is outdated. Refreshing.".format(name, version),
+                f"No packages found for {name} {constraint!s}",
                 level="debug",
             )
-            cached = self._get_release_info(name, version)
-
-            self._cache.forever("{}:{}".format(name, version), cached)
-
-        return PackageInfo.load(cached)
+            return []
+
+        packages = []
+
+        for version_string, release in info["releases"].items():
+            if not release:
+                # Bad release
+                self._log(
+                    f"No release information found for {name}-{version_string},"
+                    " skipping",
+                    level="debug",
+                )
+                continue
+
+            try:
+                version = Version.parse(version_string)
+            except InvalidVersion:
+                self._log(
+                    f'Unable to parse version "{version_string}" for the'
+                    f" {name} package, skipping",
+                    level="debug",
+                )
+                continue
+
+            if constraint.allows(version):
+                # PEP 592: PyPI always yanks entire releases, not individual files,
+                # so we just have to look for the first file
+                yanked = self._get_yanked(release[0])
+                packages.append(Package(info["info"]["name"], version, yanked=yanked))
+
+        return packages
+
+    def _get_package_info(self, name: NormalizedName) -> dict[str, Any]:
+        data = self._get(f"pypi/{name}/json")
+        if data is None:
+            raise PackageNotFound(f"Package [{name}] not found.")
+
+        return data
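
The new _find_packages parses each release string with Version.parse and keeps only the versions the given constraint allows. A standalone sketch using the poetry-core APIs named in the new imports (the parse_constraint import path below is my assumption for poetry-core 1.1, where it lives in poetry.core.semver.helpers):

    from poetry.core.semver.helpers import parse_constraint  # assumed path
    from poetry.core.semver.version import Version

    constraint = parse_constraint(">=1.0,<2.0")
    for version_string in ("0.9", "1.2.0", "2.0.0b1"):
        version = Version.parse(version_string)
        print(version_string, constraint.allows(version))
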
-    def find_links_for_package(self, package):
-        json_data = self._get("pypi/{}/{}/json".format(package.name, package.version))
+    def find_links_for_package(self, package: Package) -> list[Link]:
+        json_data = self._get(f"pypi/{package.name}/{package.version}/json")
         if json_data is None:
             return []

         links = []
         for url in json_data["urls"]:
-            h = "sha256={}".format(url["digests"]["sha256"])
-            links.append(Link(url["url"] + "#" + h))
+            h = f"sha256={url['digests']['sha256']}"
+            links.append(Link(url["url"] + "#" + h, yanked=self._get_yanked(url)))

         return links
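
find_links_for_package reads the per-release endpoint pypi/{name}/{version}/json, whose "urls" array lists each distribution file with its digests; the sha256 travels in the URL fragment so the downloader can verify it later. A plain-requests sketch of the same data (network access required):

    import requests

    data = requests.get("https://pypi.org/pypi/poetry/1.2.0/json", timeout=15).json()
    for file_info in data["urls"]:
        print(file_info["url"] + "#sha256=" + file_info["digests"]["sha256"])
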
-    def _get_release_info(self, name, version):  # type: (str, str) -> dict
-        self._log("Getting info for {} ({}) from PyPI".format(name, version), "debug")
+    def _get_release_info(
+        self, name: NormalizedName, version: Version
+    ) -> dict[str, str | list[str] | None]:
+        from poetry.inspection.info import PackageInfo

-        json_data = self._get("pypi/{}/{}/json".format(name, version))
+        self._log(f"Getting info for {name} ({version}) from PyPI", "debug")
+
+        json_data = self._get(f"pypi/{name}/{version}/json")
         if json_data is None:
-            raise PackageNotFound("Package [{}] not found.".format(name))
+            raise PackageNotFound(f"Package [{name}] not found.")

         info = json_data["info"]

         data = PackageInfo(
             name=info["name"],
             version=info["version"],
             summary=info["summary"],
             platform=info["platform"],
             requires_dist=info["requires_dist"],
             requires_python=info["requires_python"],
             files=info.get("files", []),
+            yanked=self._get_yanked(info),
             cache_version=str(self.CACHE_VERSION),
         )

         try:
             version_info = json_data["urls"]
         except KeyError:
             version_info = []

         for file_info in version_info:
             data.files.append(

[...] skipping to change at line 310 (poetry-1.1.15) / line 236 (poetry-1.2.0) [...]

             info = self._get_info_from_urls(urls)

             data.requires_dist = info.requires_dist

             if not data.requires_python:
                 data.requires_python = info.requires_python

         return data.asdict()
-    def _get(self, endpoint):  # type: (str) -> Union[dict, None]
+    def _get(self, endpoint: str) -> dict[str, Any] | None:
         try:
-            json_response = self.session.get(self._base_url + endpoint)
+            json_response = self.session.get(
+                self._base_url + endpoint,
+                raise_for_status=False,
+                timeout=REQUESTS_TIMEOUT,
+            )
         except requests.exceptions.TooManyRedirects:
             # Cache control redirect loop.
             # We try to remove the cache and try again
-            self._cache_control_cache.delete(self._base_url + endpoint)
-            json_response = self.session.get(self._base_url + endpoint)
+            self.session.delete_cache(self._base_url + endpoint)
+            json_response = self.session.get(
+                self._base_url + endpoint,
+                raise_for_status=False,
+                timeout=REQUESTS_TIMEOUT,
+            )

-        if json_response.status_code == 404:
+        if json_response.status_code != 200:
             return None

-        json_data = json_response.json()
-
-        return json_data
+        json: dict[str, Any] = json_response.json()
+        return json
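
Both versions handle the same failure mode here: a stale cached redirect can make CacheControl loop until requests raises TooManyRedirects, so the cached entry is evicted and the request retried once. 1.1.15 deleted directly from its FileCache; 1.2.0 calls delete_cache() on the session supplied by HTTPRepository. A self-contained sketch of the 1.1.15-era pattern, using the cachecontrol API from the removed imports (the cache path is hypothetical):

    import requests
    from cachecontrol import CacheControl
    from cachecontrol.caches.file_cache import FileCache

    cache = FileCache("/tmp/pypi-http-cache")  # hypothetical location
    session = CacheControl(requests.session(), cache=cache)

    def get_json(url):
        try:
            response = session.get(url, timeout=15)
        except requests.exceptions.TooManyRedirects:
            cache.delete(url)  # evict the poisoned entry, then retry once
            response = session.get(url, timeout=15)
        if response.status_code != 200:
            return None
        return response.json()
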
-    def _get_info_from_urls(self, urls):  # type: (Dict[str, List[str]]) -> PackageInfo
-        # Checking wheels first as they are more likely to hold
-        # the necessary information
-        if "bdist_wheel" in urls:
-            # Check fo a universal wheel
-            wheels = urls["bdist_wheel"]
-
-            universal_wheel = None
-            universal_python2_wheel = None
-            universal_python3_wheel = None
-            platform_specific_wheels = []
-            for wheel in wheels:
-                link = Link(wheel)
-                m = wheel_file_re.match(link.filename)
-                if not m:
-                    continue
-
-                pyver = m.group("pyver")
-                abi = m.group("abi")
-                plat = m.group("plat")
-                if abi == "none" and plat == "any":
-                    # Universal wheel
-                    if pyver == "py2.py3":
-                        # Any Python
-                        universal_wheel = wheel
-                    elif pyver == "py2":
-                        universal_python2_wheel = wheel
-                    else:
-                        universal_python3_wheel = wheel
-                else:
-                    platform_specific_wheels.append(wheel)
-
-            if universal_wheel is not None:
-                return self._get_info_from_wheel(universal_wheel)
-
-            info = None
-            if universal_python2_wheel and universal_python3_wheel:
-                info = self._get_info_from_wheel(universal_python2_wheel)
-
-                py3_info = self._get_info_from_wheel(universal_python3_wheel)
-                if py3_info.requires_dist:
-                    if not info.requires_dist:
-                        info.requires_dist = py3_info.requires_dist
-
-                        return info
-
-                    py2_requires_dist = set(
-                        dependency_from_pep_508(r).to_pep_508()
-                        for r in info.requires_dist
-                    )
-                    py3_requires_dist = set(
-                        dependency_from_pep_508(r).to_pep_508()
-                        for r in py3_info.requires_dist
-                    )
-                    base_requires_dist = py2_requires_dist & py3_requires_dist
-                    py2_only_requires_dist = py2_requires_dist - py3_requires_dist
-                    py3_only_requires_dist = py3_requires_dist - py2_requires_dist
-
-                    # Normalizing requires_dist
-                    requires_dist = list(base_requires_dist)
-                    for requirement in py2_only_requires_dist:
-                        dep = dependency_from_pep_508(requirement)
-                        dep.marker = dep.marker.intersect(
-                            parse_marker("python_version == '2.7'")
-                        )
-                        requires_dist.append(dep.to_pep_508())
-
-                    for requirement in py3_only_requires_dist:
-                        dep = dependency_from_pep_508(requirement)
-                        dep.marker = dep.marker.intersect(
-                            parse_marker("python_version >= '3'")
-                        )
-                        requires_dist.append(dep.to_pep_508())
-
-                    info.requires_dist = sorted(list(set(requires_dist)))
-
-            if info:
-                return info
-
-            # Prefer non platform specific wheels
-            if universal_python3_wheel:
-                return self._get_info_from_wheel(universal_python3_wheel)
-
-            if universal_python2_wheel:
-                return self._get_info_from_wheel(universal_python2_wheel)
-
-            if platform_specific_wheels and "sdist" not in urls:
-                # Pick the first wheel available and hope for the best
-                return self._get_info_from_wheel(platform_specific_wheels[0])
-
-        return self._get_info_from_sdist(urls["sdist"][0])
-
-    def _get_info_from_wheel(self, url):  # type: (str) -> PackageInfo
-        self._log(
-            "Downloading wheel: {}".format(urlparse.urlparse(url).path.rsplit("/")[-1]),
-            level="debug",
-        )
-
-        filename = os.path.basename(urlparse.urlparse(url).path.rsplit("/")[-1])
-
-        with temporary_directory() as temp_dir:
-            filepath = Path(temp_dir) / filename
-            self._download(url, str(filepath))
-
-            return PackageInfo.from_wheel(filepath)
-
-    def _get_info_from_sdist(self, url):  # type: (str) -> PackageInfo
-        self._log(
-            "Downloading sdist: {}".format(urlparse.urlparse(url).path.rsplit("/")[-1]),
-            level="debug",
-        )
-
-        filename = os.path.basename(urlparse.urlparse(url).path)
-
-        with temporary_directory() as temp_dir:
-            filepath = Path(temp_dir) / filename
-            self._download(url, str(filepath))
-
-            return PackageInfo.from_sdist(filepath)
-
-    def _download(self, url, dest):  # type: (str, str) -> None
-        return download_file(url, dest, session=self.session)
-
-    def _log(self, msg, level="info"):
-        getattr(logger, level)("<debug>{}:</debug> {}".format(self._name, msg))
+
+    @staticmethod
+    def _get_yanked(json_data: dict[str, Any]) -> str | bool:
+        if json_data.get("yanked", False):
+            return json_data.get("yanked_reason") or True  # noqa: SIM222
+        return False
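
The new _get_yanked helper folds PEP 592's two JSON fields into a single value: a non-empty yanked_reason string when one was given, True for a yanked file without a reason, and False otherwise. A standalone illustration with hand-written file dicts (the function name is mine; the logic mirrors the staticmethod above):

    from __future__ import annotations

    from typing import Any

    def get_yanked(file_info: dict[str, Any]) -> str | bool:
        if file_info.get("yanked", False):
            return file_info.get("yanked_reason") or True
        return False

    print(get_yanked({"yanked": True, "yanked_reason": "broken metadata"}))  # broken metadata
    print(get_yanked({"yanked": True, "yanked_reason": ""}))                 # True
    print(get_yanked({"yanked": False}))                                     # False
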
End of changes. 40 change blocks; 338 lines changed or deleted, 133 lines changed or added.
