provider.py (poetry-1.1.15) | provider.py (poetry-1.2.0)
---|---
from __future__ import annotations | ||||
import functools | ||||
import logging | import logging | |||
import os | import os | |||
import re | import re | |||
import tempfile | ||||
import time | import time | |||
import urllib.parse | ||||
from collections import defaultdict | ||||
from contextlib import contextmanager | from contextlib import contextmanager | |||
from tempfile import mkdtemp | from pathlib import Path | |||
from typing import Any | from typing import TYPE_CHECKING | |||
from typing import List | from typing import cast | |||
from typing import Optional | ||||
from cleo.ui.progress_indicator import ProgressIndicator | ||||
from clikit.ui.components import ProgressIndicator | ||||
from poetry.core.packages import Dependency | ||||
from poetry.core.packages import DirectoryDependency | ||||
from poetry.core.packages import FileDependency | ||||
from poetry.core.packages import Package | ||||
from poetry.core.packages import URLDependency | ||||
from poetry.core.packages import VCSDependency | ||||
from poetry.core.packages.utils.utils import get_python_constraint_from_marker | from poetry.core.packages.utils.utils import get_python_constraint_from_marker | |||
from poetry.core.semver.empty_constraint import EmptyConstraint | ||||
from poetry.core.semver.version import Version | from poetry.core.semver.version import Version | |||
from poetry.core.vcs.git import Git | from poetry.core.version.markers import AnyMarker | |||
from poetry.core.version.markers import MarkerUnion | from poetry.core.version.markers import MarkerUnion | |||
from poetry.inspection.info import PackageInfo | from poetry.inspection.info import PackageInfo | |||
from poetry.inspection.info import PackageInfoError | from poetry.inspection.info import PackageInfoError | |||
from poetry.mixology.incompatibility import Incompatibility | from poetry.mixology.incompatibility import Incompatibility | |||
from poetry.mixology.incompatibility_cause import DependencyCause | from poetry.mixology.incompatibility_cause import DependencyCause | |||
from poetry.mixology.incompatibility_cause import PythonCause | from poetry.mixology.incompatibility_cause import PythonCause | |||
from poetry.mixology.term import Term | from poetry.mixology.term import Term | |||
from poetry.packages import DependencyPackage | from poetry.packages import DependencyPackage | |||
from poetry.packages.package_collection import PackageCollection | from poetry.packages.package_collection import PackageCollection | |||
from poetry.puzzle.exceptions import OverrideNeeded | from poetry.puzzle.exceptions import OverrideNeeded | |||
from poetry.repositories import Pool | from poetry.repositories.exceptions import PackageNotFound | |||
from poetry.utils._compat import OrderedDict | ||||
from poetry.utils._compat import Path | ||||
from poetry.utils._compat import urlparse | ||||
from poetry.utils.env import Env | ||||
from poetry.utils.helpers import download_file | from poetry.utils.helpers import download_file | |||
from poetry.utils.helpers import safe_rmtree | from poetry.utils.helpers import safe_extra | |||
from poetry.utils.helpers import temporary_directory | from poetry.vcs.git import Git | |||
if TYPE_CHECKING: | ||||
from collections.abc import Callable | ||||
from collections.abc import Iterable | ||||
from collections.abc import Iterator | ||||
from cleo.io.io import IO | ||||
from poetry.core.packages.dependency import Dependency | ||||
from poetry.core.packages.directory_dependency import DirectoryDependency | ||||
from poetry.core.packages.file_dependency import FileDependency | ||||
from poetry.core.packages.package import Package | ||||
from poetry.core.packages.url_dependency import URLDependency | ||||
from poetry.core.packages.vcs_dependency import VCSDependency | ||||
from poetry.core.semver.version_constraint import VersionConstraint | ||||
from poetry.core.version.markers import BaseMarker | ||||
from poetry.repositories import Pool | ||||
from poetry.utils.env import Env | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | |||
class Indicator(ProgressIndicator): | class Indicator(ProgressIndicator): # type: ignore[misc] | |||
def _formatter_elapsed(self): | CONTEXT: str | None = None | |||
@staticmethod | ||||
@contextmanager | ||||
def context() -> Iterator[Callable[[str | None], None]]: | ||||
def _set_context(context: str | None) -> None: | ||||
Indicator.CONTEXT = context | ||||
yield _set_context | ||||
_set_context(None) | ||||
def _formatter_context(self) -> str: | ||||
if Indicator.CONTEXT is None: | ||||
return " " | ||||
else: | ||||
return f" <c1>{Indicator.CONTEXT}</> " | ||||
def _formatter_elapsed(self) -> str: | ||||
assert self._start_time is not None | ||||
elapsed = time.time() - self._start_time | elapsed = time.time() - self._start_time | |||
return "{:.1f}s".format(elapsed) | return f"{elapsed:.1f}s" | |||
class Provider: | @functools.lru_cache(maxsize=None) | |||
def _get_package_from_git( | ||||
url: str, | ||||
branch: str | None = None, | ||||
tag: str | None = None, | ||||
rev: str | None = None, | ||||
subdirectory: str | None = None, | ||||
source_root: Path | None = None, | ||||
) -> Package: | ||||
source = Git.clone( | ||||
url=url, | ||||
source_root=source_root, | ||||
branch=branch, | ||||
tag=tag, | ||||
revision=rev, | ||||
clean=False, | ||||
) | ||||
revision = Git.get_revision(source) | ||||
path = Path(source.path) | ||||
if subdirectory: | ||||
path = path.joinpath(subdirectory) | ||||
package = Provider.get_package_from_directory(path) | ||||
package._source_type = "git" | ||||
package._source_url = url | ||||
package._source_reference = rev or tag or branch or "HEAD" | ||||
package._source_resolved_reference = revision | ||||
package._source_subdirectory = subdirectory | ||||
UNSAFE_PACKAGES = {"setuptools", "distribute", "pip", "wheel"} | return package | |||
class Provider: | ||||
UNSAFE_PACKAGES: set[str] = set() | ||||
def __init__( | def __init__( | |||
self, package, pool, io, env=None | self, | |||
): # type: (Package, Pool, Any, Optional[Env]) -> None | package: Package, | |||
pool: Pool, | ||||
io: IO, | ||||
env: Env | None = None, | ||||
installed: list[Package] | None = None, | ||||
) -> None: | ||||
self._package = package | self._package = package | |||
self._pool = pool | self._pool = pool | |||
self._io = io | self._io = io | |||
self._env = env | self._env = env | |||
self._python_constraint = package.python_constraint | self._python_constraint = package.python_constraint | |||
self._search_for = {} | self._is_debugging: bool = self._io.is_debug() or self._io.is_very_verbose() | |||
self._is_debugging = self._io.is_debug() or self._io.is_very_verbose() | ||||
self._in_progress = False | self._in_progress = False | |||
self._overrides = {} | self._overrides: dict[DependencyPackage, dict[str, Dependency]] = {} | |||
self._deferred_cache = {} | self._deferred_cache: dict[Dependency, Package] = {} | |||
self._load_deferred = True | self._load_deferred = True | |||
self._source_root: Path | None = None | ||||
self._installed_packages = installed if installed is not None else [] | ||||
self._direct_origin_packages: dict[str, Package] = {} | ||||
@property | @property | |||
def pool(self): # type: () -> Pool | def pool(self) -> Pool: | |||
return self._pool | return self._pool | |||
def is_debugging(self): | def is_debugging(self) -> bool: | |||
return self._is_debugging | return self._is_debugging | |||
def set_overrides(self, overrides): | def set_overrides( | |||
self, overrides: dict[DependencyPackage, dict[str, Dependency]] | ||||
) -> None: | ||||
self._overrides = overrides | self._overrides = overrides | |||
def load_deferred(self, load_deferred): # type: (bool) -> None | def load_deferred(self, load_deferred: bool) -> None: | |||
self._load_deferred = load_deferred | self._load_deferred = load_deferred | |||
@contextmanager | @contextmanager | |||
def use_environment(self, env): # type: (Env) -> Provider | def use_source_root(self, source_root: Path) -> Iterator[Provider]: | |||
original_source_root = self._source_root | ||||
self._source_root = source_root | ||||
yield self | ||||
self._source_root = original_source_root | ||||
@contextmanager | ||||
def use_environment(self, env: Env) -> Iterator[Provider]: | ||||
original_env = self._env | original_env = self._env | |||
original_python_constraint = self._python_constraint | original_python_constraint = self._python_constraint | |||
self._env = env | self._env = env | |||
self._python_constraint = Version.parse(env.marker_env["python_full_version"]) | self._python_constraint = Version.parse(env.marker_env["python_full_version"]) | |||
yield self | yield self | |||
self._env = original_env | self._env = original_env | |||
self._python_constraint = original_python_constraint | self._python_constraint = original_python_constraint | |||
def search_for(self, dependency): # type: (Dependency) -> List[Package] | @staticmethod | |||
def validate_package_for_dependency( | ||||
dependency: Dependency, package: Package | ||||
) -> None: | ||||
if dependency.name != package.name: | ||||
# For now, the dependency's name must match the actual package's name | ||||
raise RuntimeError( | ||||
f"The dependency name for {dependency.name} does not match the actual" | ||||
f" package's name: {package.name}" | ||||
) | ||||
def search_for_installed_packages( | ||||
self, | ||||
dependency: Dependency, | ||||
) -> list[Package]: | ||||
""" | """ | |||
Search for the specifications that match the given dependency. | Search for installed packages, when available, that satisfy the given dependency. | |||
The specifications in the returned list will be considered in reverse | This is useful when dealing with packages that are under development, not | |||
order, so the latest version ought to be last. | published on package sources and/or only available via system installations. | |||
""" | """ | |||
if dependency.is_root: | if not self._installed_packages: | |||
return PackageCollection(dependency, [self._package]) | return [] | |||
for constraint in self._search_for.keys(): | logger.debug( | |||
if ( | "Falling back to installed packages to discover metadata for <c2>%s</>", | |||
constraint.is_same_package_as(dependency) | dependency.complete_name, | |||
and constraint.constraint.intersect(dependency.constraint) | ) | |||
== dependency.constraint | packages = [ | |||
): | package | |||
packages = [ | for package in self._installed_packages | |||
p | if package.satisfies(dependency, ignore_source_type=True) | |||
for p in self._search_for[constraint] | ] | |||
if dependency.constraint.allows(p.version) | logger.debug( | |||
] | ||||
"Found <c2>%d</> compatible packages for <c2>%s</>", | ||||
packages.sort( | len(packages), | |||
key=lambda p: ( | dependency.complete_name, | |||
not p.is_prerelease() and not dependency.allows_prereleases(), | ) | |||
p.version, | return packages | |||
), | ||||
reverse=True, | ||||
) | ||||
return PackageCollection(dependency, packages) | def search_for_direct_origin_dependency(self, dependency: Dependency) -> Package: | |||
package = self._deferred_cache.get(dependency) | ||||
if package is not None: | ||||
pass | ||||
elif dependency.is_vcs(): | ||||
dependency = cast("VCSDependency", dependency) | ||||
package = self._search_for_vcs(dependency) | ||||
if dependency.is_vcs(): | ||||
packages = self.search_for_vcs(dependency) | ||||
elif dependency.is_file(): | elif dependency.is_file(): | |||
packages = self.search_for_file(dependency) | dependency = cast("FileDependency", dependency) | |||
package = self._search_for_file(dependency) | ||||
elif dependency.is_directory(): | elif dependency.is_directory(): | |||
packages = self.search_for_directory(dependency) | dependency = cast("DirectoryDependency", dependency) | |||
package = self._search_for_directory(dependency) | ||||
elif dependency.is_url(): | elif dependency.is_url(): | |||
packages = self.search_for_url(dependency) | dependency = cast("URLDependency", dependency) | |||
else: | package = self._search_for_url(dependency) | |||
packages = self._pool.find_packages(dependency) | ||||
packages.sort( | else: | |||
key=lambda p: ( | raise RuntimeError( | |||
not p.is_prerelease() and not dependency.allows_prereleases(), | f"Unknown direct dependency type {dependency.source_type}" | |||
p.version, | ||||
), | ||||
reverse=True, | ||||
) | ) | |||
self._search_for[dependency] = packages | if dependency.is_vcs(): | |||
dependency._source_reference = package.source_reference | ||||
dependency._source_resolved_reference = package.source_resolved_reference | ||||
dependency._source_subdirectory = package.source_subdirectory | ||||
dependency._constraint = package.version | ||||
dependency._pretty_constraint = package.version.text | ||||
self._deferred_cache[dependency] = package | ||||
return package | ||||
def search_for(self, dependency: Dependency) -> list[DependencyPackage]: | ||||
""" | ||||
Search for the specifications that match the given dependency. | ||||
The specifications in the returned list will be considered in reverse | ||||
order, so the latest version ought to be last. | ||||
""" | ||||
if dependency.is_root: | ||||
return PackageCollection(dependency, [self._package]) | ||||
if dependency.is_direct_origin(): | ||||
package = self.search_for_direct_origin_dependency(dependency) | ||||
self._direct_origin_packages[dependency.name] = package | ||||
return PackageCollection(dependency, [package]) | ||||
# If we've previously found a direct-origin package that meets this dependency, | ||||
# use it. | ||||
# | ||||
# We rely on the VersionSolver resolving direct-origin dependencies first. | ||||
direct_origin_package = self._direct_origin_packages.get(dependency.name) | ||||
if direct_origin_package is not None: | ||||
packages = ( | ||||
[direct_origin_package] | ||||
if dependency.constraint.allows(direct_origin_package.version) | ||||
else [] | ||||
) | ||||
return PackageCollection(dependency, packages) | ||||
packages = self._pool.find_packages(dependency) | ||||
packages.sort( | ||||
key=lambda p: ( | ||||
not p.yanked, | ||||
not p.is_prerelease() and not dependency.allows_prereleases(), | ||||
p.version, | ||||
), | ||||
reverse=True, | ||||
) | ||||
if not packages: | ||||
packages = self.search_for_installed_packages(dependency) | ||||
return PackageCollection(dependency, packages) | return PackageCollection(dependency, packages) | |||
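Note: the 1.2.0 sort key is a tuple, so under `reverse=True` non-yanked releases rank above yanked ones, then stable releases above prereleases (when prereleases are not allowed), then higher versions. A toy illustration of the same tuple ordering (made-up package dicts, not Poetry objects):

```python
packages = [
    {"version": (1, 0), "yanked": False, "pre": False},
    {"version": (2, 0), "yanked": True,  "pre": False},
    {"version": (2, 0), "yanked": False, "pre": True},
    {"version": (1, 5), "yanked": False, "pre": False},
]
allows_prereleases = False
packages.sort(
    key=lambda p: (
        not p["yanked"],                          # non-yanked first
        not p["pre"] and not allows_prereleases,  # stable first
        p["version"],                             # then highest version
    ),
    reverse=True,
)
print([p["version"] for p in packages])
# [(1, 5), (1, 0), (2, 0), (2, 0)] -> stable 1.5, stable 1.0, pre 2.0, yanked 2.0
```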
def search_for_vcs(self, dependency): # type: (VCSDependency) -> List[Package] | def _search_for_vcs(self, dependency: VCSDependency) -> Package: | |||
""" | """ | |||
Search for the specifications that match the given VCS dependency. | Search for the specifications that match the given VCS dependency. | |||
Basically, we clone the repository in a temporary directory | Basically, we clone the repository in a temporary directory | |||
and get the information we need by checking out the specified reference. | and get the information we need by checking out the specified reference. | |||
""" | """ | |||
if dependency in self._deferred_cache: | ||||
return [self._deferred_cache[dependency]] | ||||
package = self.get_package_from_vcs( | package = self.get_package_from_vcs( | |||
dependency.vcs, | dependency.vcs, | |||
dependency.source, | dependency.source, | |||
branch=dependency.branch, | branch=dependency.branch, | |||
tag=dependency.tag, | tag=dependency.tag, | |||
rev=dependency.rev, | rev=dependency.rev, | |||
name=dependency.name, | subdirectory=dependency.source_subdirectory, | |||
source_root=self._source_root | ||||
or (self._env.path.joinpath("src") if self._env else None), | ||||
) | ) | |||
package.develop = dependency.develop | ||||
dependency._constraint = package.version | self.validate_package_for_dependency(dependency=dependency, package=package) | |||
dependency._pretty_constraint = package.version.text | ||||
self._deferred_cache[dependency] = package | package.develop = dependency.develop | |||
return [package] | return package | |||
@classmethod | @staticmethod | |||
def get_package_from_vcs( | def get_package_from_vcs( | |||
cls, vcs, url, branch=None, tag=None, rev=None, name=None | vcs: str, | |||
): # type: (str, str, Optional[str], Optional[str]) -> Package | url: str, | |||
branch: str | None = None, | ||||
tag: str | None = None, | ||||
rev: str | None = None, | ||||
subdirectory: str | None = None, | ||||
source_root: Path | None = None, | ||||
) -> Package: | ||||
if vcs != "git": | if vcs != "git": | |||
raise ValueError("Unsupported VCS dependency {}".format(vcs)) | raise ValueError(f"Unsupported VCS dependency {vcs}") | |||
tmp_dir = Path( | return _get_package_from_git( | |||
mkdtemp(prefix="pypoetry-git-{}".format(url.split("/")[-1].rstrip(".git"))) | url=url, | |||
) | branch=branch, | |||
tag=tag, | ||||
rev=rev, | ||||
subdirectory=subdirectory, | ||||
source_root=source_root, | ||||
) | ||||
try: | def _search_for_file(self, dependency: FileDependency) -> Package: | |||
git = Git() | package = self.get_package_from_file(dependency.full_path) | |||
git.clone(url, tmp_dir) | ||||
reference = branch or tag or rev | ||||
if reference is not None: | ||||
git.checkout(reference, tmp_dir) | ||||
else: | ||||
reference = "HEAD" | ||||
revision = git.rev_parse(reference, tmp_dir).strip() | ||||
package = cls.get_package_from_directory(tmp_dir, name=name) | ||||
package._source_type = "git" | ||||
package._source_url = url | ||||
package._source_reference = reference | ||||
package._source_resolved_reference = revision | ||||
except Exception: | ||||
raise | ||||
finally: | ||||
safe_rmtree(str(tmp_dir)) | ||||
return package | ||||
def search_for_file(self, dependency): # type: (FileDependency) -> List[Package] | ||||
if dependency in self._deferred_cache: | ||||
dependency, _package = self._deferred_cache[dependency] | ||||
package = _package.clone() | ||||
else: | ||||
package = self.get_package_from_file(dependency.full_path) | ||||
dependency._constraint = package.version | self.validate_package_for_dependency(dependency=dependency, package=package) | |||
dependency._pretty_constraint = package.version.text | ||||
self._deferred_cache[dependency] = (dependency, package) | ||||
if dependency.name != package.name: | ||||
# For now, the dependency's name must match the actual package's name | ||||
raise RuntimeError( | ||||
"The dependency name for {} does not match the actual package's name: {}".format( | ||||
dependency.name, package.name | ||||
) | ||||
) | ||||
if dependency.base is not None: | if dependency.base is not None: | |||
package.root_dir = dependency.base | package.root_dir = dependency.base | |||
package.files = [ | package.files = [ | |||
{"file": dependency.path.name, "hash": "sha256:" + dependency.hash() } | {"file": dependency.path.name, "hash": "sha256:" + dependency.hash() } | |||
] | ] | |||
return [package] | return package | |||
@classmethod | @classmethod | |||
def get_package_from_file(cls, file_path): # type: (Path) -> Package | def get_package_from_file(cls, file_path: Path) -> Package: | |||
try: | try: | |||
package = PackageInfo.from_path(path=file_path).to_package( | package = PackageInfo.from_path(path=file_path).to_package( | |||
root_dir=file_path | root_dir=file_path | |||
) | ) | |||
except PackageInfoError: | except PackageInfoError: | |||
raise RuntimeError( | raise RuntimeError( | |||
"Unable to determine package info from path: {}".format(file_pat h) | f"Unable to determine package info from path: {file_path}" | |||
) | ) | |||
return package | return package | |||
def search_for_directory( | def _search_for_directory(self, dependency: DirectoryDependency) -> Package: | |||
self, dependency | package = self.get_package_from_directory(dependency.full_path) | |||
): # type: (DirectoryDependency) -> List[Package] | ||||
if dependency in self._deferred_cache: | ||||
dependency, _package = self._deferred_cache[dependency] | ||||
package = _package.clone() | ||||
else: | ||||
package = self.get_package_from_directory( | ||||
dependency.full_path, name=dependency.name | ||||
) | ||||
dependency._constraint = package.version | ||||
dependency._pretty_constraint = package.version.text | ||||
self._deferred_cache[dependency] = (dependency, package) | self.validate_package_for_dependency(dependency=dependency, package=package) | |||
package.develop = dependency.develop | package.develop = dependency.develop | |||
if dependency.base is not None: | if dependency.base is not None: | |||
package.root_dir = dependency.base | package.root_dir = dependency.base | |||
return [package] | ||||
@classmethod | ||||
def get_package_from_directory( | ||||
cls, directory, name=None | ||||
): # type: (Path, Optional[str]) -> Package | ||||
package = PackageInfo.from_directory(path=directory).to_package( | ||||
root_dir=directory | ||||
) | ||||
if name and name != package.name: | ||||
# For now, the dependency's name must match the actual package's name | ||||
raise RuntimeError( | ||||
"The dependency name for {} does not match the actual package's name: {}".format( | ||||
name, package.name | ||||
) | ||||
) | ||||
return package | return package | |||
def search_for_url(self, dependency): # type: (URLDependency) -> List[Package] | @classmethod | |||
def get_package_from_directory(cls, directory: Path) -> Package: | ||||
if dependency in self._deferred_cache: | return PackageInfo.from_directory(path=directory).to_package(root_dir=directory) | |||
return [self._deferred_cache[dependency]] | ||||
def _search_for_url(self, dependency: URLDependency) -> Package: | ||||
package = self.get_package_from_url(dependency.url) | package = self.get_package_from_url(dependency.url) | |||
if dependency.name != package.name: | self.validate_package_for_dependency(dependency=dependency, package=package) | |||
# For now, the dependency's name must match the actual package's name | ||||
raise RuntimeError( | ||||
"The dependency name for {} does not match the actual package's name: {}".format( | ||||
dependency.name, package.name | ||||
) | ||||
) | ||||
for extra in dependency.extras: | for extra in dependency.extras: | |||
if extra in package.extras: | if extra in package.extras: | |||
for dep in package.extras[extra]: | for dep in package.extras[extra]: | |||
dep.activate() | dep.activate() | |||
package.requires += package.extras[extra] | for extra_dep in package.extras[extra]: | |||
package.add_dependency(extra_dep) | ||||
dependency._constraint = package.version | ||||
dependency._pretty_constraint = package.version.text | ||||
self._deferred_cache[dependency] = package | return package | |||
return [package] | ||||
@classmethod | @classmethod | |||
def get_package_from_url(cls, url): # type: (str) -> Package | def get_package_from_url(cls, url: str) -> Package: | |||
with temporary_directory() as temp_dir: | file_name = os.path.basename(urllib.parse.urlparse(url).path) | |||
temp_dir = Path(temp_dir) | with tempfile.TemporaryDirectory() as temp_dir: | |||
file_name = os.path.basename(urlparse.urlparse(url).path) | dest = Path(temp_dir) / file_name | |||
download_file(url, str(temp_dir / file_name)) | download_file(url, dest) | |||
package = cls.get_package_from_file(dest) | ||||
package = cls.get_package_from_file(temp_dir / file_name) | ||||
package._source_type = "url" | package._source_type = "url" | |||
package._source_url = url | package._source_url = url | |||
return package | return package | |||
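Note: deriving the download file name from the URL's path component drops any query string, and 1.2.0 swaps the vendored `urlparse` compat shim for the standard library. For example (made-up URL):

```python
import os.path
import urllib.parse

url = "https://example.com/downloads/demo-1.0.0.tar.gz?token=abc"
file_name = os.path.basename(urllib.parse.urlparse(url).path)
print(file_name)  # demo-1.0.0.tar.gz -- the ?token=abc query is not in .path
```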
def _get_dependencies_with_overrides( | ||||
self, dependencies: list[Dependency], package: DependencyPackage | ||||
) -> list[Dependency]: | ||||
overrides = self._overrides.get(package, {}) | ||||
_dependencies = [] | ||||
overridden = [] | ||||
for dep in dependencies: | ||||
if dep.name in overrides: | ||||
if dep.name in overridden: | ||||
continue | ||||
# empty constraint is used in overrides to mark that the package has | ||||
# already been handled and is not required for the attached markers | ||||
if not overrides[dep.name].constraint.is_empty(): | ||||
_dependencies.append(overrides[dep.name]) | ||||
overridden.append(dep.name) | ||||
continue | ||||
_dependencies.append(dep) | ||||
return _dependencies | ||||
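Note: `_get_dependencies_with_overrides` replaces a dependency wholesale when an override exists, and an override carrying an empty constraint means "already handled for these markers", so the dependency is dropped. A toy model of that filtering (hypothetical `Dep` class standing in for `Dependency`, with `is_empty()` simplified from `constraint.is_empty()`):

```python
class Dep:
    def __init__(self, name: str, empty: bool = False) -> None:
        self.name = name
        self._empty = empty

    def is_empty(self) -> bool:
        return self._empty

overrides = {"bar": Dep("bar"), "baz": Dep("baz", empty=True)}
dependencies = [Dep("foo"), Dep("bar"), Dep("baz")]

result: list[Dep] = []
overridden: list[str] = []
for dep in dependencies:
    if dep.name in overrides:
        if dep.name in overridden:
            continue                      # apply each override only once
        if not overrides[dep.name].is_empty():
            result.append(overrides[dep.name])
        overridden.append(dep.name)
        continue
    result.append(dep)

print([d.name for d in result])  # ['foo', 'bar'] -- baz dropped, bar overridden
```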
def incompatibilities_for( | def incompatibilities_for( | |||
self, package | self, dependency_package: DependencyPackage | |||
): # type: (DependencyPackage) -> List[Incompatibility] | ) -> list[Incompatibility]: | |||
""" | """ | |||
Returns incompatibilities that encapsulate a given package's dependencies, | Returns incompatibilities that encapsulate a given package's dependencies, | |||
or that it can't be safely selected. | or that it can't be safely selected. | |||
If multiple subsequent versions of this package have the same | If multiple subsequent versions of this package have the same | |||
dependencies, this will return incompatibilities that reflect that. It | dependencies, this will return incompatibilities that reflect that. It | |||
won't return incompatibilities that have already been returned by a | won't return incompatibilities that have already been returned by a | |||
previous call to _incompatibilities_for(). | previous call to _incompatibilities_for(). | |||
""" | """ | |||
package = dependency_package.package | ||||
if package.is_root(): | if package.is_root(): | |||
dependencies = package.all_requires | dependencies = package.all_requires | |||
else: | else: | |||
dependencies = package.requires | dependencies = package.requires | |||
if not package.python_constraint.allows_all(self._python_constraint): | if not package.python_constraint.allows_all(self._python_constraint): | |||
transitive_python_constraint = get_python_constraint_from_marker( | transitive_python_constraint = get_python_constraint_from_marker( | |||
package.dependency.transitive_marker | dependency_package.dependency.transitive_marker | |||
) | ) | |||
intersection = package.python_constraint.intersect( | intersection = package.python_constraint.intersect( | |||
transitive_python_constraint | transitive_python_constraint | |||
) | ) | |||
difference = transitive_python_constraint.difference(intersection) | difference = transitive_python_constraint.difference(intersection) | |||
# The difference is only relevant if it intersects | # The difference is only relevant if it intersects | |||
# the root package python constraint | # the root package python constraint | |||
difference = difference.intersect(self._python_constraint) | difference = difference.intersect(self._python_constraint) | |||
if ( | if ( | |||
transitive_python_constraint.is_any() | transitive_python_constraint.is_any() | |||
or self._python_constraint.intersect( | or self._python_constraint.intersect( | |||
package.dependency.python_constraint | dependency_package.dependency.python_constraint | |||
).is_empty() | ).is_empty() | |||
or intersection.is_empty() | or intersection.is_empty() | |||
or not difference.is_empty() | or not difference.is_empty() | |||
): | ): | |||
return [ | return [ | |||
Incompatibility( | Incompatibility( | |||
[Term(package.to_dependency(), True)], | [Term(package.to_dependency(), True)], | |||
PythonCause( | PythonCause( | |||
package.python_versions, str(self._python_constraint) | package.python_versions, str(self._python_constraint) | |||
), | ), | |||
) | ) | |||
] | ] | |||
_dependencies = [ | _dependencies = [ | |||
dep | dep | |||
for dep in dependencies | for dep in dependencies | |||
if dep.name not in self.UNSAFE_PACKAGES | if dep.name not in self.UNSAFE_PACKAGES | |||
and self._python_constraint.allows_any(dep.python_constraint) | and self._python_constraint.allows_any(dep.python_constraint) | |||
and (not self._env or dep.marker.validate(self._env.marker_env)) | and (not self._env or dep.marker.validate(self._env.marker_env)) | |||
] | ] | |||
dependencies = self._get_dependencies_with_overrides( | ||||
overrides = self._overrides.get(package, {}) | _dependencies, dependency_package | |||
dependencies = [] | ) | |||
overridden = [] | ||||
for dep in _dependencies: | ||||
if dep.name in overrides: | ||||
if dep.name in overridden: | ||||
continue | ||||
dependencies.append(overrides[dep.name]) | ||||
overridden.append(dep.name) | ||||
continue | ||||
dependencies.append(dep) | ||||
return [ | return [ | |||
Incompatibility( | Incompatibility( | |||
[Term(package.to_dependency(), True), Term(dep, False)], | [Term(package.to_dependency(), True), Term(dep, False)], | |||
DependencyCause(), | DependencyCause(), | |||
) | ) | |||
for dep in dependencies | for dep in dependencies | |||
] | ] | |||
def complete_package( | def complete_package( | |||
self, package | self, dependency_package: DependencyPackage | |||
): # type: (DependencyPackage) -> DependencyPackage | ) -> DependencyPackage: | |||
package = dependency_package.package | ||||
dependency = dependency_package.dependency | ||||
if package.is_root(): | if package.is_root(): | |||
package = package.clone() | dependency_package = dependency_package.clone() | |||
package = dependency_package.package | ||||
dependency = dependency_package.dependency | ||||
requires = package.all_requires | requires = package.all_requires | |||
elif not package.is_root() and package.source_type not in { | elif package.source_type not in { | |||
"directory", | "directory", | |||
"file", | "file", | |||
"url", | "url", | |||
"git", | "git", | |||
}: | }: | |||
package = DependencyPackage( | try: | |||
package.dependency, | dependency_package = DependencyPackage( | |||
self._pool.package( | dependency, | |||
package.name, | self._pool.package( | |||
package.version.text, | package.pretty_name, | |||
extras=list(package.dependency.extras), | package.version, | |||
repository=package.dependency.source_name, | extras=list(dependency.extras), | |||
), | repository=dependency.source_name, | |||
) | ), | |||
) | ||||
except PackageNotFound as e: | ||||
try: | ||||
dependency_package = next( | ||||
DependencyPackage(dependency, pkg) | ||||
for pkg in self.search_for_installed_packages(dependency) | ||||
) | ||||
except StopIteration: | ||||
raise e from e | ||||
package = dependency_package.package | ||||
dependency = dependency_package.dependency | ||||
requires = package.requires | requires = package.requires | |||
else: | else: | |||
requires = package.requires | requires = package.requires | |||
if self._load_deferred: | if self._load_deferred: | |||
# Retrieving constraints for deferred dependencies | # Retrieving constraints for deferred dependencies | |||
for r in requires: | for r in requires: | |||
if r.is_directory(): | if r.is_direct_origin(): | |||
self.search_for_directory(r) | self.search_for_direct_origin_dependency(r) | |||
elif r.is_file(): | ||||
self.search_for_file(r) | ||||
elif r.is_vcs(): | ||||
self.search_for_vcs(r) | ||||
elif r.is_url(): | ||||
self.search_for_url(r) | ||||
optional_dependencies = [] | optional_dependencies = [] | |||
_dependencies = [] | _dependencies = [] | |||
# If some extras/features were required, we need to | # If some extras/features were required, we need to | |||
# add a special dependency representing the base package | # add a special dependency representing the base package | |||
# to the current package | # to the current package | |||
if package.dependency.extras: | if dependency.extras: | |||
for extra in package.dependency.extras: | for extra in dependency.extras: | |||
extra = safe_extra(extra) | ||||
if extra not in package.extras: | if extra not in package.extras: | |||
continue | continue | |||
optional_dependencies += [d.name for d in package.extras[extra]] | optional_dependencies += [d.name for d in package.extras[extra]] | |||
package = package.with_features(list(package.dependency.extras)) | dependency_package = dependency_package.with_features( | |||
list(dependency.extras) | ||||
) | ||||
package = dependency_package.package | ||||
dependency = dependency_package.dependency | ||||
_dependencies.append(package.without_features().to_dependency()) | _dependencies.append(package.without_features().to_dependency()) | |||
for dep in requires: | for dep in requires: | |||
if not self._python_constraint.allows_any(dep.python_constraint): | if not self._python_constraint.allows_any(dep.python_constraint): | |||
continue | continue | |||
if dep.name in self.UNSAFE_PACKAGES: | if dep.name in self.UNSAFE_PACKAGES: | |||
continue | continue | |||
if self._env and not dep.marker.validate(self._env.marker_env): | if self._env and not dep.marker.validate(self._env.marker_env): | |||
continue | continue | |||
if not package.is_root(): | if not package.is_root() and ( | |||
if (dep.is_optional() and dep.name not in optional_dependencies) | (dep.is_optional() and dep.name not in optional_dependencies) | |||
or ( | or ( | |||
dep.in_extras | dep.in_extras | |||
and not set(dep.in_extras).intersection(package.dependency.extras) | and not set(dep.in_extras).intersection( | |||
{safe_extra(extra) for extra in dependency.extras} | ||||
): | ) | |||
continue | ) | |||
): | ||||
continue | ||||
_dependencies.append(dep) | _dependencies.append(dep) | |||
overrides = self._overrides.get(package, {}) | dependencies = self._get_dependencies_with_overrides( | |||
dependencies = [] | _dependencies, dependency_package | |||
overridden = [] | ) | |||
for dep in _dependencies: | ||||
if dep.name in overrides: | ||||
if dep.name in overridden: | ||||
continue | ||||
dependencies.append(overrides[dep.name]) | ||||
overridden.append(dep.name) | ||||
continue | ||||
dependencies.append(dep) | ||||
# Searching for duplicate dependencies | # Searching for duplicate dependencies | |||
# | # | |||
# If the duplicate dependencies have the same constraint, | # If the duplicate dependencies have the same constraint, | |||
# the requirements will be merged. | # the requirements will be merged. | |||
# | # | |||
# For instance: | # For instance: | |||
# - enum34; python_version=="2.7" | # - enum34; python_version=="2.7" | |||
# - enum34; python_version=="3.3" | # - enum34; python_version=="3.3" | |||
# | # | |||
# will become: | # will become: | |||
# - enum34; python_version=="2.7" or python_version=="3.3" | # - enum34; python_version=="2.7" or python_version=="3.3" | |||
# | # | |||
# If the duplicate dependencies have different constraints | # If the duplicate dependencies have different constraints | |||
# we have to split the dependency graph. | # we have to split the dependency graph. | |||
# | # | |||
# An example of this is: | # An example of this is: | |||
# - pypiwin32 (220); sys_platform == "win32" and python_version >= "3.6" | # - pypiwin32 (220); sys_platform == "win32" and python_version >= "3.6" | |||
# - pypiwin32 (219); sys_platform == "win32" and python_version < "3.6" | # - pypiwin32 (219); sys_platform == "win32" and python_version < "3.6" | |||
duplicates = OrderedDict() | duplicates: dict[str, list[Dependency]] = defaultdict(list) | |||
for dep in dependencies: | for dep in dependencies: | |||
if dep.name not in duplicates: | duplicates[dep.complete_name].append(dep) | |||
duplicates[dep.name] = [] | ||||
duplicates[dep.name].append(dep) | ||||
dependencies = [] | dependencies = [] | |||
for dep_name, deps in duplicates.items(): | for dep_name, deps in duplicates.items(): | |||
if len(deps) == 1: | if len(deps) == 1: | |||
dependencies.append(deps[0]) | dependencies.append(deps[0]) | |||
continue | continue | |||
self.debug("<debug>Duplicate dependencies for {}</debug>".format(dep _name)) | self.debug(f"<debug>Duplicate dependencies for {dep_name}</debug>") | |||
# Regrouping by constraint | non_direct_origin_deps: list[Dependency] = [] | |||
by_constraint = OrderedDict() | direct_origin_deps: list[Dependency] = [] | |||
for dep in deps: | for dep in deps: | |||
if dep.constraint not in by_constraint: | if dep.is_direct_origin(): | |||
by_constraint[dep.constraint] = [] | direct_origin_deps.append(dep) | |||
else: | ||||
by_constraint[dep.constraint].append(dep) | non_direct_origin_deps.append(dep) | |||
deps = ( | ||||
# We merge by constraint | self._merge_dependencies_by_constraint( | |||
for constraint, _deps in by_constraint.items(): | self._merge_dependencies_by_marker(non_direct_origin_deps) | |||
new_markers = [] | ||||
for dep in _deps: | ||||
marker = dep.marker.without_extras() | ||||
if marker.is_any(): | ||||
# No marker or only extras | ||||
continue | ||||
new_markers.append(marker) | ||||
if not new_markers: | ||||
continue | ||||
dep = _deps[0] | ||||
dep.marker = dep.marker.union(MarkerUnion(*new_markers)) | ||||
by_constraint[constraint] = [dep] | ||||
continue | ||||
if len(by_constraint) == 1: | ||||
self.debug( | ||||
"<debug>Merging requirements for {}</debug>".format(str(deps | ||||
[0])) | ||||
) | ) | |||
dependencies.append(list(by_constraint.values())[0][0]) | + direct_origin_deps | |||
) | ||||
if len(deps) == 1: | ||||
self.debug(f"<debug>Merging requirements for {deps[0]!s}</debug> | ||||
") | ||||
dependencies.append(deps[0]) | ||||
continue | continue | |||
# We leave dependencies as-is if they have the same | # We leave dependencies as-is if they have the same | |||
# python/platform constraints. | # python/platform constraints. | |||
# That way the resolver will pickup the conflict | # That way the resolver will pickup the conflict | |||
# and display a proper error. | # and display a proper error. | |||
_deps = [value[0] for value in by_constraint.values()] | ||||
seen = set() | seen = set() | |||
for _dep in _deps: | for dep in deps: | |||
pep_508_dep = _dep.to_pep_508(False) | pep_508_dep = dep.to_pep_508(False) | |||
if ";" not in pep_508_dep: | if ";" not in pep_508_dep: | |||
_requirements = "" | _requirements = "" | |||
else: | else: | |||
_requirements = pep_508_dep.split(";")[1].strip() | _requirements = pep_508_dep.split(";")[1].strip() | |||
if _requirements not in seen: | if _requirements not in seen: | |||
seen.add(_requirements) | seen.add(_requirements) | |||
if len(_deps) != len(seen): | if len(deps) != len(seen): | |||
for _dep in _deps: | for dep in deps: | |||
dependencies.append(_dep) | dependencies.append(dep) | |||
continue | continue | |||
# At this point, we raise an exception that will | # At this point, we raise an exception that will | |||
# tell the solver to make new resolutions with specific overrides. | # tell the solver to make new resolutions with specific overrides. | |||
# | # | |||
# For instance, if the foo (1.2.3) package has the following dependencies: | # For instance, if the foo (1.2.3) package has the following dependencies: | |||
# - bar (>=2.0) ; python_version >= "3.6" | # - bar (>=2.0) ; python_version >= "3.6" | |||
# - bar (<2.0) ; python_version < "3.6" | # - bar (<2.0) ; python_version < "3.6" | |||
# | # | |||
# then the solver will need to make two new resolutions | # then the solver will need to make two new resolutions | |||
# with the following overrides: | # with the following overrides: | |||
# - {<Package foo (1.2.3): {"bar": <Dependency bar (>=2.0)>} | # - {<Package foo (1.2.3): {"bar": <Dependency bar (>=2.0)>} | |||
# - {<Package foo (1.2.3): {"bar": <Dependency bar (<2.0)>} | # - {<Package foo (1.2.3): {"bar": <Dependency bar (<2.0)>} | |||
markers = [] | ||||
for constraint, _deps in by_constraint.items(): | ||||
markers.append(_deps[0].marker) | ||||
_deps = [_dep[0] for _dep in by_constraint.values()] | def fmt_warning(d: Dependency) -> str: | |||
self.debug( | dependency_marker = d.marker if not d.marker.is_any() else "*" | |||
"<warning>Different requirements found for {}.</warning>".format( | return ( | |||
", ".join( | f"<c1>{d.name}</c1> <fg=default>(<c2>{d.pretty_constraint}</c2>)</>" | |||
"<c1>{}</c1> <fg=default>(<c2>{}</c2>)</> with markers <b>{}</b>".format( | f" with markers <b>{dependency_marker}</b>" | |||
d.name, | ) | |||
d.pretty_constraint, | ||||
d.marker if not d.marker.is_any() else "*", | ||||
) | ||||
for d in _deps[:-1] | ||||
) | ||||
+ " and " | ||||
+ "<c1>{}</c1> <fg=default>(<c2>{}</c2>)</> with markers <b>{}</b>".format( | ||||
_deps[-1].name, | ||||
_deps[-1].pretty_constraint, | ||||
_deps[-1].marker if not _deps[-1].marker.is_any() else "*", | ||||
) | warnings = ", ".join(fmt_warning(d) for d in deps[:-1]) | |||
) | warnings += f" and {fmt_warning(deps[-1])}" | |||
) | self.debug( | |||
f"<warning>Different requirements found for {warnings}.</warning>" | ||||
) | ) | |||
# We need to check if one of the duplicate dependencies | # We need to check if one of the duplicate dependencies | |||
# has no markers. If there is one, we need to change its | # has no markers. If there is one, we need to change its | |||
# environment markers to the inverse of the union of the | # environment markers to the inverse of the union of the | |||
# other dependencies markers. | # other dependencies markers. | |||
# For instance, if we have the following dependencies: | # For instance, if we have the following dependencies: | |||
# - ipython | # - ipython | |||
# - ipython (1.2.4) ; implementation_name == "pypy" | # - ipython (1.2.4) ; implementation_name == "pypy" | |||
# | # | |||
# the marker for `ipython` will become `implementation_name != "pypy"`. | # the marker for `ipython` will become `implementation_name != "pypy"`. | |||
any_markers_dependencies = [d for d in _deps if d.marker.is_any()] | # | |||
other_markers_dependencies = [d for d in _deps if not d.marker.is_any()] | # Further, we have to merge the constraints of the requirements | |||
# without markers into the constraints of the requirements with markers. | ||||
# for instance, if we have the following dependencies: | ||||
# - foo (>= 1.2) | ||||
# - foo (!= 1.2.1) ; python == 3.10 | ||||
# | ||||
# the constraint for the second entry will become (!= 1.2.1, >= 1.2) | ||||
any_markers_dependencies = [d for d in deps if d.marker.is_any()] | ||||
other_markers_dependencies = [d for d in deps if not d.marker.is_any()] | ||||
marker = other_markers_dependencies[0].marker | ||||
for other_dep in other_markers_dependencies[1:]: | ||||
marker = marker.union(other_dep.marker) | ||||
inverted_marker = marker.invert() | ||||
if any_markers_dependencies: | if any_markers_dependencies: | |||
marker = other_markers_dependencies[0].marker | for dep_any in any_markers_dependencies: | |||
for other_dep in other_markers_dependencies[1:]: | dep_any.marker = inverted_marker | |||
marker = marker.union(other_dep.marker) | for dep_other in other_markers_dependencies: | |||
dep_other.constraint = dep_other.constraint.intersect( | ||||
for i, d in enumerate(_deps): | dep_any.constraint | |||
if d.marker.is_any(): | ) | |||
_deps[i].marker = marker.invert() | elif not inverted_marker.is_empty() and self._python_constraint.allows_any( | |||
get_python_constraint_from_marker(inverted_marker) | ||||
): | ||||
# if there is no any marker dependency | ||||
# and the inverted marker is not empty, | ||||
# a dependency with the inverted union of all markers is required | ||||
# in order to not miss other dependencies later, for instance: | ||||
# - foo (1.0) ; python == 3.7 | ||||
# - foo (2.0) ; python == 3.8 | ||||
# - bar (2.0) ; python == 3.8 | ||||
# - bar (3.0) ; python == 3.9 | ||||
# | ||||
# the last dependency would be missed without this, | ||||
# because the intersection with both foo dependencies is empty | ||||
inverted_marker_dep = deps[0].with_constraint(EmptyConstraint()) | ||||
inverted_marker_dep.marker = inverted_marker | ||||
deps.append(inverted_marker_dep) | ||||
overrides = [] | overrides = [] | |||
for _dep in _deps: | overrides_marker_intersection: BaseMarker = AnyMarker() | |||
current_overrides = self._overrides.copy() | for dep_overrides in self._overrides.values(): | |||
package_overrides = current_overrides.get(package, {}).copy() | for dep in dep_overrides.values(): | |||
package_overrides.update({_dep.name: _dep}) | overrides_marker_intersection = ( | |||
current_overrides.update({package: package_overrides}) | overrides_marker_intersection.intersect(dep.marker) | |||
overrides.append(current_overrides) | ) | |||
for dep in deps: | ||||
if not overrides_marker_intersection.intersect(dep.marker).is_empty(): | ||||
current_overrides = self._overrides.copy() | ||||
package_overrides = current_overrides.get( | ||||
dependency_package, {} | ||||
).copy() | ||||
package_overrides.update({dep.name: dep}) | ||||
current_overrides.update({dependency_package: package_overrides}) | ||||
overrides.append(current_overrides) | ||||
raise OverrideNeeded(*overrides) | if overrides: | |||
raise OverrideNeeded(*overrides) | ||||
# Modifying dependencies as needed | # Modifying dependencies as needed | |||
clean_dependencies = [] | clean_dependencies = [] | |||
for dep in dependencies: | for dep in dependencies: | |||
if not package.dependency.transitive_marker.without_extras().is_any(): | if not dependency.transitive_marker.without_extras().is_any(): | |||
marker_intersection = package.dependency.transitive_marker.without_extras().intersect( | marker_intersection = ( | |||
dep.marker.without_extras() | dependency.transitive_marker.without_extras().intersect( | |||
) | dep.marker.without_extras() | |||
) | ||||
) | ||||
if marker_intersection.is_empty(): | if marker_intersection.is_empty(): | |||
# The dependency is not needed, since the markers specified | # The dependency is not needed, since the markers specified | |||
# for the current package selection are not compatible with | # for the current package selection are not compatible with | |||
# the markers for the current dependency, so we skip it | # the markers for the current dependency, so we skip it | |||
continue | continue | |||
dep.transitive_marker = marker_intersection | dep.transitive_marker = marker_intersection | |||
if not package.dependency.python_constraint.is_any(): | if not dependency.python_constraint.is_any(): | |||
python_constraint_intersection = dep.python_constraint.intersect( | python_constraint_intersection = dep.python_constraint.intersect( | |||
package.dependency.python_constraint | dependency.python_constraint | |||
) | ) | |||
if python_constraint_intersection.is_empty(): | if python_constraint_intersection.is_empty(): | |||
# This dependency is not needed under current python constraint. | # This dependency is not needed under current python constraint. | |||
continue | continue | |||
dep.transitive_python_versions = str(python_constraint_intersection) | dep.transitive_python_versions = str(python_constraint_intersection) | |||
clean_dependencies.append(dep) | clean_dependencies.append(dep) | |||
package.requires = clean_dependencies | package = package.with_dependency_groups([], only=True) | |||
dependency_package = DependencyPackage(dependency, package) | ||||
return package | for dep in clean_dependencies: | |||
package.add_dependency(dep) | ||||
return dependency_package | ||||
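Note: the duplicate-dependency handling above leans entirely on marker algebra: duplicates with the same constraint have their markers unioned, an unmarked duplicate inherits the inverse of the others' union, and otherwise resolution is split via `OverrideNeeded`. A sketch of that marker arithmetic, assuming poetry-core's `parse_marker` (API details may vary across poetry-core releases):

```python
from poetry.core.version.markers import parse_marker

pypy = parse_marker('implementation_name == "pypy"')
py38 = parse_marker('python_version == "3.8"')

union = pypy.union(py38)
print(union)
# implementation_name == "pypy" or python_version == "3.8"

# An unmarked duplicate (e.g. a bare `ipython` next to
# `ipython (1.2.4) ; implementation_name == "pypy"`) gets the inverse:
print(union.invert())
# implementation_name != "pypy" and python_version != "3.8"
```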
def debug(self, message, depth=0): | def debug(self, message: str, depth: int = 0) -> None: | |||
if not (self._io.is_very_verbose() or self._io.is_debug()): | if not (self._io.is_very_verbose() or self._io.is_debug()): | |||
return | return | |||
if message.startswith("fact:"): | if message.startswith("fact:"): | |||
if "depends on" in message: | if "depends on" in message: | |||
m = re.match(r"fact: (.+?) depends on (.+?) \((.+?)\)", message) | m = re.match(r"fact: (.+?) depends on (.+?) \((.+?)\)", message) | |||
if m is None: | ||||
raise ValueError(f"Unable to parse fact: {message}") | ||||
m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) | m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) | |||
if m2: | if m2: | |||
name = m2.group(1) | name = m2.group(1) | |||
version = " (<c2>{}</c2>)".format(m2.group(2)) | version = f" (<c2>{m2.group(2)}</c2>)" | |||
else: | else: | |||
name = m.group(1) | name = m.group(1) | |||
version = "" | version = "" | |||
message = ( | message = ( | |||
"<fg=blue>fact</>: <c1>{}</c1>{} " | f"<fg=blue>fact</>: <c1>{name}</c1>{version} " | |||
"depends on <c1>{}</c1> (<c2>{}</c2>)".format( | f"depends on <c1>{m.group(2)}</c1> (<c2>{m.group(3)}</c2>)" | |||
name, version, m.group(2), m.group(3) | ||||
) | ||||
) | ) | |||
elif " is " in message: | elif " is " in message: | |||
message = re.sub( | message = re.sub( | |||
"fact: (.+) is (.+)", | "fact: (.+) is (.+)", | |||
"<fg=blue>fact</>: <c1>\\1</c1> is <c2>\\2</c2>", | "<fg=blue>fact</>: <c1>\\1</c1> is <c2>\\2</c2>", | |||
message, | message, | |||
) | ) | |||
else: | else: | |||
message = re.sub( | message = re.sub( | |||
r"(?<=: )(.+?) \((.+?)\)", "<c1>\\1</c1> (<c2>\\2</c2>)", me ssage | r"(?<=: )(.+?) \((.+?)\)", "<c1>\\1</c1> (<c2>\\2</c2>)", me ssage | |||
) | ) | |||
message = "<fg=blue>fact</>: {}".format(message.split("fact: ")[ 1]) | message = f"<fg=blue>fact</>: {message.split('fact: ')[1]}" | |||
elif message.startswith("selecting "): | elif message.startswith("selecting "): | |||
message = re.sub( | message = re.sub( | |||
r"selecting (.+?) \((.+?)\)", | r"selecting (.+?) \((.+?)\)", | |||
"<fg=blue>selecting</> <c1>\\1</c1> (<c2>\\2</c2>)", | "<fg=blue>selecting</> <c1>\\1</c1> (<c2>\\2</c2>)", | |||
message, | message, | |||
) | ) | |||
elif message.startswith("derived:"): | elif message.startswith("derived:"): | |||
m = re.match(r"derived: (.+?) \((.+?)\)$", message) | m = re.match(r"derived: (.+?) \((.+?)\)$", message) | |||
if m: | if m: | |||
message = "<fg=blue>derived</>: <c1>{}</c1> (<c2>{}</c2>)".forma | message = ( | |||
t( | f"<fg=blue>derived</>: <c1>{m.group(1)}</c1>" | |||
m.group(1), m.group(2) | f" (<c2>{m.group(2)}</c2>)" | |||
) | ) | |||
else: | else: | |||
message = "<fg=blue>derived</>: <c1>{}</c1>".format( | message = ( | |||
message.split("derived: ")[1] | f"<fg=blue>derived</>: <c1>{message.split('derived: ')[1]}</ | |||
c1>" | ||||
) | ) | |||
elif message.startswith("conflict:"): | elif message.startswith("conflict:"): | |||
m = re.match(r"conflict: (.+?) depends on (.+?) \((.+?)\)", message) | m = re.match(r"conflict: (.+?) depends on (.+?) \((.+?)\)", message) | |||
if m: | if m: | |||
m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) | m2 = re.match(r"(.+?) \((.+?)\)", m.group(1)) | |||
if m2: | if m2: | |||
name = m2.group(1) | name = m2.group(1) | |||
version = " (<c2>{}</c2>)".format(m2.group(2)) | version = f" (<c2>{m2.group(2)}</c2>)" | |||
else: | else: | |||
name = m.group(1) | name = m.group(1) | |||
version = "" | version = "" | |||
message = ( | message = ( | |||
"<fg=red;options=bold>conflict</>: <c1>{}</c1>{} " | f"<fg=red;options=bold>conflict</>: <c1>{name}</c1>{version} | |||
"depends on <c1>{}</c1> (<c2>{}</c2>)".format( | " | |||
name, version, m.group(2), m.group(3) | f"depends on <c1>{m.group(2)}</c1> (<c2>{m.group(3)}</c2>)" | |||
) | ||||
) | ) | |||
else: | else: | |||
message = "<fg=red;options=bold>conflict</>: {}".format( | message = ( | |||
message.split("conflict: ")[1] | "<fg=red;options=bold>conflict</>:" | |||
f" {message.split('conflict: ')[1]}" | ||||
) | ) | |||
message = message.replace("! ", "<error>!</error> ") | message = message.replace("! ", "<error>!</error> ") | |||
if self.is_debugging(): | if self.is_debugging(): | |||
debug_info = str(message) | debug_info = str(message) | |||
debug_info = ( | debug_info = ( | |||
"\n".join( | "\n".join( | |||
[ | [ | |||
"<debug>{}:</debug> {}".format(str(depth).rjust(4), s) | f"<debug>{str(depth).rjust(4)}:</debug> {s}" | |||
for s in debug_info.split("\n") | for s in debug_info.split("\n") | |||
] | ] | |||
) | ) | |||
+ "\n" | + "\n" | |||
) | ) | |||
self._io.write(debug_info) | self._io.write(debug_info) | |||
@contextmanager | @contextmanager | |||
def progress(self): | def progress(self) -> Iterator[None]: | |||
if not self._io.output.supports_ansi() or self.is_debugging(): | if not self._io.output.is_decorated() or self.is_debugging(): | |||
self._io.write_line("Resolving dependencies...") | self._io.write_line("Resolving dependencies...") | |||
yield | yield | |||
else: | else: | |||
indicator = Indicator(self._io, "{message} <debug>({elapsed:2s})</debug>") | indicator = Indicator( | |||
self._io, "{message}{context}<debug>({elapsed:2s})</debug>" | ||||
) | ||||
with indicator.auto( | with indicator.auto( | |||
"<info>Resolving dependencies...</info>", | "<info>Resolving dependencies...</info>", | |||
"<info>Resolving dependencies...</info>", | "<info>Resolving dependencies...</info>", | |||
): | ): | |||
yield | yield | |||
self._in_progress = False | self._in_progress = False | |||
def _merge_dependencies_by_constraint( | ||||
self, dependencies: Iterable[Dependency] | ||||
) -> list[Dependency]: | ||||
by_constraint: dict[VersionConstraint, list[Dependency]] = defaultdict(list) | ||||
for dep in dependencies: | ||||
by_constraint[dep.constraint].append(dep) | ||||
for constraint, _deps in by_constraint.items(): | ||||
new_markers = [] | ||||
for dep in _deps: | ||||
marker = dep.marker.without_extras() | ||||
if marker.is_any(): | ||||
# No marker or only extras | ||||
continue | ||||
new_markers.append(marker) | ||||
if not new_markers: | ||||
continue | ||||
dep = _deps[0] | ||||
dep.marker = dep.marker.union(MarkerUnion(*new_markers)) | ||||
by_constraint[constraint] = [dep] | ||||
return [value[0] for value in by_constraint.values()] | ||||
def _merge_dependencies_by_marker( | ||||
self, dependencies: Iterable[Dependency] | ||||
) -> list[Dependency]: | ||||
by_marker: dict[BaseMarker, list[Dependency]] = defaultdict(list) | ||||
for dep in dependencies: | ||||
by_marker[dep.marker].append(dep) | ||||
deps = [] | ||||
for _deps in by_marker.values(): | ||||
if len(_deps) == 1: | ||||
deps.extend(_deps) | ||||
else: | ||||
new_constraint = _deps[0].constraint | ||||
for dep in _deps[1:]: | ||||
new_constraint = new_constraint.intersect(dep.constraint) | ||||
if new_constraint.is_empty(): | ||||
# leave dependencies as-is so the resolver will pickup | ||||
# the conflict and display a proper error. | ||||
deps.extend(_deps) | ||||
else: | ||||
self.debug( | ||||
f"<debug>Merging constraints for {_deps[0].name} for" | ||||
f" marker {_deps[0].marker}</debug>" | ||||
) | ||||
deps.append(_deps[0].with_constraint(new_constraint)) | ||||
return deps | ||||
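Note: `_merge_dependencies_by_marker` intersects the constraints of requirements that share an identical marker, and keeps them separate only when the intersection is empty so the resolver can surface the conflict. A toy walk-through with sets of allowed versions standing in for real constraints (illustrative only, not Poetry's types):

```python
from collections import defaultdict

# (name, allowed-versions, marker) -- toy stand-ins for Dependency objects
deps = [
    ("foo", frozenset({"1.2", "1.2.1", "1.3"}), 'python_version == "3.10"'),  # >=1.2
    ("foo", frozenset({"1.2", "1.3"}), 'python_version == "3.10"'),           # !=1.2.1
]

by_marker = defaultdict(list)
for name, allowed, marker in deps:
    by_marker[marker].append(allowed)

merged = []
for marker, groups in by_marker.items():
    allowed = frozenset.intersection(*groups)
    if allowed:   # non-empty intersection: one merged requirement
        merged.append(("foo", allowed, marker))
    else:         # empty: keep both and let the resolver report the conflict
        merged.extend(("foo", g, marker) for g in groups)

print(merged)
# [('foo', frozenset({'1.2', '1.3'}), 'python_version == "3.10"')]
```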
End of changes. 111 change blocks.
388 lines changed or deleted, 508 lines changed or added