"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/puzzle/provider.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

provider.py  (poetry-1.1.15):provider.py  (poetry-1.2.0)
from __future__ import annotations
import functools
import logging import logging
import os import os
import re import re
import tempfile
import time import time
import urllib.parse
from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from tempfile import mkdtemp from pathlib import Path
from typing import Any from typing import TYPE_CHECKING
from typing import List from typing import cast
from typing import Optional
from cleo.ui.progress_indicator import ProgressIndicator
from clikit.ui.components import ProgressIndicator
from poetry.core.packages import Dependency
from poetry.core.packages import DirectoryDependency
from poetry.core.packages import FileDependency
from poetry.core.packages import Package
from poetry.core.packages import URLDependency
from poetry.core.packages import VCSDependency
from poetry.core.packages.utils.utils import get_python_constraint_from_marker from poetry.core.packages.utils.utils import get_python_constraint_from_marker
from poetry.core.semver.empty_constraint import EmptyConstraint
from poetry.core.semver.version import Version from poetry.core.semver.version import Version
from poetry.core.vcs.git import Git from poetry.core.version.markers import AnyMarker
from poetry.core.version.markers import MarkerUnion from poetry.core.version.markers import MarkerUnion
from poetry.inspection.info import PackageInfo from poetry.inspection.info import PackageInfo
from poetry.inspection.info import PackageInfoError from poetry.inspection.info import PackageInfoError
from poetry.mixology.incompatibility import Incompatibility from poetry.mixology.incompatibility import Incompatibility
from poetry.mixology.incompatibility_cause import DependencyCause from poetry.mixology.incompatibility_cause import DependencyCause
from poetry.mixology.incompatibility_cause import PythonCause from poetry.mixology.incompatibility_cause import PythonCause
from poetry.mixology.term import Term from poetry.mixology.term import Term
from poetry.packages import DependencyPackage from poetry.packages import DependencyPackage
from poetry.packages.package_collection import PackageCollection from poetry.packages.package_collection import PackageCollection
from poetry.puzzle.exceptions import OverrideNeeded from poetry.puzzle.exceptions import OverrideNeeded
from poetry.repositories import Pool from poetry.repositories.exceptions import PackageNotFound
from poetry.utils._compat import OrderedDict
from poetry.utils._compat import Path
from poetry.utils._compat import urlparse
from poetry.utils.env import Env
from poetry.utils.helpers import download_file from poetry.utils.helpers import download_file
from poetry.utils.helpers import safe_rmtree from poetry.utils.helpers import safe_extra
from poetry.utils.helpers import temporary_directory from poetry.vcs.git import Git
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from cleo.io.io import IO
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.directory_dependency import DirectoryDependency
from poetry.core.packages.file_dependency import FileDependency
from poetry.core.packages.package import Package
from poetry.core.packages.url_dependency import URLDependency
from poetry.core.packages.vcs_dependency import VCSDependency
from poetry.core.semver.version_constraint import VersionConstraint
from poetry.core.version.markers import BaseMarker
from poetry.repositories import Pool
from poetry.utils.env import Env
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Indicator(ProgressIndicator): class Indicator(ProgressIndicator): # type: ignore[misc]
def _formatter_elapsed(self): CONTEXT: str | None = None
@staticmethod
@contextmanager
def context() -> Iterator[Callable[[str | None], None]]:
def _set_context(context: str | None) -> None:
Indicator.CONTEXT = context
yield _set_context
_set_context(None)
def _formatter_context(self) -> str:
if Indicator.CONTEXT is None:
return " "
else:
return f" <c1>{Indicator.CONTEXT}</> "
def _formatter_elapsed(self) -> str:
assert self._start_time is not None
elapsed = time.time() - self._start_time elapsed = time.time() - self._start_time
return "{:.1f}s".format(elapsed) return f"{elapsed:.1f}s"
class Provider: @functools.lru_cache(maxsize=None)
def _get_package_from_git(
url: str,
branch: str | None = None,
tag: str | None = None,
rev: str | None = None,
subdirectory: str | None = None,
source_root: Path | None = None,
) -> Package:
source = Git.clone(
url=url,
source_root=source_root,
branch=branch,
tag=tag,
revision=rev,
clean=False,
)
revision = Git.get_revision(source)
path = Path(source.path)
if subdirectory:
path = path.joinpath(subdirectory)
package = Provider.get_package_from_directory(path)
package._source_type = "git"
package._source_url = url
package._source_reference = rev or tag or branch or "HEAD"
package._source_resolved_reference = revision
package._source_subdirectory = subdirectory
UNSAFE_PACKAGES = {"setuptools", "distribute", "pip", "wheel"} return package
class Provider:
UNSAFE_PACKAGES: set[str] = set()
def __init__( def __init__(
self, package, pool, io, env=None self,
): # type: (Package, Pool, Any, Optional[Env]) -> None package: Package,
pool: Pool,
io: IO,
env: Env | None = None,
installed: list[Package] | None = None,
) -> None:
self._package = package self._package = package
self._pool = pool self._pool = pool
self._io = io self._io = io
self._env = env self._env = env
self._python_constraint = package.python_constraint self._python_constraint = package.python_constraint
self._search_for = {} self._is_debugging: bool = self._io.is_debug() or self._io.is_very_verbo
self._is_debugging = self._io.is_debug() or self._io.is_very_verbose() se()
self._in_progress = False self._in_progress = False
self._overrides = {} self._overrides: dict[DependencyPackage, dict[str, Dependency]] = {}
self._deferred_cache = {} self._deferred_cache: dict[Dependency, Package] = {}
self._load_deferred = True self._load_deferred = True
self._source_root: Path | None = None
self._installed_packages = installed if installed is not None else []
self._direct_origin_packages: dict[str, Package] = {}
@property @property
def pool(self): # type: () -> Pool def pool(self) -> Pool:
return self._pool return self._pool
def is_debugging(self): def is_debugging(self) -> bool:
return self._is_debugging return self._is_debugging
def set_overrides(self, overrides): def set_overrides(
self, overrides: dict[DependencyPackage, dict[str, Dependency]]
) -> None:
self._overrides = overrides self._overrides = overrides
def load_deferred(self, load_deferred): # type: (bool) -> None def load_deferred(self, load_deferred: bool) -> None:
self._load_deferred = load_deferred self._load_deferred = load_deferred
@contextmanager @contextmanager
def use_environment(self, env): # type: (Env) -> Provider def use_source_root(self, source_root: Path) -> Iterator[Provider]:
original_source_root = self._source_root
self._source_root = source_root
yield self
self._source_root = original_source_root
@contextmanager
def use_environment(self, env: Env) -> Iterator[Provider]:
original_env = self._env original_env = self._env
original_python_constraint = self._python_constraint original_python_constraint = self._python_constraint
self._env = env self._env = env
self._python_constraint = Version.parse(env.marker_env["python_full_vers ion"]) self._python_constraint = Version.parse(env.marker_env["python_full_vers ion"])
yield self yield self
self._env = original_env self._env = original_env
self._python_constraint = original_python_constraint self._python_constraint = original_python_constraint
def search_for(self, dependency): # type: (Dependency) -> List[Package] @staticmethod
def validate_package_for_dependency(
dependency: Dependency, package: Package
) -> None:
if dependency.name != package.name:
# For now, the dependency's name must match the actual package's nam
e
raise RuntimeError(
f"The dependency name for {dependency.name} does not match the a
ctual"
f" package's name: {package.name}"
)
def search_for_installed_packages(
self,
dependency: Dependency,
) -> list[Package]:
""" """
Search for the specifications that match the given dependency. Search for installed packages, when available, that satisfy the given
dependency.
The specifications in the returned list will be considered in reverse This is useful when dealing with packages that are under development, no
order, so the latest version ought to be last. t
published on package sources and/or only available via system installati
ons.
""" """
if dependency.is_root: if not self._installed_packages:
return PackageCollection(dependency, [self._package]) return []
for constraint in self._search_for.keys(): logger.debug(
if ( "Falling back to installed packages to discover metadata for <c2>%s<
constraint.is_same_package_as(dependency) />",
and constraint.constraint.intersect(dependency.constraint) dependency.complete_name,
== dependency.constraint )
): packages = [
packages = [ package
p for package in self._installed_packages
for p in self._search_for[constraint] if package.satisfies(dependency, ignore_source_type=True)
if dependency.constraint.allows(p.version) ]
] logger.debug(
"Found <c2>%d</> compatible packages for <c2>%s</>",
packages.sort( len(packages),
key=lambda p: ( dependency.complete_name,
not p.is_prerelease() and not dependency.allows_prerelea )
ses(), return packages
p.version,
),
reverse=True,
)
return PackageCollection(dependency, packages) def search_for_direct_origin_dependency(self, dependency: Dependency) -> Pac
kage:
package = self._deferred_cache.get(dependency)
if package is not None:
pass
elif dependency.is_vcs():
dependency = cast("VCSDependency", dependency)
package = self._search_for_vcs(dependency)
if dependency.is_vcs():
packages = self.search_for_vcs(dependency)
elif dependency.is_file(): elif dependency.is_file():
packages = self.search_for_file(dependency) dependency = cast("FileDependency", dependency)
package = self._search_for_file(dependency)
elif dependency.is_directory(): elif dependency.is_directory():
packages = self.search_for_directory(dependency) dependency = cast("DirectoryDependency", dependency)
package = self._search_for_directory(dependency)
elif dependency.is_url(): elif dependency.is_url():
packages = self.search_for_url(dependency) dependency = cast("URLDependency", dependency)
else: package = self._search_for_url(dependency)
packages = self._pool.find_packages(dependency)
packages.sort( else:
key=lambda p: ( raise RuntimeError(
not p.is_prerelease() and not dependency.allows_prereleases( f"Unknown direct dependency type {dependency.source_type}"
),
p.version,
),
reverse=True,
) )
self._search_for[dependency] = packages if dependency.is_vcs():
dependency._source_reference = package.source_reference
dependency._source_resolved_reference = package.source_resolved_refe
rence
dependency._source_subdirectory = package.source_subdirectory
dependency._constraint = package.version
dependency._pretty_constraint = package.version.text
self._deferred_cache[dependency] = package
return package
def search_for(self, dependency: Dependency) -> list[DependencyPackage]:
"""
Search for the specifications that match the given dependency.
The specifications in the returned list will be considered in reverse
order, so the latest version ought to be last.
"""
if dependency.is_root:
return PackageCollection(dependency, [self._package])
if dependency.is_direct_origin():
package = self.search_for_direct_origin_dependency(dependency)
self._direct_origin_packages[dependency.name] = package
return PackageCollection(dependency, [package])
# If we've previously found a direct-origin package that meets this depe
ndency,
# use it.
#
# We rely on the VersionSolver resolving direct-origin dependencies firs
t.
direct_origin_package = self._direct_origin_packages.get(dependency.name
)
if direct_origin_package is not None:
packages = (
[direct_origin_package]
if dependency.constraint.allows(direct_origin_package.version)
else []
)
return PackageCollection(dependency, packages)
packages = self._pool.find_packages(dependency)
packages.sort(
key=lambda p: (
not p.yanked,
not p.is_prerelease() and not dependency.allows_prereleases(),
p.version,
),
reverse=True,
)
if not packages:
packages = self.search_for_installed_packages(dependency)
return PackageCollection(dependency, packages) return PackageCollection(dependency, packages)
def search_for_vcs(self, dependency): # type: (VCSDependency) -> List[Packa ge] def _search_for_vcs(self, dependency: VCSDependency) -> Package:
""" """
Search for the specifications that match the given VCS dependency. Search for the specifications that match the given VCS dependency.
Basically, we clone the repository in a temporary directory Basically, we clone the repository in a temporary directory
and get the information we need by checking out the specified reference. and get the information we need by checking out the specified reference.
""" """
if dependency in self._deferred_cache:
return [self._deferred_cache[dependency]]
package = self.get_package_from_vcs( package = self.get_package_from_vcs(
dependency.vcs, dependency.vcs,
dependency.source, dependency.source,
branch=dependency.branch, branch=dependency.branch,
tag=dependency.tag, tag=dependency.tag,
rev=dependency.rev, rev=dependency.rev,
name=dependency.name, subdirectory=dependency.source_subdirectory,
source_root=self._source_root
or (self._env.path.joinpath("src") if self._env else None),
) )
package.develop = dependency.develop
dependency._constraint = package.version self.validate_package_for_dependency(dependency=dependency, package=pack
dependency._pretty_constraint = package.version.text age)
self._deferred_cache[dependency] = package package.develop = dependency.develop
return [package] return package
@classmethod @staticmethod
def get_package_from_vcs( def get_package_from_vcs(
cls, vcs, url, branch=None, tag=None, rev=None, name=None vcs: str,
): # type: (str, str, Optional[str], Optional[str]) -> Package url: str,
branch: str | None = None,
tag: str | None = None,
rev: str | None = None,
subdirectory: str | None = None,
source_root: Path | None = None,
) -> Package:
if vcs != "git": if vcs != "git":
raise ValueError("Unsupported VCS dependency {}".format(vcs)) raise ValueError(f"Unsupported VCS dependency {vcs}")
tmp_dir = Path( return _get_package_from_git(
mkdtemp(prefix="pypoetry-git-{}".format(url.split("/")[-1].rstrip(". url=url,
git"))) branch=branch,
tag=tag,
rev=rev,
subdirectory=subdirectory,
source_root=source_root,
) )
try: def _search_for_file(self, dependency: FileDependency) -> Package:
git = Git() package = self.get_package_from_file(dependency.full_path)
git.clone(url, tmp_dir)
reference = branch or tag or rev
if reference is not None:
git.checkout(reference, tmp_dir)
else:
reference = "HEAD"
revision = git.rev_parse(reference, tmp_dir).strip()
package = cls.get_package_from_directory(tmp_dir, name=name)
package._source_type = "git"
package._source_url = url
package._source_reference = reference
package._source_resolved_reference = revision
except Exception:
raise
finally:
safe_rmtree(str(tmp_dir))
return package
def search_for_file(self, dependency): # type: (FileDependency) -> List[Pac
kage]
if dependency in self._deferred_cache:
dependency, _package = self._deferred_cache[dependency]
package = _package.clone()
else:
package = self.get_package_from_file(dependency.full_path)
dependency._constraint = package.version self.validate_package_for_dependency(dependency=dependency, package=pack
dependency._pretty_constraint = package.version.text age)
self._deferred_cache[dependency] = (dependency, package)
if dependency.name != package.name:
# For now, the dependency's name must match the actual package's nam
e
raise RuntimeError(
"The dependency name for {} does not match the actual package's
name: {}".format(
dependency.name, package.name
)
)
if dependency.base is not None: if dependency.base is not None:
package.root_dir = dependency.base package.root_dir = dependency.base
package.files = [ package.files = [
{"file": dependency.path.name, "hash": "sha256:" + dependency.hash() } {"file": dependency.path.name, "hash": "sha256:" + dependency.hash() }
] ]
return [package] return package
@classmethod @classmethod
def get_package_from_file(cls, file_path): # type: (Path) -> Package def get_package_from_file(cls, file_path: Path) -> Package:
try: try:
package = PackageInfo.from_path(path=file_path).to_package( package = PackageInfo.from_path(path=file_path).to_package(
root_dir=file_path root_dir=file_path
) )
except PackageInfoError: except PackageInfoError:
raise RuntimeError( raise RuntimeError(
"Unable to determine package info from path: {}".format(file_pat h) f"Unable to determine package info from path: {file_path}"
) )
return package return package
def search_for_directory( def _search_for_directory(self, dependency: DirectoryDependency) -> Package:
self, dependency package = self.get_package_from_directory(dependency.full_path)
): # type: (DirectoryDependency) -> List[Package]
if dependency in self._deferred_cache:
dependency, _package = self._deferred_cache[dependency]
package = _package.clone()
else:
package = self.get_package_from_directory(
dependency.full_path, name=dependency.name
)
dependency._constraint = package.version
dependency._pretty_constraint = package.version.text
self._deferred_cache[dependency] = (dependency, package) self.validate_package_for_dependency(dependency=dependency, package=pack age)
package.develop = dependency.develop package.develop = dependency.develop
if dependency.base is not None: if dependency.base is not None:
package.root_dir = dependency.base package.root_dir = dependency.base
return [package]
@classmethod
def get_package_from_directory(
cls, directory, name=None
): # type: (Path, Optional[str]) -> Package
package = PackageInfo.from_directory(path=directory).to_package(
root_dir=directory
)
if name and name != package.name:
# For now, the dependency's name must match the actual package's nam
e
raise RuntimeError(
"The dependency name for {} does not match the actual package's
name: {}".format(
name, package.name
)
)
return package return package
def search_for_url(self, dependency): # type: (URLDependency) -> List[Packa @classmethod
ge] def get_package_from_directory(cls, directory: Path) -> Package:
if dependency in self._deferred_cache: return PackageInfo.from_directory(path=directory).to_package(root_dir=di
return [self._deferred_cache[dependency]] rectory)
def _search_for_url(self, dependency: URLDependency) -> Package:
package = self.get_package_from_url(dependency.url) package = self.get_package_from_url(dependency.url)
if dependency.name != package.name: self.validate_package_for_dependency(dependency=dependency, package=pack
# For now, the dependency's name must match the actual package's nam age)
e
raise RuntimeError(
"The dependency name for {} does not match the actual package's
name: {}".format(
dependency.name, package.name
)
)
for extra in dependency.extras: for extra in dependency.extras:
if extra in package.extras: if extra in package.extras:
for dep in package.extras[extra]: for dep in package.extras[extra]:
dep.activate() dep.activate()
package.requires += package.extras[extra] for extra_dep in package.extras[extra]:
package.add_dependency(extra_dep)
dependency._constraint = package.version
dependency._pretty_constraint = package.version.text
self._deferred_cache[dependency] = package return package
return [package]
@classmethod @classmethod
def get_package_from_url(cls, url): # type: (str) -> Package def get_package_from_url(cls, url: str) -> Package:
with temporary_directory() as temp_dir: file_name = os.path.basename(urllib.parse.urlparse(url).path)
temp_dir = Path(temp_dir) with tempfile.TemporaryDirectory() as temp_dir:
file_name = os.path.basename(urlparse.urlparse(url).path) dest = Path(temp_dir) / file_name
download_file(url, str(temp_dir / file_name)) download_file(url, dest)
package = cls.get_package_from_file(dest)
package = cls.get_package_from_file(temp_dir / file_name)
package._source_type = "url" package._source_type = "url"
package._source_url = url package._source_url = url
return package return package
def _get_dependencies_with_overrides(
self, dependencies: list[Dependency], package: DependencyPackage
) -> list[Dependency]:
overrides = self._overrides.get(package, {})
_dependencies = []
overridden = []
for dep in dependencies:
if dep.name in overrides:
if dep.name in overridden:
continue
# empty constraint is used in overrides to mark that the package
has
# already been handled and is not required for the attached mark
ers
if not overrides[dep.name].constraint.is_empty():
_dependencies.append(overrides[dep.name])
overridden.append(dep.name)
continue
_dependencies.append(dep)
return _dependencies
def incompatibilities_for( def incompatibilities_for(
self, package self, dependency_package: DependencyPackage
): # type: (DependencyPackage) -> List[Incompatibility] ) -> list[Incompatibility]:
""" """
Returns incompatibilities that encapsulate a given package's dependencie s, Returns incompatibilities that encapsulate a given package's dependencie s,
or that it can't be safely selected. or that it can't be safely selected.
If multiple subsequent versions of this package have the same If multiple subsequent versions of this package have the same
dependencies, this will return incompatibilities that reflect that. It dependencies, this will return incompatibilities that reflect that. It
won't return incompatibilities that have already been returned by a won't return incompatibilities that have already been returned by a
previous call to _incompatibilities_for(). previous call to _incompatibilities_for().
""" """
package = dependency_package.package
if package.is_root(): if package.is_root():
dependencies = package.all_requires dependencies = package.all_requires
else: else:
dependencies = package.requires dependencies = package.requires
if not package.python_constraint.allows_all(self._python_constraint) : if not package.python_constraint.allows_all(self._python_constraint) :
transitive_python_constraint = get_python_constraint_from_marker ( transitive_python_constraint = get_python_constraint_from_marker (
package.dependency.transitive_marker dependency_package.dependency.transitive_marker
) )
intersection = package.python_constraint.intersect( intersection = package.python_constraint.intersect(
transitive_python_constraint transitive_python_constraint
) )
difference = transitive_python_constraint.difference(intersectio n) difference = transitive_python_constraint.difference(intersectio n)
# The difference is only relevant if it intersects # The difference is only relevant if it intersects
# the root package python constraint # the root package python constraint
difference = difference.intersect(self._python_constraint) difference = difference.intersect(self._python_constraint)
if ( if (
transitive_python_constraint.is_any() transitive_python_constraint.is_any()
or self._python_constraint.intersect( or self._python_constraint.intersect(
package.dependency.python_constraint dependency_package.dependency.python_constraint
).is_empty() ).is_empty()
or intersection.is_empty() or intersection.is_empty()
or not difference.is_empty() or not difference.is_empty()
): ):
return [ return [
Incompatibility( Incompatibility(
[Term(package.to_dependency(), True)], [Term(package.to_dependency(), True)],
PythonCause( PythonCause(
package.python_versions, str(self._python_constr aint) package.python_versions, str(self._python_constr aint)
), ),
) )
] ]
_dependencies = [ _dependencies = [
dep dep
for dep in dependencies for dep in dependencies
if dep.name not in self.UNSAFE_PACKAGES if dep.name not in self.UNSAFE_PACKAGES
and self._python_constraint.allows_any(dep.python_constraint) and self._python_constraint.allows_any(dep.python_constraint)
and (not self._env or dep.marker.validate(self._env.marker_env)) and (not self._env or dep.marker.validate(self._env.marker_env))
] ]
dependencies = self._get_dependencies_with_overrides(
overrides = self._overrides.get(package, {}) _dependencies, dependency_package
dependencies = [] )
overridden = []
for dep in _dependencies:
if dep.name in overrides:
if dep.name in overridden:
continue
dependencies.append(overrides[dep.name])
overridden.append(dep.name)
continue
dependencies.append(dep)
return [ return [
Incompatibility( Incompatibility(
[Term(package.to_dependency(), True), Term(dep, False)], [Term(package.to_dependency(), True), Term(dep, False)],
DependencyCause(), DependencyCause(),
) )
for dep in dependencies for dep in dependencies
] ]
def complete_package( def complete_package(
self, package self, dependency_package: DependencyPackage
): # type: (DependencyPackage) -> DependencyPackage ) -> DependencyPackage:
package = dependency_package.package
dependency = dependency_package.dependency
if package.is_root(): if package.is_root():
package = package.clone() dependency_package = dependency_package.clone()
package = dependency_package.package
dependency = dependency_package.dependency
requires = package.all_requires requires = package.all_requires
elif not package.is_root() and package.source_type not in { elif package.source_type not in {
"directory", "directory",
"file", "file",
"url", "url",
"git", "git",
}: }:
package = DependencyPackage( try:
package.dependency, dependency_package = DependencyPackage(
self._pool.package( dependency,
package.name, self._pool.package(
package.version.text, package.pretty_name,
extras=list(package.dependency.extras), package.version,
repository=package.dependency.source_name, extras=list(dependency.extras),
), repository=dependency.source_name,
) ),
)
except PackageNotFound as e:
try:
dependency_package = next(
DependencyPackage(dependency, pkg)
for pkg in self.search_for_installed_packages(dependency
)
)
except StopIteration:
raise e from e
package = dependency_package.package
dependency = dependency_package.dependency
requires = package.requires requires = package.requires
else: else:
requires = package.requires requires = package.requires
if self._load_deferred: if self._load_deferred:
# Retrieving constraints for deferred dependencies # Retrieving constraints for deferred dependencies
for r in requires: for r in requires:
if r.is_directory(): if r.is_direct_origin():
self.search_for_directory(r) self.search_for_direct_origin_dependency(r)
elif r.is_file():
self.search_for_file(r)
elif r.is_vcs():
self.search_for_vcs(r)
elif r.is_url():
self.search_for_url(r)
optional_dependencies = [] optional_dependencies = []
_dependencies = [] _dependencies = []
# If some extras/features were required, we need to # If some extras/features were required, we need to
# add a special dependency representing the base package # add a special dependency representing the base package
# to the current package # to the current package
if package.dependency.extras: if dependency.extras:
for extra in package.dependency.extras: for extra in dependency.extras:
extra = safe_extra(extra)
if extra not in package.extras: if extra not in package.extras:
continue continue
optional_dependencies += [d.name for d in package.extras[extra]] optional_dependencies += [d.name for d in package.extras[extra]]
package = package.with_features(list(package.dependency.extras)) dependency_package = dependency_package.with_features(
list(dependency.extras)
)
package = dependency_package.package
dependency = dependency_package.dependency
_dependencies.append(package.without_features().to_dependency()) _dependencies.append(package.without_features().to_dependency())
for dep in requires: for dep in requires:
if not self._python_constraint.allows_any(dep.python_constraint): if not self._python_constraint.allows_any(dep.python_constraint):
continue continue
if dep.name in self.UNSAFE_PACKAGES: if dep.name in self.UNSAFE_PACKAGES:
continue continue
if self._env and not dep.marker.validate(self._env.marker_env): if self._env and not dep.marker.validate(self._env.marker_env):
continue continue
if not package.is_root(): if not package.is_root() and (
if (dep.is_optional() and dep.name not in optional_dependencies) (dep.is_optional() and dep.name not in optional_dependencies)
or ( or (
dep.in_extras dep.in_extras
and not set(dep.in_extras).intersection(package.dependency.e and not set(dep.in_extras).intersection(
xtras) {safe_extra(extra) for extra in dependency.extras}
): )
continue )
):
continue
_dependencies.append(dep) _dependencies.append(dep)
overrides = self._overrides.get(package, {}) dependencies = self._get_dependencies_with_overrides(
dependencies = [] _dependencies, dependency_package
overridden = [] )
for dep in _dependencies:
if dep.name in overrides:
if dep.name in overridden:
continue
dependencies.append(overrides[dep.name])
overridden.append(dep.name)
continue
dependencies.append(dep)
# Searching for duplicate dependencies # Searching for duplicate dependencies
# #
# If the duplicate dependencies have the same constraint, # If the duplicate dependencies have the same constraint,
# the requirements will be merged. # the requirements will be merged.
# #
# For instance: # For instance:
# - enum34; python_version=="2.7" # - enum34; python_version=="2.7"
# - enum34; python_version=="3.3" # - enum34; python_version=="3.3"
# #
# will become: # will become:
# - enum34; python_version=="2.7" or python_version=="3.3" # - enum34; python_version=="2.7" or python_version=="3.3"
# #
# If the duplicate dependencies have different constraints # If the duplicate dependencies have different constraints
# we have to split the dependency graph. # we have to split the dependency graph.
# #
# An example of this is: # An example of this is:
# - pypiwin32 (220); sys_platform == "win32" and python_version >= "3. 6" # - pypiwin32 (220); sys_platform == "win32" and python_version >= "3. 6"
# - pypiwin32 (219); sys_platform == "win32" and python_version < "3.6 " # - pypiwin32 (219); sys_platform == "win32" and python_version < "3.6 "
duplicates = OrderedDict() duplicates: dict[str, list[Dependency]] = defaultdict(list)
for dep in dependencies: for dep in dependencies:
if dep.name not in duplicates: duplicates[dep.complete_name].append(dep)
duplicates[dep.name] = []
duplicates[dep.name].append(dep)
dependencies = [] dependencies = []
for dep_name, deps in duplicates.items(): for dep_name, deps in duplicates.items():
if len(deps) == 1: if len(deps) == 1:
dependencies.append(deps[0]) dependencies.append(deps[0])
continue continue
self.debug("<debug>Duplicate dependencies for {}</debug>".format(dep _name)) self.debug(f"<debug>Duplicate dependencies for {dep_name}</debug>")
# Regrouping by constraint non_direct_origin_deps: list[Dependency] = []
by_constraint = OrderedDict() direct_origin_deps: list[Dependency] = []
for dep in deps: for dep in deps:
if dep.constraint not in by_constraint: if dep.is_direct_origin():
by_constraint[dep.constraint] = [] direct_origin_deps.append(dep)
else:
by_constraint[dep.constraint].append(dep) non_direct_origin_deps.append(dep)
deps = (
# We merge by constraint self._merge_dependencies_by_constraint(
for constraint, _deps in by_constraint.items(): self._merge_dependencies_by_marker(non_direct_origin_deps)
new_markers = []
for dep in _deps:
marker = dep.marker.without_extras()
if marker.is_any():
# No marker or only extras
continue
new_markers.append(marker)
if not new_markers:
continue
dep = _deps[0]
dep.marker = dep.marker.union(MarkerUnion(*new_markers))
by_constraint[constraint] = [dep]
continue
if len(by_constraint) == 1:
self.debug(
"<debug>Merging requirements for {}</debug>".format(str(deps
[0]))
) )
dependencies.append(list(by_constraint.values())[0][0]) + direct_origin_deps
)
if len(deps) == 1:
self.debug(f"<debug>Merging requirements for {deps[0]!s}</debug>
")
dependencies.append(deps[0])
continue continue
# We leave dependencies as-is if they have the same # We leave dependencies as-is if they have the same
# python/platform constraints. # python/platform constraints.
# That way the resolver will pickup the conflict # That way the resolver will pickup the conflict
# and display a proper error. # and display a proper error.
_deps = [value[0] for value in by_constraint.values()]
seen = set() seen = set()
for _dep in _deps: for dep in deps:
pep_508_dep = _dep.to_pep_508(False) pep_508_dep = dep.to_pep_508(False)
if ";" not in pep_508_dep: if ";" not in pep_508_dep:
_requirements = "" _requirements = ""
else: else:
_requirements = pep_508_dep.split(";")[1].strip() _requirements = pep_508_dep.split(";")[1].strip()
if _requirements not in seen: if _requirements not in seen:
seen.add(_requirements) seen.add(_requirements)
if len(_deps) != len(seen): if len(deps) != len(seen):
for _dep in _deps: for dep in deps:
dependencies.append(_dep) dependencies.append(dep)
continue continue
# At this point, we raise an exception that will # At this point, we raise an exception that will
# tell the solver to make new resolutions with specific overrides. # tell the solver to make new resolutions with specific overrides.
# #
# For instance, if the foo (1.2.3) package has the following depende ncies: # For instance, if the foo (1.2.3) package has the following depende ncies:
# - bar (>=2.0) ; python_version >= "3.6" # - bar (>=2.0) ; python_version >= "3.6"
# - bar (<2.0) ; python_version < "3.6" # - bar (<2.0) ; python_version < "3.6"
# #
# then the solver will need to make two new resolutions # then the solver will need to make two new resolutions
# with the following overrides: # with the following overrides:
# - {<Package foo (1.2.3): {"bar": <Dependency bar (>=2.0)>} # - {<Package foo (1.2.3): {"bar": <Dependency bar (>=2.0)>}
# - {<Package foo (1.2.3): {"bar": <Dependency bar (<2.0)>} # - {<Package foo (1.2.3): {"bar": <Dependency bar (<2.0)>}
markers = []
for constraint, _deps in by_constraint.items():
markers.append(_deps[0].marker)
_deps = [_dep[0] for _dep in by_constraint.values()] def fmt_warning(d: Dependency) -> str:
self.debug( dependency_marker = d.marker if not d.marker.is_any() else "*"
"<warning>Different requirements found for {}.</warning>".format return (
( f"<c1>{d.name}</c1> <fg=default>(<c2>{d.pretty_constraint}</
", ".join( c2>)</>"
"<c1>{}</c1> <fg=default>(<c2>{}</c2>)</> with markers < f" with markers <b>{dependency_marker}</b>"
b>{}</b>".format(
d.name,
d.pretty_constraint,
d.marker if not d.marker.is_any() else "*",
)
for d in _deps[:-1]
)
+ " and "
+ "<c1>{}</c1> <fg=default>(<c2>{}</c2>)</> with markers <b>
{}</b>".format(
_deps[-1].name,
_deps[-1].pretty_constraint,
_deps[-1].marker if not _deps[-1].marker.is_any() else "
*",
)
) )
warnings = ", ".join(fmt_warning(d) for d in deps[:-1])
warnings += f" and {fmt_warning(deps[-1])}"
self.debug(
f"<warning>Different requirements found for {warnings}.</warning
>"
) )
# We need to check if one of the duplicate dependencies # We need to check if one of the duplicate dependencies
# has no markers. If there is one, we need to change its # has no markers. If there is one, we need to change its
# environment markers to the inverse of the union of the # environment markers to the inverse of the union of the
# other dependencies markers. # other dependencies markers.
# For instance, if we have the following dependencies: # For instance, if we have the following dependencies:
# - ipython # - ipython
# - ipython (1.2.4) ; implementation_name == "pypy" # - ipython (1.2.4) ; implementation_name == "pypy"
# #
# the marker for `ipython` will become `implementation_name != "pypy "`. # the marker for `ipython` will become `implementation_name != "pypy "`.
any_markers_dependencies = [d for d in _deps if d.marker.is_any()] #
other_markers_dependencies = [d for d in _deps if not d.marker.is_an # Further, we have to merge the constraints of the requirements
y()] # without markers into the constraints of the requirements with mark
ers.
# for instance, if we have the following dependencies:
# - foo (>= 1.2)
# - foo (!= 1.2.1) ; python == 3.10
#
# the constraint for the second entry will become (!= 1.2.1, >= 1.2)
any_markers_dependencies = [d for d in deps if d.marker.is_any()]
other_markers_dependencies = [d for d in deps if not d.marker.is_any
()]
marker = other_markers_dependencies[0].marker
for other_dep in other_markers_dependencies[1:]:
marker = marker.union(other_dep.marker)
inverted_marker = marker.invert()
if any_markers_dependencies: if any_markers_dependencies:
marker = other_markers_dependencies[0].marker for dep_any in any_markers_dependencies:
for other_dep in other_markers_dependencies[1:]: dep_any.marker = inverted_marker
marker = marker.union(other_dep.marker) for dep_other in other_markers_dependencies:
dep_other.constraint = dep_other.constraint.intersect(
for i, d in enumerate(_deps): dep_any.constraint
if d.marker.is_any(): )
_deps[i].marker = marker.invert() elif not inverted_marker.is_empty() and self._python_constraint.allo
ws_any(
get_python_constraint_from_marker(inverted_marker)
):
# if there is no any marker dependency
# and the inverted marker is not empty,
# a dependency with the inverted union of all markers is require
d
# in order to not miss other dependencies later, for instance:
# - foo (1.0) ; python == 3.7
# - foo (2.0) ; python == 3.8
# - bar (2.0) ; python == 3.8
# - bar (3.0) ; python == 3.9
#
# the last dependency would be missed without this,
# because the intersection with both foo dependencies is empty
inverted_marker_dep = deps[0].with_constraint(EmptyConstraint())
inverted_marker_dep.marker = inverted_marker
deps.append(inverted_marker_dep)
overrides = [] overrides = []
for _dep in _deps: overrides_marker_intersection: BaseMarker = AnyMarker()
current_overrides = self._overrides.copy() for dep_overrides in self._overrides.values():
package_overrides = current_overrides.get(package, {}).copy() for dep in dep_overrides.values():
package_overrides.update({_dep.name: _dep}) overrides_marker_intersection = (
current_overrides.update({package: package_overrides}) overrides_marker_intersection.intersect(dep.marker)
overrides.append(current_overrides) )
for dep in deps:
if not overrides_marker_intersection.intersect(dep.marker).is_em
pty():
current_overrides = self._overrides.copy()
package_overrides = current_overrides.get(
dependency_package, {}
).copy()
package_overrides.update({dep.name: dep})
current_overrides.update({dependency_package: package_overri
des})
overrides.append(current_overrides)
raise OverrideNeeded(*overrides) if overrides:
raise OverrideNeeded(*overrides)
# Modifying dependencies as needed # Modifying dependencies as needed
clean_dependencies = [] clean_dependencies = []
for dep in dependencies: for dep in dependencies:
if not package.dependency.transitive_marker.without_extras().is_any( if not dependency.transitive_marker.without_extras().is_any():
): marker_intersection = (
marker_intersection = package.dependency.transitive_marker.witho dependency.transitive_marker.without_extras().intersect(
ut_extras().intersect( dep.marker.without_extras()
dep.marker.without_extras() )
) )
if marker_intersection.is_empty(): if marker_intersection.is_empty():
# The dependency is not needed, since the markers specified # The dependency is not needed, since the markers specified
# for the current package selection are not compatible with # for the current package selection are not compatible with
# the markers for the current dependency, so we skip it # the markers for the current dependency, so we skip it
continue continue
dep.transitive_marker = marker_intersection dep.transitive_marker = marker_intersection
if not package.dependency.python_constraint.is_any(): if not dependency.python_constraint.is_any():
python_constraint_intersection = dep.python_constraint.intersect ( python_constraint_intersection = dep.python_constraint.intersect (
package.dependency.python_constraint dependency.python_constraint
) )
if python_constraint_intersection.is_empty(): if python_constraint_intersection.is_empty():
# This dependency is not needed under current python constra int. # This dependency is not needed under current python constra int.
continue continue
dep.transitive_python_versions = str(python_constraint_intersect ion) dep.transitive_python_versions = str(python_constraint_intersect ion)
clean_dependencies.append(dep) clean_dependencies.append(dep)
package.requires = clean_dependencies package = package.with_dependency_groups([], only=True)
dependency_package = DependencyPackage(dependency, package)
return package for dep in clean_dependencies:
package.add_dependency(dep)
return dependency_package
def debug(self, message, depth=0): def debug(self, message: str, depth: int = 0) -> None:
if not (self._io.is_very_verbose() or self._io.is_debug()): if not (self._io.is_very_verbose() or self._io.is_debug()):
return return
if message.startswith("fact:"): if message.startswith("fact:"):
if "depends on" in message: if "depends on" in message:
m = re.match(r"fact: (.+?) depends on (.+?) \((.+?)\)", message) m = re.match(r"fact: (.+?) depends on (.+?) \((.+?)\)", message)
if m is None:
raise ValueError(f"Unable to parse fact: {message}")
m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) m2 = re.match(r"(.+?) \((.+?)\)", m.group(1))
if m2: if m2:
name = m2.group(1) name = m2.group(1)
version = " (<c2>{}</c2>)".format(m2.group(2)) version = f" (<c2>{m2.group(2)}</c2>)"
else: else:
name = m.group(1) name = m.group(1)
version = "" version = ""
message = ( message = (
"<fg=blue>fact</>: <c1>{}</c1>{} " f"<fg=blue>fact</>: <c1>{name}</c1>{version} "
"depends on <c1>{}</c1> (<c2>{}</c2>)".format( f"depends on <c1>{m.group(2)}</c1> (<c2>{m.group(3)}</c2>)"
name, version, m.group(2), m.group(3)
)
) )
elif " is " in message: elif " is " in message:
message = re.sub( message = re.sub(
"fact: (.+) is (.+)", "fact: (.+) is (.+)",
"<fg=blue>fact</>: <c1>\\1</c1> is <c2>\\2</c2>", "<fg=blue>fact</>: <c1>\\1</c1> is <c2>\\2</c2>",
message, message,
) )
else: else:
message = re.sub( message = re.sub(
r"(?<=: )(.+?) \((.+?)\)", "<c1>\\1</c1> (<c2>\\2</c2>)", me ssage r"(?<=: )(.+?) \((.+?)\)", "<c1>\\1</c1> (<c2>\\2</c2>)", me ssage
) )
message = "<fg=blue>fact</>: {}".format(message.split("fact: ")[ 1]) message = f"<fg=blue>fact</>: {message.split('fact: ')[1]}"
elif message.startswith("selecting "): elif message.startswith("selecting "):
message = re.sub( message = re.sub(
r"selecting (.+?) \((.+?)\)", r"selecting (.+?) \((.+?)\)",
"<fg=blue>selecting</> <c1>\\1</c1> (<c2>\\2</c2>)", "<fg=blue>selecting</> <c1>\\1</c1> (<c2>\\2</c2>)",
message, message,
) )
elif message.startswith("derived:"): elif message.startswith("derived:"):
m = re.match(r"derived: (.+?) \((.+?)\)$", message) m = re.match(r"derived: (.+?) \((.+?)\)$", message)
if m: if m:
message = "<fg=blue>derived</>: <c1>{}</c1> (<c2>{}</c2>)".forma message = (
t( f"<fg=blue>derived</>: <c1>{m.group(1)}</c1>"
m.group(1), m.group(2) f" (<c2>{m.group(2)}</c2>)"
) )
else: else:
message = "<fg=blue>derived</>: <c1>{}</c1>".format( message = (
message.split("derived: ")[1] f"<fg=blue>derived</>: <c1>{message.split('derived: ')[1]}</
c1>"
) )
elif message.startswith("conflict:"): elif message.startswith("conflict:"):
m = re.match(r"conflict: (.+?) depends on (.+?) \((.+?)\)", message) m = re.match(r"conflict: (.+?) depends on (.+?) \((.+?)\)", message)
if m: if m:
m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) m2 = re.match(r"(.+?) \((.+?)\)", m.group(1))
if m2: if m2:
name = m2.group(1) name = m2.group(1)
version = " (<c2>{}</c2>)".format(m2.group(2)) version = f" (<c2>{m2.group(2)}</c2>)"
else: else:
name = m.group(1) name = m.group(1)
version = "" version = ""
message = ( message = (
"<fg=red;options=bold>conflict</>: <c1>{}</c1>{} " f"<fg=red;options=bold>conflict</>: <c1>{name}</c1>{version}
"depends on <c1>{}</c1> (<c2>{}</c2>)".format( "
name, version, m.group(2), m.group(3) f"depends on <c1>{m.group(2)}</c1> (<c2>{m.group(3)}</c2>)"
)
) )
else: else:
message = "<fg=red;options=bold>conflict</>: {}".format( message = (
message.split("conflict: ")[1] "<fg=red;options=bold>conflict</>:"
f" {message.split('conflict: ')[1]}"
) )
message = message.replace("! ", "<error>!</error> ") message = message.replace("! ", "<error>!</error> ")
if self.is_debugging(): if self.is_debugging():
debug_info = str(message) debug_info = str(message)
debug_info = ( debug_info = (
"\n".join( "\n".join(
[ [
"<debug>{}:</debug> {}".format(str(depth).rjust(4), s) f"<debug>{str(depth).rjust(4)}:</debug> {s}"
for s in debug_info.split("\n") for s in debug_info.split("\n")
] ]
) )
+ "\n" + "\n"
) )
self._io.write(debug_info) self._io.write(debug_info)
@contextmanager @contextmanager
def progress(self): def progress(self) -> Iterator[None]:
if not self._io.output.supports_ansi() or self.is_debugging(): if not self._io.output.is_decorated() or self.is_debugging():
self._io.write_line("Resolving dependencies...") self._io.write_line("Resolving dependencies...")
yield yield
else: else:
indicator = Indicator(self._io, "{message} <debug>({elapsed:2s})</de indicator = Indicator(
bug>") self._io, "{message}{context}<debug>({elapsed:2s})</debug>"
)
with indicator.auto( with indicator.auto(
"<info>Resolving dependencies...</info>", "<info>Resolving dependencies...</info>",
"<info>Resolving dependencies...</info>", "<info>Resolving dependencies...</info>",
): ):
yield yield
self._in_progress = False self._in_progress = False
def _merge_dependencies_by_constraint(
self, dependencies: Iterable[Dependency]
) -> list[Dependency]:
by_constraint: dict[VersionConstraint, list[Dependency]] = defaultdict(l
ist)
for dep in dependencies:
by_constraint[dep.constraint].append(dep)
for constraint, _deps in by_constraint.items():
new_markers = []
for dep in _deps:
marker = dep.marker.without_extras()
if marker.is_any():
# No marker or only extras
continue
new_markers.append(marker)
if not new_markers:
continue
dep = _deps[0]
dep.marker = dep.marker.union(MarkerUnion(*new_markers))
by_constraint[constraint] = [dep]
return [value[0] for value in by_constraint.values()]
def _merge_dependencies_by_marker(
self, dependencies: Iterable[Dependency]
) -> list[Dependency]:
by_marker: dict[BaseMarker, list[Dependency]] = defaultdict(list)
for dep in dependencies:
by_marker[dep.marker].append(dep)
deps = []
for _deps in by_marker.values():
if len(_deps) == 1:
deps.extend(_deps)
else:
new_constraint = _deps[0].constraint
for dep in _deps[1:]:
new_constraint = new_constraint.intersect(dep.constraint)
if new_constraint.is_empty():
# leave dependencies as-is so the resolver will pickup
# the conflict and display a proper error.
deps.extend(_deps)
else:
self.debug(
f"<debug>Merging constraints for {_deps[0].name} for"
f" marker {_deps[0].marker}</debug>"
)
deps.append(_deps[0].with_constraint(new_constraint))
return deps
 End of changes. 111 change blocks. 
388 lines changed or deleted 508 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)