"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/mixology/version_solver.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

version_solver.py  (poetry-1.1.15):version_solver.py  (poetry-1.2.0)
# -*- coding: utf-8 -*- from __future__ import annotations
import functools
import time import time
from contextlib import suppress
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any
from typing import Dict from poetry.core.packages.dependency import Dependency
from typing import List
from typing import Union from poetry.mixology.failure import SolveFailure
from poetry.mixology.incompatibility import Incompatibility
from poetry.core.packages import Dependency from poetry.mixology.incompatibility_cause import ConflictCause
from poetry.core.packages import Package from poetry.mixology.incompatibility_cause import NoVersionsCause
from poetry.core.packages import ProjectPackage from poetry.mixology.incompatibility_cause import PackageNotFoundCause
from poetry.core.semver import Version from poetry.mixology.incompatibility_cause import RootCause
from poetry.core.semver import VersionRange from poetry.mixology.partial_solution import PartialSolution
from poetry.mixology.result import SolverResult
from .failure import SolveFailure from poetry.mixology.set_relation import SetRelation
from .incompatibility import Incompatibility from poetry.mixology.term import Term
from .incompatibility_cause import ConflictCause from poetry.packages import DependencyPackage
from .incompatibility_cause import NoVersionsCause
from .incompatibility_cause import PackageNotFoundCause
from .incompatibility_cause import RootCause
from .partial_solution import PartialSolution
from .result import SolverResult
from .set_relation import SetRelation
from .term import Term
if TYPE_CHECKING: if TYPE_CHECKING:
from poetry.core.packages.project_package import ProjectPackage
from poetry.puzzle.provider import Provider from poetry.puzzle.provider import Provider
_conflict = object() _conflict = object()
class DependencyCache:
"""
A cache of the valid dependencies.
The key observation here is that during the search - except at backtracking
- once we have decided that a dependency is invalid, we never need check it
again.
"""
def __init__(self, provider: Provider) -> None:
self.provider = provider
self.cache: dict[
tuple[str, str | None, str | None, str | None, str | None],
list[DependencyPackage],
] = {}
self.search_for = functools.lru_cache(maxsize=128)(self._search_for)
def _search_for(self, dependency: Dependency) -> list[DependencyPackage]:
key = (
dependency.complete_name,
dependency.source_type,
dependency.source_url,
dependency.source_reference,
dependency.source_subdirectory,
)
packages = self.cache.get(key)
if packages is None:
packages = self.provider.search_for(dependency)
else:
packages = [
p for p in packages if dependency.constraint.allows(p.package.ve
rsion)
]
self.cache[key] = packages
return packages
def clear(self) -> None:
self.cache.clear()
class VersionSolver: class VersionSolver:
""" """
The version solver that finds a set of package versions that satisfy the The version solver that finds a set of package versions that satisfy the
root package's dependencies. root package's dependencies.
See https://github.com/dart-lang/pub/tree/master/doc/solver.md for details See https://github.com/dart-lang/pub/tree/master/doc/solver.md for details
on how this solver works. on how this solver works.
""" """
def __init__( def __init__(
self, self,
root, # type: ProjectPackage root: ProjectPackage,
provider, # type: Provider provider: Provider,
locked=None, # type: Dict[str, Package] locked: dict[str, list[DependencyPackage]] | None = None,
use_latest=None, # type: List[str] use_latest: list[str] | None = None,
): ) -> None:
self._root = root self._root = root
self._provider = provider self._provider = provider
self._dependency_cache = DependencyCache(provider)
self._locked = locked or {} self._locked = locked or {}
if use_latest is None: if use_latest is None:
use_latest = [] use_latest = []
self._use_latest = use_latest self._use_latest = use_latest
self._incompatibilities = {} # type: Dict[str, List[Incompatibility]] self._incompatibilities: dict[str, list[Incompatibility]] = {}
self._contradicted_incompatibilities: set[Incompatibility] = set()
self._solution = PartialSolution() self._solution = PartialSolution()
@property @property
def solution(self): # type: () -> PartialSolution def solution(self) -> PartialSolution:
return self._solution return self._solution
def solve(self): # type: () -> SolverResult def solve(self) -> SolverResult:
""" """
Finds a set of dependencies that match the root package's constraints, Finds a set of dependencies that match the root package's constraints,
or raises an error if no such set is available. or raises an error if no such set is available.
""" """
start = time.time() start = time.time()
root_dependency = Dependency(self._root.name, self._root.version) root_dependency = Dependency(self._root.name, self._root.version)
root_dependency.is_root = True root_dependency.is_root = True
self._add_incompatibility( self._add_incompatibility(
Incompatibility([Term(root_dependency, False)], RootCause()) Incompatibility([Term(root_dependency, False)], RootCause())
) )
try: try:
next = self._root.name next: str | None = self._root.name
while next is not None: while next is not None:
self._propagate(next) self._propagate(next)
next = self._choose_package_version() next = self._choose_package_version()
return self._result() return self._result()
except Exception: except Exception:
raise raise
finally: finally:
self._log( self._log(
"Version solving took {:.3f} seconds.\n" f"Version solving took {time.time() - start:.3f} seconds.\n"
"Tried {} solutions.".format( f"Tried {self._solution.attempted_solutions} solutions."
time.time() - start, self._solution.attempted_solutions
)
) )
def _propagate(self, package): # type: (str) -> None def _propagate(self, package: str) -> None:
""" """
Performs unit propagation on incompatibilities transitively Performs unit propagation on incompatibilities transitively
related to package to derive new assignments for _solution. related to package to derive new assignments for _solution.
""" """
changed = set() changed = {package}
changed.add(package)
while changed: while changed:
package = changed.pop() package = changed.pop()
# Iterate in reverse because conflict resolution tends to produce mo re # Iterate in reverse because conflict resolution tends to produce mo re
# general incompatibilities as time goes on. If we look at those fir st, # general incompatibilities as time goes on. If we look at those fir st,
# we can derive stronger assignments sooner and more eagerly find # we can derive stronger assignments sooner and more eagerly find
# conflicts. # conflicts.
for incompatibility in reversed(self._incompatibilities[package]): for incompatibility in reversed(self._incompatibilities[package]):
if incompatibility in self._contradicted_incompatibilities:
continue
result = self._propagate_incompatibility(incompatibility) result = self._propagate_incompatibility(incompatibility)
if result is _conflict: if result is _conflict:
# If the incompatibility is satisfied by the solution, we us e # If the incompatibility is satisfied by the solution, we us e
# _resolve_conflict() to determine the root cause of the con # _resolve_conflict() to determine the root cause of the con
flict as a flict as
# new incompatibility. # a new incompatibility.
# #
# It also backjumps to a point in the solution # It also backjumps to a point in the solution
# where that incompatibility will allow us to derive new ass ignments # where that incompatibility will allow us to derive new ass ignments
# that avoid the conflict. # that avoid the conflict.
root_cause = self._resolve_conflict(incompatibility) root_cause = self._resolve_conflict(incompatibility)
# Back jumping erases all the assignments we did at the prev ious # Back jumping erases all the assignments we did at the prev ious
# decision level, so we clear [changed] and refill it with t he # decision level, so we clear [changed] and refill it with t he
# newly-propagated assignment. # newly-propagated assignment.
changed.clear() changed.clear()
changed.add(str(self._propagate_incompatibility(root_cause)) ) changed.add(str(self._propagate_incompatibility(root_cause)) )
break break
elif result is not None: elif result is not None:
changed.add(result) changed.add(str(result))
def _propagate_incompatibility( def _propagate_incompatibility(
self, incompatibility self, incompatibility: Incompatibility
): # type: (Incompatibility) -> Union[str, _conflict, None] ) -> str | object | None:
""" """
If incompatibility is almost satisfied by _solution, adds the If incompatibility is almost satisfied by _solution, adds the
negation of the unsatisfied term to _solution. negation of the unsatisfied term to _solution.
If incompatibility is satisfied by _solution, returns _conflict. If If incompatibility is satisfied by _solution, returns _conflict. If
incompatibility is almost satisfied by _solution, returns the incompatibility is almost satisfied by _solution, returns the
unsatisfied term's package name. unsatisfied term's package name.
Otherwise, returns None. Otherwise, returns None.
""" """
skipping to change at line 156 skipping to change at line 197
# inconclusive for incompatibility and we can't deduce anything. # inconclusive for incompatibility and we can't deduce anything.
unsatisfied = None unsatisfied = None
for term in incompatibility.terms: for term in incompatibility.terms:
relation = self._solution.relation(term) relation = self._solution.relation(term)
if relation == SetRelation.DISJOINT: if relation == SetRelation.DISJOINT:
# If term is already contradicted by _solution, then # If term is already contradicted by _solution, then
# incompatibility is contradicted as well and there's nothing ne w we # incompatibility is contradicted as well and there's nothing ne w we
# can deduce from it. # can deduce from it.
return self._contradicted_incompatibilities.add(incompatibility)
return None
elif relation == SetRelation.OVERLAPPING: elif relation == SetRelation.OVERLAPPING:
# If more than one term is inconclusive, we can't deduce anythin g about # If more than one term is inconclusive, we can't deduce anythin g about
# incompatibility. # incompatibility.
if unsatisfied is not None: if unsatisfied is not None:
return return None
# If exactly one term in incompatibility is inconclusive, then i t's # If exactly one term in incompatibility is inconclusive, then i t's
# almost satisfied and [term] is the unsatisfied term. We can ad d the # almost satisfied and [term] is the unsatisfied term. We can ad d the
# inverse of the term to _solution. # inverse of the term to _solution.
unsatisfied = term unsatisfied = term
# If *all* terms in incompatibility are satisfied by _solution, then # If *all* terms in incompatibility are satisfied by _solution, then
# incompatibility is satisfied and we have a conflict. # incompatibility is satisfied and we have a conflict.
if unsatisfied is None: if unsatisfied is None:
return _conflict return _conflict
self._log( self._contradicted_incompatibilities.add(incompatibility)
"derived: {}{}".format(
"not " if unsatisfied.is_positive() else "", unsatisfied.depende adverb = "not " if unsatisfied.is_positive() else ""
ncy self._log(f"derived: {adverb}{unsatisfied.dependency}")
)
)
self._solution.derive( self._solution.derive(
unsatisfied.dependency, not unsatisfied.is_positive(), incompatibili ty unsatisfied.dependency, not unsatisfied.is_positive(), incompatibili ty
) )
return unsatisfied.dependency.complete_name complete_name: str = unsatisfied.dependency.complete_name
return complete_name
def _resolve_conflict( def _resolve_conflict(self, incompatibility: Incompatibility) -> Incompatibi
self, incompatibility lity:
): # type: (Incompatibility) -> Incompatibility
""" """
Given an incompatibility that's satisfied by _solution, Given an incompatibility that's satisfied by _solution,
The `conflict resolution`_ constructs a new incompatibility that encapsu The `conflict resolution`_ constructs a new incompatibility that encapsu
lates the root lates
cause of the conflict and backtracks _solution until the new the root cause of the conflict and backtracks _solution until the new
incompatibility will allow _propagate() to deduce new assignments. incompatibility will allow _propagate() to deduce new assignments.
Adds the new incompatibility to _incompatibilities and returns it. Adds the new incompatibility to _incompatibilities and returns it.
.. _conflict resolution: https://github.com/dart-lang/pub/tree/master/do .. _conflict resolution:
c/solver.md#conflict-resolution https://github.com/dart-lang/pub/tree/master/doc/solver.md#conflict-reso
lution
""" """
self._log("conflict: {}".format(incompatibility)) self._log(f"conflict: {incompatibility}")
new_incompatibility = False new_incompatibility = False
while not incompatibility.is_failure(): while not incompatibility.is_failure():
# The term in incompatibility.terms that was most recently satisfied by # The term in incompatibility.terms that was most recently satisfied by
# _solution. # _solution.
most_recent_term = None most_recent_term = None
# The earliest assignment in _solution such that incompatibility is # The earliest assignment in _solution such that incompatibility is
# satisfied by _solution up to and including this assignment. # satisfied by _solution up to and including this assignment.
most_recent_satisfier = None most_recent_satisfier = None
skipping to change at line 261 skipping to change at line 302
previous_satisfier_level = max( previous_satisfier_level = max(
previous_satisfier_level, previous_satisfier_level,
self._solution.satisfier(difference.inverse).decisio n_level, self._solution.satisfier(difference.inverse).decisio n_level,
) )
# If most_recent_identifier is the only satisfier left at its decisi on # If most_recent_identifier is the only satisfier left at its decisi on
# level, or if it has no cause (indicating that it's a decision rath er # level, or if it has no cause (indicating that it's a decision rath er
# than a derivation), then incompatibility is the root cause. We the n # than a derivation), then incompatibility is the root cause. We the n
# backjump to previous_satisfier_level, where incompatibility is # backjump to previous_satisfier_level, where incompatibility is
# guaranteed to allow _propagate to produce more assignments. # guaranteed to allow _propagate to produce more assignments.
# using assert to suppress mypy [union-attr]
assert most_recent_satisfier is not None
if ( if (
previous_satisfier_level < most_recent_satisfier.decision_level previous_satisfier_level < most_recent_satisfier.decision_level
or most_recent_satisfier.cause is None or most_recent_satisfier.cause is None
): ):
self._solution.backtrack(previous_satisfier_level) self._solution.backtrack(previous_satisfier_level)
self._contradicted_incompatibilities.clear()
self._dependency_cache.clear()
if new_incompatibility: if new_incompatibility:
self._add_incompatibility(incompatibility) self._add_incompatibility(incompatibility)
return incompatibility return incompatibility
# Create a new incompatibility by combining incompatibility with the # Create a new incompatibility by combining incompatibility with the
# incompatibility that caused most_recent_satisfier to be assigned. Doing # incompatibility that caused most_recent_satisfier to be assigned. Doing
# this iteratively constructs an incompatibility that's guaranteed t o be # this iteratively constructs an incompatibility that's guaranteed t o be
# true (that is, we know for sure no solution will satisfy the # true (that is, we know for sure no solution will satisfy the
# incompatibility) while also approximating the intuitive notion of the # incompatibility) while also approximating the intuitive notion of the
# "root cause" of the conflict. # "root cause" of the conflict.
new_terms = [] new_terms = [
for term in incompatibility.terms: term for term in incompatibility.terms if term != most_recent_te
if term != most_recent_term: rm
new_terms.append(term) ]
for term in most_recent_satisfier.cause.terms: for term in most_recent_satisfier.cause.terms:
if term.dependency != most_recent_satisfier.dependency: if term.dependency != most_recent_satisfier.dependency:
new_terms.append(term) new_terms.append(term)
# The most_recent_satisfier may not satisfy most_recent_term on its own # The most_recent_satisfier may not satisfy most_recent_term on its own
# if there are a collection of constraints on most_recent_term that # if there are a collection of constraints on most_recent_term that
# only satisfy it together. For example, if most_recent_term is # only satisfy it together. For example, if most_recent_term is
# `foo ^1.0.0` and _solution contains `[foo >=1.0.0, # `foo ^1.0.0` and _solution contains `[foo >=1.0.0,
# foo <2.0.0]`, then most_recent_satisfier will be `foo <2.0.0` even # foo <2.0.0]`, then most_recent_satisfier will be `foo <2.0.0` even
# though it doesn't totally satisfy `foo ^1.0.0`. # though it doesn't totally satisfy `foo ^1.0.0`.
# #
# In this case, we add `not (most_recent_satisfier \ most_recent_ter m)` to # In this case, we add `not (most_recent_satisfier \ most_recent_ter m)` to
# the incompatibility as well, See the `algorithm documentation`_ fo r # the incompatibility as well, See the `algorithm documentation`_ fo r
# details. # details.
# #
# .. _algorithm documentation: https://github.com/dart-lang/pub/tree # .. _algorithm documentation:
/master/doc/solver.md#conflict-resolution # https://github.com/dart-lang/pub/tree/master/doc/solver.md#conflic
t-resolution # noqa: E501
if difference is not None: if difference is not None:
new_terms.append(difference.inverse) new_terms.append(difference.inverse)
incompatibility = Incompatibility( incompatibility = Incompatibility(
new_terms, ConflictCause(incompatibility, most_recent_satisfier. cause) new_terms, ConflictCause(incompatibility, most_recent_satisfier. cause)
) )
new_incompatibility = True new_incompatibility = True
partially = "" if difference is None else " partially" partially = "" if difference is None else " partially"
bang = "!"
self._log(
"{} {} is{} satisfied by {}".format(
bang, most_recent_term, partially, most_recent_satisfier
)
)
self._log( self._log(
'{} which is caused by "{}"'.format(bang, most_recent_satisfier. f"! {most_recent_term} is{partially} satisfied by"
cause) f" {most_recent_satisfier}"
) )
self._log("{} thus: {}".format(bang, incompatibility)) self._log(f'! which is caused by "{most_recent_satisfier.cause}"')
self._log(f"! thus: {incompatibility}")
raise SolveFailure(incompatibility) raise SolveFailure(incompatibility)
def _choose_package_version(self): # type: () -> Union[str, None] def _choose_package_version(self) -> str | None:
""" """
Tries to select a version of a required package. Tries to select a version of a required package.
Returns the name of the package whose incompatibilities should be Returns the name of the package whose incompatibilities should be
propagated by _propagate(), or None indicating that version solving is propagated by _propagate(), or None indicating that version solving is
complete and a solution has been found. complete and a solution has been found.
""" """
unsatisfied = self._solution.unsatisfied unsatisfied = self._solution.unsatisfied
if not unsatisfied: if not unsatisfied:
return return None
# Prefer packages with as few remaining versions as possible, # Prefer packages with as few remaining versions as possible,
# so that if a conflict is necessary it's forced quickly. # so that if a conflict is necessary it's forced quickly.
def _get_min(dependency): def _get_min(dependency: Dependency) -> tuple[bool, int]:
# Direct origin dependencies must be handled first: we don't want to
resolve
# a regular dependency for some package only to find later that we h
ad a
# direct-origin dependency.
if dependency.is_direct_origin():
return False, -1
if dependency.name in self._use_latest: if dependency.name in self._use_latest:
# If we're forced to use the latest version of a package, it eff ectively # If we're forced to use the latest version of a package, it eff ectively
# only has one version to choose from. # only has one version to choose from.
return not dependency.marker.is_any(), 1 return not dependency.marker.is_any(), 1
locked = self._get_locked(dependency) locked = self._get_locked(dependency)
if locked and ( if locked:
dependency.constraint.allows(locked.version)
or locked.is_prerelease()
and dependency.constraint.allows(locked.version.next_patch)
):
return not dependency.marker.is_any(), 1
# VCS, URL, File or Directory dependencies
# represent a single version
if (
dependency.is_vcs()
or dependency.is_url()
or dependency.is_file()
or dependency.is_directory()
):
return not dependency.marker.is_any(), 1 return not dependency.marker.is_any(), 1
try: try:
return ( return (
not dependency.marker.is_any(), not dependency.marker.is_any(),
len(self._provider.search_for(dependency)), len(self._dependency_cache.search_for(dependency)),
) )
except ValueError: except ValueError:
return not dependency.marker.is_any(), 0 return not dependency.marker.is_any(), 0
if len(unsatisfied) == 1: if len(unsatisfied) == 1:
dependency = unsatisfied[0] dependency = unsatisfied[0]
else: else:
dependency = min(*unsatisfied, key=_get_min) dependency = min(*unsatisfied, key=_get_min)
locked = self._get_locked(dependency) locked = self._get_locked(dependency)
if locked is None or not dependency.constraint.allows(locked.version): if locked is None:
try: try:
packages = self._provider.search_for(dependency) packages = self._dependency_cache.search_for(dependency)
except ValueError as e: except ValueError as e:
self._add_incompatibility( self._add_incompatibility(
Incompatibility([Term(dependency, True)], PackageNotFoundCau se(e)) Incompatibility([Term(dependency, True)], PackageNotFoundCau se(e))
) )
return dependency.complete_name complete_name: str = dependency.complete_name
return complete_name
try: package = None
version = packages[0] if locked is not None:
except IndexError: package = next(
version = None (
p
for p in packages
if p.package.version == locked.package.version
),
None,
)
if package is None:
with suppress(IndexError):
package = packages[0]
if version is None: if package is None:
# If there are no versions that satisfy the constraint, # If there are no versions that satisfy the constraint,
# add an incompatibility that indicates that. # add an incompatibility that indicates that.
self._add_incompatibility( self._add_incompatibility(
Incompatibility([Term(dependency, True)], NoVersionsCause()) Incompatibility([Term(dependency, True)], NoVersionsCause())
) )
return dependency.complete_name complete_name = dependency.complete_name
return complete_name
else: else:
version = locked package = locked
version = self._provider.complete_package(version) package = self._provider.complete_package(package)
conflict = False conflict = False
for incompatibility in self._provider.incompatibilities_for(version): for incompatibility in self._provider.incompatibilities_for(package):
self._add_incompatibility(incompatibility) self._add_incompatibility(incompatibility)
# If an incompatibility is already satisfied, then selecting version # If an incompatibility is already satisfied, then selecting version
# would cause a conflict. # would cause a conflict.
# #
# We'll continue adding its dependencies, then go back to # We'll continue adding its dependencies, then go back to
# unit propagation which will guide us to choose a better version. # unit propagation which will guide us to choose a better version.
conflict = conflict or all( conflict = conflict or all(
[ term.dependency.complete_name == dependency.complete_name
term.dependency.complete_name == dependency.complete_name or self._solution.satisfies(term)
or self._solution.satisfies(term) for term in incompatibility.terms
for term in incompatibility.terms
]
) )
if not conflict: if not conflict:
self._solution.decide(version) self._solution.decide(package.package)
self._log( self._log(
"selecting {} ({})".format( f"selecting {package.package.complete_name}"
version.complete_name, version.full_pretty_version f" ({package.package.full_pretty_version})"
)
) )
return dependency.complete_name complete_name = dependency.complete_name
return complete_name
def _excludes_single_version(self, constraint): # type: (Any) -> bool def _result(self) -> SolverResult:
return isinstance(VersionRange().difference(constraint), Version)
def _result(self): # type: () -> SolverResult
""" """
Creates a #SolverResult from the decisions in _solution Creates a #SolverResult from the decisions in _solution
""" """
decisions = self._solution.decisions decisions = self._solution.decisions
return SolverResult( return SolverResult(
self._root, self._root,
[p for p in decisions if not p.is_root()], [p for p in decisions if not p.is_root()],
self._solution.attempted_solutions, self._solution.attempted_solutions,
) )
def _add_incompatibility(self, incompatibility): # type: (Incompatibility) def _add_incompatibility(self, incompatibility: Incompatibility) -> None:
-> None self._log(f"fact: {incompatibility}")
self._log("fact: {}".format(incompatibility))
for term in incompatibility.terms: for term in incompatibility.terms:
if term.dependency.complete_name not in self._incompatibilities: if term.dependency.complete_name not in self._incompatibilities:
self._incompatibilities[term.dependency.complete_name] = [] self._incompatibilities[term.dependency.complete_name] = []
if ( if (
incompatibility incompatibility
in self._incompatibilities[term.dependency.complete_name] in self._incompatibilities[term.dependency.complete_name]
): ):
continue continue
self._incompatibilities[term.dependency.complete_name].append( self._incompatibilities[term.dependency.complete_name].append(
incompatibility incompatibility
) )
def _get_locked(self, dependency): # type: (Dependency) -> Union[Package, N one] def _get_locked(self, dependency: Dependency) -> DependencyPackage | None:
if dependency.name in self._use_latest: if dependency.name in self._use_latest:
return return None
locked = self._locked.get(dependency.name)
if not locked:
return
if not dependency.is_same_package_as(locked):
return
return locked locked = self._locked.get(dependency.name, [])
for dependency_package in locked:
package = dependency_package.package
if (
# Locked dependencies are always without features.
# Thus, we can't use is_same_package_as() here because it compar
es
# the complete_name (including features).
dependency.name == package.name
and dependency.is_same_source_as(package)
and dependency.constraint.allows(package.version)
):
return DependencyPackage(dependency, package)
return None
def _log(self, text): def _log(self, text: str) -> None:
self._provider.debug(text, self._solution.attempted_solutions) self._provider.debug(text, self._solution.attempted_solutions)
 End of changes. 57 change blocks. 
137 lines changed or deleted 185 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)