"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/puzzle/solver.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

solver.py  (poetry-1.1.15):solver.py  (poetry-1.2.0)
import enum from __future__ import annotations
import time import time
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from typing import List from typing import TYPE_CHECKING
from typing import Optional from typing import FrozenSet
from typing import Tuple
from typing import TypeVar
from clikit.io import ConsoleIO from poetry.core.packages.dependency_group import MAIN_GROUP
from poetry.core.packages import Package
from poetry.core.packages.project_package import ProjectPackage
from poetry.installation.operations import Install
from poetry.installation.operations import Uninstall
from poetry.installation.operations import Update
from poetry.installation.operations.operation import Operation
from poetry.mixology import resolve_version from poetry.mixology import resolve_version
from poetry.mixology.failure import SolveFailure from poetry.mixology.failure import SolveFailure
from poetry.packages import DependencyPackage from poetry.packages import DependencyPackage
from poetry.repositories import Pool from poetry.puzzle.exceptions import OverrideNeeded
from poetry.repositories import Repository from poetry.puzzle.exceptions import SolverProblemError
from poetry.utils.env import Env from poetry.puzzle.provider import Provider
from .exceptions import OverrideNeeded if TYPE_CHECKING:
from .exceptions import SolverProblemError from collections.abc import Iterator
from .provider import Provider
from cleo.io.io import IO
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.packages.project_package import ProjectPackage
from poetry.puzzle.transaction import Transaction
from poetry.repositories import Pool
from poetry.utils.env import Env
class Solver: class Solver:
def __init__( def __init__(
self, self,
package, # type: ProjectPackage package: ProjectPackage,
pool, # type: Pool pool: Pool,
installed, # type: Repository installed: list[Package],
locked, # type: Repository locked: list[Package],
io, # type: ConsoleIO io: IO,
remove_untracked=False, # type: bool provider: Provider | None = None,
provider=None, # type: Optional[Provider] ) -> None:
):
self._package = package self._package = package
self._pool = pool self._pool = pool
self._installed = installed self._installed_packages = installed
self._locked = locked self._locked_packages = locked
self._io = io self._io = io
if provider is None: if provider is None:
provider = Provider(self._package, self._pool, self._io) provider = Provider(
self._package, self._pool, self._io, installed=installed
)
self._provider = provider self._provider = provider
self._overrides = [] self._overrides: list[dict[DependencyPackage, dict[str, Dependency]]] =
self._remove_untracked = remove_untracked []
@property @property
def provider(self): # type: () -> Provider def provider(self) -> Provider:
return self._provider return self._provider
@contextmanager @contextmanager
def use_environment(self, env): # type: (Env) -> None def use_environment(self, env: Env) -> Iterator[None]:
with self.provider.use_environment(env): with self.provider.use_environment(env):
yield yield
def solve(self, use_latest=None): # type: (...) -> List[Operation] def solve(self, use_latest: list[str] | None = None) -> Transaction:
from poetry.puzzle.transaction import Transaction
with self._provider.progress(): with self._provider.progress():
start = time.time() start = time.time()
packages, depths = self._solve(use_latest=use_latest) packages, depths = self._solve(use_latest=use_latest)
end = time.time() end = time.time()
if len(self._overrides) > 1: if len(self._overrides) > 1:
self._provider.debug( self._provider.debug(
"Complete version solving took {:.3f} seconds with {} overri f"Complete version solving took {end - start:.3f} seconds wi
des".format( th"
end - start, len(self._overrides) f" {len(self._overrides)} overrides"
)
) )
self._provider.debug( self._provider.debug(
"Resolved with overrides: {}".format( "Resolved with overrides:"
", ".join("({})".format(b) for b in self._overrides) f" {', '.join(f'({b})' for b in self._overrides)}"
)
) )
operations = [] for p in packages:
for i, package in enumerate(packages): if p.yanked:
installed = False message = (
for pkg in self._installed.packages: f"The locked version {p.pretty_version} for {p.pretty_name}
if package.name == pkg.name: is a"
installed = True " yanked version."
)
if pkg.source_type == "git" and package.source_type == "git" if p.yanked_reason:
: message += f" Reason for being yanked: {p.yanked_reason}"
from poetry.core.vcs.git import Git self._io.write_error_line(f"<warning>Warning: {message}</warning
>")
# Trying to find the currently installed version
pkg_source_url = Git.normalize_url(pkg.source_url) return Transaction(
package_source_url = Git.normalize_url(package.source_ur self._locked_packages,
l) list(zip(packages, depths)),
for locked in self._locked.packages: installed_packages=self._installed_packages,
if locked.name != pkg.name or locked.source_type != root_package=self._package,
"git":
continue
locked_source_url = Git.normalize_url(locked.source_
url)
if (
locked.name == pkg.name
and locked.source_type == pkg.source_type
and locked_source_url == pkg_source_url
and locked.source_reference == pkg.source_refere
nce
and locked.source_resolved_reference
== pkg.source_resolved_reference
):
pkg = Package(
pkg.name,
locked.version,
source_type="git",
source_url=locked.source_url,
source_reference=locked.source_reference,
source_resolved_reference=locked.source_reso
lved_reference,
)
break
if pkg_source_url != package_source_url or (
(
not pkg.source_resolved_reference
or not package.source_resolved_reference
)
and pkg.source_reference != package.source_reference
and not pkg.source_reference.startswith(
package.source_reference
)
or (
pkg.source_resolved_reference
and package.source_resolved_reference
and pkg.source_resolved_reference
!= package.source_resolved_reference
and not pkg.source_resolved_reference.startswith
(
package.source_resolved_reference
)
)
):
operations.append(Update(pkg, package, priority=dept
hs[i]))
else:
operations.append(
Install(package).skip("Already installed")
)
elif package.version != pkg.version:
# Checking version
operations.append(Update(pkg, package, priority=depths[i
]))
elif pkg.source_type and package.source_type != pkg.source_t
ype:
operations.append(Update(pkg, package, priority=depths[i
]))
else:
operations.append(
Install(package, priority=depths[i]).skip(
"Already installed"
)
)
break
if not installed:
operations.append(Install(package, priority=depths[i]))
# Checking for removals
for pkg in self._locked.packages:
remove = True
for package in packages:
if pkg.name == package.name:
remove = False
break
if remove:
skip = True
for installed in self._installed.packages:
if installed.name == pkg.name:
skip = False
break
op = Uninstall(pkg)
if skip:
op.skip("Not currently installed")
operations.append(op)
if self._remove_untracked:
locked_names = {locked.name for locked in self._locked.packages}
for installed in self._installed.packages:
if installed.name == self._package.name:
continue
if installed.name in Provider.UNSAFE_PACKAGES:
# Never remove pip, setuptools etc.
continue
if installed.name not in locked_names:
operations.append(Uninstall(installed))
return sorted(
operations, key=lambda o: (-o.priority, o.package.name, o.package.ve
rsion,),
) )
def solve_in_compatibility_mode(self, overrides, use_latest=None): def solve_in_compatibility_mode(
locked = {} self,
for package in self._locked.packages: overrides: tuple[dict[DependencyPackage, dict[str, Dependency]], ...],
locked[package.name] = DependencyPackage(package.to_dependency(), pa use_latest: list[str] | None = None,
ckage) ) -> tuple[list[Package], list[int]]:
packages = [] packages = []
depths = [] depths = []
for override in overrides: for override in overrides:
self._provider.debug( self._provider.debug(
"<comment>Retrying dependency resolution " "<comment>Retrying dependency resolution "
"with the following overrides ({}).</comment>".format(override) f"with the following overrides ({override}).</comment>"
) )
self._provider.set_overrides(override) self._provider.set_overrides(override)
_packages, _depths = self._solve(use_latest=use_latest) _packages, _depths = self._solve(use_latest=use_latest)
for index, package in enumerate(_packages): for index, package in enumerate(_packages):
if package not in packages: if package not in packages:
packages.append(package) packages.append(package)
depths.append(_depths[index]) depths.append(_depths[index])
continue continue
else: else:
idx = packages.index(package) idx = packages.index(package)
pkg = packages[idx] pkg = packages[idx]
depths[idx] = max(depths[idx], _depths[index]) depths[idx] = max(depths[idx], _depths[index])
for dep in package.requires: for dep in package.requires:
if dep not in pkg.requires: if dep not in pkg.requires:
pkg.requires.append(dep) pkg.add_dependency(dep)
return packages, depths return packages, depths
def _solve(self, use_latest=None): def _solve(
self, use_latest: list[str] | None = None
) -> tuple[list[Package], list[int]]:
if self._provider._overrides: if self._provider._overrides:
self._overrides.append(self._provider._overrides) self._overrides.append(self._provider._overrides)
locked = {} locked: dict[str, list[DependencyPackage]] = defaultdict(list)
for package in self._locked.packages: for package in self._locked_packages:
locked[package.name] = DependencyPackage(package.to_dependency(), pa locked[package.name].append(
ckage) DependencyPackage(package.to_dependency(), package)
)
for dependency_packages in locked.values():
dependency_packages.sort(
key=lambda p: p.package.version,
reverse=True,
)
try: try:
result = resolve_version( result = resolve_version(
self._package, self._provider, locked=locked, use_latest=use_lat est self._package, self._provider, locked=locked, use_latest=use_lat est
) )
packages = result.packages packages = result.packages
except OverrideNeeded as e: except OverrideNeeded as e:
return self.solve_in_compatibility_mode(e.overrides, use_latest=use_ latest) return self.solve_in_compatibility_mode(e.overrides, use_latest=use_ latest)
except SolveFailure as e: except SolveFailure as e:
raise SolverProblemError(e) raise SolverProblemError(e)
results = dict( combined_nodes = depth_first_search(PackageNode(self._package, packages)
depth_first_search( )
PackageNode(self._package, packages), aggregate_package_nodes results = dict(aggregate_package_nodes(nodes) for nodes in combined_node
) s)
)
# Merging feature packages with base packages # Merging feature packages with base packages
final_packages = [] final_packages = []
depths = [] depths = []
for package in packages: for package in packages:
if package.features: if package.features:
for _package in packages: for _package in packages:
if ( if (
_package.name == package.name not _package.features
and not _package.is_same_package_as(package) and _package.name == package.name
and _package.version == package.version and _package.version == package.version
): ):
for dep in package.requires: for dep in package.requires:
if dep.is_same_package_as(_package): # Prevent adding base package as a dependency to its
elf
if _package.name == dep.name:
continue continue
if dep not in _package.requires: if dep not in _package.requires:
_package.requires.append(dep) _package.add_dependency(dep)
else:
continue final_packages.append(package)
depths.append(results[package])
final_packages.append(package)
depths.append(results[package])
# Return the packages in their original order with associated depths # Return the packages in their original order with associated depths
return final_packages, depths return final_packages, depths
class DFSNode(object): DFSNodeID = Tuple[str, FrozenSet[str], bool]
def __init__(self, id, name, base_name):
T = TypeVar("T", bound="DFSNode")
class DFSNode:
def __init__(self, id: DFSNodeID, name: str, base_name: str) -> None:
self.id = id self.id = id
self.name = name self.name = name
self.base_name = base_name self.base_name = base_name
def reachable(self): def reachable(self: T) -> list[T]:
return [] return []
def visit(self, parents): def visit(self, parents: list[PackageNode]) -> None:
pass pass
def __str__(self): def __str__(self) -> str:
return str(self.id) return str(self.id)
class VisitedState(enum.Enum): def depth_first_search(source: PackageNode) -> list[list[PackageNode]]:
Unvisited = 0 back_edges: dict[DFSNodeID, list[PackageNode]] = defaultdict(list)
PartiallyVisited = 1 visited: set[DFSNodeID] = set()
Visited = 2 topo_sorted_nodes: list[PackageNode] = []
def depth_first_search(source, aggregator):
back_edges = defaultdict(list)
visited = {}
topo_sorted_nodes = []
dfs_visit(source, back_edges, visited, topo_sorted_nodes) dfs_visit(source, back_edges, visited, topo_sorted_nodes)
# Combine the nodes by name # Combine the nodes by name
combined_nodes = defaultdict(list) combined_nodes: dict[str, list[PackageNode]] = defaultdict(list)
name_children = defaultdict(list)
for node in topo_sorted_nodes: for node in topo_sorted_nodes:
node.visit(back_edges[node.id]) node.visit(back_edges[node.id])
name_children[node.name].extend(node.reachable())
combined_nodes[node.name].append(node) combined_nodes[node.name].append(node)
combined_topo_sorted_nodes = [] combined_topo_sorted_nodes: list[list[PackageNode]] = [
for node in topo_sorted_nodes: combined_nodes.pop(node.name)
if node.name in combined_nodes: for node in topo_sorted_nodes
combined_topo_sorted_nodes.append(combined_nodes.pop(node.name)) if node.name in combined_nodes
results = [
aggregator(nodes, name_children[nodes[0].name])
for nodes in combined_topo_sorted_nodes
] ]
return results
def dfs_visit(node, back_edges, visited, sorted_nodes): return combined_topo_sorted_nodes
if visited.get(node.id, VisitedState.Unvisited) == VisitedState.Visited:
return True def dfs_visit(
if visited.get(node.id, VisitedState.Unvisited) == VisitedState.PartiallyVis node: PackageNode,
ited: back_edges: dict[DFSNodeID, list[PackageNode]],
# We have a circular dependency. visited: set[DFSNodeID],
# Since the dependencies are resolved we can sorted_nodes: list[PackageNode],
# simply skip it because we already have it ) -> None:
return True if node.id in visited:
return
visited.add(node.id)
visited[node.id] = VisitedState.PartiallyVisited
for neighbor in node.reachable(): for neighbor in node.reachable():
back_edges[neighbor.id].append(node) back_edges[neighbor.id].append(node)
if not dfs_visit(neighbor, back_edges, visited, sorted_nodes): dfs_visit(neighbor, back_edges, visited, sorted_nodes)
return False
visited[node.id] = VisitedState.Visited
sorted_nodes.insert(0, node) sorted_nodes.insert(0, node)
return True
class PackageNode(DFSNode): class PackageNode(DFSNode):
def __init__( def __init__(
self, package, packages, previous=None, previous_dep=None, dep=None, self,
): package: Package,
packages: list[Package],
previous: PackageNode | None = None,
previous_dep: Dependency | None = None,
dep: Dependency | None = None,
) -> None:
self.package = package self.package = package
self.packages = packages self.packages = packages
self.previous = previous self.previous = previous
self.previous_dep = previous_dep self.previous_dep = previous_dep
self.dep = dep self.dep = dep
self.depth = -1 self.depth = -1
if not previous: if not previous:
self.category = "dev" self.category = "dev"
self.groups: frozenset[str] = frozenset()
self.optional = True self.optional = True
else: elif dep:
self.category = dep.category self.category = "main" if MAIN_GROUP in dep.groups else "dev"
self.groups = dep.groups
self.optional = dep.is_optional() self.optional = dep.is_optional()
else:
raise ValueError("Both previous and dep must be passed")
super(PackageNode, self).__init__( super().__init__(
(package.complete_name, self.category, self.optional), (package.complete_name, self.groups, self.optional),
package.complete_name, package.complete_name,
package.name, package.name,
) )
def reachable(self): def reachable(self) -> list[PackageNode]:
children = [] # type: List[PackageNode] children: list[PackageNode] = []
if ( if (
self.previous_dep self.dep
and self.previous_dep
and self.previous_dep is not self.dep and self.previous_dep is not self.dep
and self.previous_dep.name == self.dep.name and self.previous_dep.name == self.dep.name
): ):
return [] return []
for dependency in self.package.all_requires: for dependency in self.package.all_requires:
if self.previous and self.previous.name == dependency.name: if self.previous and self.previous.name == dependency.name:
# We have a circular dependency. # We have a circular dependency.
# Since the dependencies are resolved we can # Since the dependencies are resolved we can
# simply skip it because we already have it # simply skip it because we already have it
# N.B. this only catches cycles of length 2; # N.B. this only catches cycles of length 2;
# dependency cycles in general are handled by the DFS traversal # dependency cycles in general are handled by the DFS traversal
continue continue
for pkg in self.packages: for pkg in self.packages:
if pkg.complete_name == dependency.complete_name and ( if (
dependency.constraint.allows(pkg.version) pkg.complete_name == dependency.complete_name
or dependency.allows_prereleases() and (
and pkg.version.is_prerelease() dependency.constraint.allows(pkg.version)
and dependency.constraint.allows(pkg.version.stable) or dependency.allows_prereleases()
): and pkg.version.is_unstable()
# If there is already a child with this name and dependency.constraint.allows(pkg.version.stable)
# we merge the requirements )
if any( and not any(
child.package.name == pkg.name child.package.complete_name == pkg.complete_name
and child.category == dependency.category and child.groups == dependency.groups
for child in children for child in children
): )
continue ):
children.append( children.append(
PackageNode( PackageNode(
pkg, pkg,
self.packages, self.packages,
self, self,
dependency, dependency,
self.dep or dependency, self.dep or dependency,
) )
) )
return children return children
def visit(self, parents): def visit(self, parents: list[PackageNode]) -> None:
# The root package, which has no parents, is defined as having depth -1 # The root package, which has no parents, is defined as having depth -1
# So that the root package's top-level dependencies have depth 0. # So that the root package's top-level dependencies have depth 0.
self.depth = 1 + max( self.depth = 1 + max(
[ [
parent.depth if parent.base_name != self.base_name else parent.d epth - 1 parent.depth if parent.base_name != self.base_name else parent.d epth - 1
for parent in parents for parent in parents
] ]
+ [-2] + [-2]
) )
def aggregate_package_nodes(nodes, children): def aggregate_package_nodes(nodes: list[PackageNode]) -> tuple[Package, int]:
package = nodes[0].package package = nodes[0].package
depth = max(node.depth for node in nodes) depth = max(node.depth for node in nodes)
category = ( groups: list[str] = []
"main" if any(node.category == "main" for node in children + nodes) else for node in nodes:
"dev" groups.extend(node.groups)
)
optional = all(node.optional for node in children + nodes) category = "main" if any(MAIN_GROUP in node.groups for node in nodes) else "
dev"
optional = all(node.optional for node in nodes)
for node in nodes: for node in nodes:
node.depth = depth node.depth = depth
node.category = category node.category = category
node.optional = optional node.optional = optional
package.category = category package.category = category
package.optional = optional package.optional = optional
return package, depth return package, depth
 End of changes. 51 change blocks. 
261 lines changed or deleted 169 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)