locker.py (poetry-1.1.15) | : | locker.py (poetry-1.2.0) | ||
---|---|---|---|---|
from __future__ import annotations | ||||
import json | import json | |||
import logging | import logging | |||
import os | import os | |||
import re | import re | |||
from copy import deepcopy | ||||
from hashlib import sha256 | from hashlib import sha256 | |||
from typing import Dict | from pathlib import Path | |||
from typing import Iterable | from typing import TYPE_CHECKING | |||
from typing import Iterator | from typing import Any | |||
from typing import List | from typing import cast | |||
from typing import Optional | ||||
from typing import Sequence | ||||
from typing import Set | ||||
from typing import Tuple | ||||
from typing import Union | ||||
from poetry.core.packages.dependency import Dependency | ||||
from poetry.core.packages.package import Package | ||||
from poetry.core.semver.helpers import parse_constraint | ||||
from poetry.core.semver.version import Version | ||||
from poetry.core.toml.file import TOMLFile | ||||
from poetry.core.version.markers import parse_marker | ||||
from poetry.core.version.requirements import InvalidRequirement | ||||
from tomlkit import array | from tomlkit import array | |||
from tomlkit import document | from tomlkit import document | |||
from tomlkit import inline_table | from tomlkit import inline_table | |||
from tomlkit import item | from tomlkit import item | |||
from tomlkit import table | from tomlkit import table | |||
from tomlkit.exceptions import TOMLKitError | from tomlkit.exceptions import TOMLKitError | |||
from tomlkit.items import Array | ||||
import poetry.repositories | if TYPE_CHECKING: | |||
from poetry.core.packages.directory_dependency import DirectoryDependency | ||||
from poetry.core.packages.file_dependency import FileDependency | ||||
from poetry.core.packages.url_dependency import URLDependency | ||||
from poetry.core.packages.vcs_dependency import VCSDependency | ||||
from tomlkit.items import Table | ||||
from tomlkit.toml_document import TOMLDocument | ||||
from poetry.core.packages import dependency_from_pep_508 | from poetry.repositories import Repository | |||
from poetry.core.packages.dependency import Dependency | ||||
from poetry.core.packages.package import Package | ||||
from poetry.core.semver import parse_constraint | ||||
from poetry.core.semver.version import Version | ||||
from poetry.core.toml.file import TOMLFile | ||||
from poetry.core.version.markers import parse_marker | ||||
from poetry.core.version.requirements import InvalidRequirement | ||||
from poetry.packages import DependencyPackage | ||||
from poetry.utils._compat import OrderedDict | ||||
from poetry.utils._compat import Path | ||||
from poetry.utils.extras import get_extra_package_names | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | |||
class Locker(object): | class Locker: | |||
_VERSION = "1.1" | _VERSION = "1.1" | |||
_relevant_keys = ["dependencies", "dev-dependencies", "source", "extras"] | _legacy_keys = ["dependencies", "source", "extras", "dev-dependencies"] | |||
_relevant_keys = [*_legacy_keys, "group"] | ||||
def __init__(self, lock, local_config): # type: (Path, dict) -> None | def __init__(self, lock: str | Path, local_config: dict[str, Any]) -> None: | |||
self._lock = TOMLFile(lock) | self._lock = TOMLFile(lock) | |||
self._local_config = local_config | self._local_config = local_config | |||
self._lock_data = None | self._lock_data: TOMLDocument | None = None | |||
self._content_hash = self._get_content_hash() | self._content_hash = self._get_content_hash() | |||
@property | @property | |||
def lock(self): # type: () -> TOMLFile | def lock(self) -> TOMLFile: | |||
return self._lock | return self._lock | |||
@property | @property | |||
def lock_data(self): | def lock_data(self) -> TOMLDocument: | |||
if self._lock_data is None: | if self._lock_data is None: | |||
self._lock_data = self._get_lock_data() | self._lock_data = self._get_lock_data() | |||
return self._lock_data | return self._lock_data | |||
def is_locked(self): # type: () -> bool | def is_locked(self) -> bool: | |||
""" | """ | |||
Checks whether the locker has been locked (lockfile found). | Checks whether the locker has been locked (lockfile found). | |||
""" | """ | |||
if not self._lock.exists(): | if not self._lock.exists(): | |||
return False | return False | |||
return "package" in self.lock_data | return "package" in self.lock_data | |||
def is_fresh(self): # type: () -> bool | def is_fresh(self) -> bool: | |||
""" | """ | |||
Checks whether the lock file is still up to date with the current hash. | Checks whether the lock file is still up to date with the current hash. | |||
""" | """ | |||
lock = self._lock.read() | lock = self._lock.read() | |||
metadata = lock.get("metadata", {}) | metadata = lock.get("metadata", {}) | |||
if "content-hash" in metadata: | if "content-hash" in metadata: | |||
return self._content_hash == lock["metadata"]["content-hash"] | fresh: bool = self._content_hash == metadata["content-hash"] | |||
return fresh | ||||
return False | return False | |||
def locked_repository( | def locked_repository(self) -> Repository: | |||
self, with_dev_reqs=False | ||||
): # type: (bool) -> poetry.repositories.Repository | ||||
""" | """ | |||
Searches and returns a repository of locked packages. | Searches and returns a repository of locked packages. | |||
""" | """ | |||
from poetry.factory import Factory | from poetry.factory import Factory | |||
from poetry.repositories import Repository | ||||
if not self.is_locked(): | if not self.is_locked(): | |||
return poetry.repositories.Repository() | return Repository("poetry-locked") | |||
lock_data = self.lock_data | lock_data = self.lock_data | |||
packages = poetry.repositories.Repository() | packages = Repository("poetry-locked") | |||
locked_packages = cast("list[dict[str, Any]]", lock_data["package"]) | ||||
if with_dev_reqs: | ||||
locked_packages = lock_data["package"] | ||||
else: | ||||
locked_packages = [ | ||||
p for p in lock_data["package"] if p["category"] == "main" | ||||
] | ||||
if not locked_packages: | if not locked_packages: | |||
return packages | return packages | |||
for info in locked_packages: | for info in locked_packages: | |||
source = info.get("source", {}) | source = info.get("source", {}) | |||
source_type = source.get("type") | source_type = source.get("type") | |||
url = source.get("url") | url = source.get("url") | |||
if source_type in ["directory", "file"]: | if source_type in ["directory", "file"]: | |||
url = self._lock.path.parent.joinpath(url).resolve().as_posix() | url = self._lock.path.parent.joinpath(url).resolve().as_posix() | |||
package = Package( | package = Package( | |||
info["name"], | info["name"], | |||
info["version"], | info["version"], | |||
info["version"], | info["version"], | |||
source_type=source_type, | source_type=source_type, | |||
source_url=url, | source_url=url, | |||
source_reference=source.get("reference"), | source_reference=source.get("reference"), | |||
source_resolved_reference=source.get("resolved_reference"), | source_resolved_reference=source.get("resolved_reference"), | |||
source_subdirectory=source.get("subdirectory"), | ||||
) | ) | |||
package.description = info.get("description", "") | package.description = info.get("description", "") | |||
package.category = info["category"] | package.category = info.get("category", "main") | |||
package.optional = info["optional"] | package.optional = info["optional"] | |||
if "hashes" in lock_data["metadata"]: | metadata = cast("dict[str, Any]", lock_data["metadata"]) | |||
name = info["name"] | ||||
if "hashes" in metadata: | ||||
# Old lock so we create dummy files from the hashes | # Old lock so we create dummy files from the hashes | |||
package.files = [ | hashes = cast("dict[str, Any]", metadata["hashes"]) | |||
{"name": h, "hash": h} | package.files = [{"name": h, "hash": h} for h in hashes[name]] | |||
for h in lock_data["metadata"]["hashes"][info["name"]] | ||||
] | ||||
else: | else: | |||
package.files = lock_data["metadata"]["files"][info["name"]] | files = metadata["files"][name] | |||
package.files = files | ||||
package.python_versions = info["python-versions"] | package.python_versions = info["python-versions"] | |||
extras = info.get("extras", {}) | extras = info.get("extras", {}) | |||
if extras: | if extras: | |||
for name, deps in extras.items(): | for name, deps in extras.items(): | |||
package.extras[name] = [] | package.extras[name] = [] | |||
for dep in deps: | for dep in deps: | |||
try: | try: | |||
dependency = dependency_from_pep_508(dep) | dependency = Dependency.create_from_pep_508(dep) | |||
except InvalidRequirement: | except InvalidRequirement: | |||
# handle lock files with invalid PEP 508 | # handle lock files with invalid PEP 508 | |||
m = re.match(r"^(.+?)(?:\[(.+?)])?(?:\s+\((.+)\))?$" , dep) | m = re.match(r"^(.+?)(?:\[(.+?)])?(?:\s+\((.+)\))?$" , dep) | |||
if not m: | ||||
raise | ||||
dep_name = m.group(1) | dep_name = m.group(1) | |||
extras = m.group(2) or "" | extras = m.group(2) or "" | |||
constraint = m.group(3) or "*" | constraint = m.group(3) or "*" | |||
dependency = Dependency( | dependency = Dependency( | |||
dep_name, constraint, extras=extras.split(",") | dep_name, constraint, extras=extras.split(",") | |||
) | ) | |||
package.extras[name].append(dependency) | package.extras[name].append(dependency) | |||
if "marker" in info: | if "marker" in info: | |||
package.marker = parse_marker(info["marker"]) | package.marker = parse_marker(info["marker"]) | |||
skipping to change at line 175 | skipping to change at line 172 | |||
if name == "python": | if name == "python": | |||
dep.python_versions = value | dep.python_versions = value | |||
elif name == "platform": | elif name == "platform": | |||
dep.platform = value | dep.platform = value | |||
split_dep = dep.to_pep_508(False).split(";") | split_dep = dep.to_pep_508(False).split(";") | |||
if len(split_dep) > 1: | if len(split_dep) > 1: | |||
package.marker = parse_marker(split_dep[1].strip()) | package.marker = parse_marker(split_dep[1].strip()) | |||
for dep_name, constraint in info.get("dependencies", {}).items(): | for dep_name, constraint in info.get("dependencies", {}).items(): | |||
root_dir = self._lock.path.parent | root_dir = self._lock.path.parent | |||
if package.source_type == "directory": | if package.source_type == "directory": | |||
# root dir should be the source of the package relative to t | # root dir should be the source of the package relative to t | |||
he lock path | he lock | |||
# path | ||||
assert package.source_url is not None | ||||
root_dir = Path(package.source_url) | root_dir = Path(package.source_url) | |||
if isinstance(constraint, list): | if isinstance(constraint, list): | |||
for c in constraint: | for c in constraint: | |||
package.add_dependency( | package.add_dependency( | |||
Factory.create_dependency(dep_name, c, root_dir=root _dir) | Factory.create_dependency(dep_name, c, root_dir=root _dir) | |||
) | ) | |||
continue | continue | |||
skipping to change at line 200 | skipping to change at line 198 | |||
Factory.create_dependency(dep_name, constraint, root_dir=roo t_dir) | Factory.create_dependency(dep_name, constraint, root_dir=roo t_dir) | |||
) | ) | |||
if "develop" in info: | if "develop" in info: | |||
package.develop = info["develop"] | package.develop = info["develop"] | |||
packages.add_package(package) | packages.add_package(package) | |||
return packages | return packages | |||
@staticmethod | def set_lock_data(self, root: Package, packages: list[Package]) -> bool: | |||
def __get_locked_package( | files: dict[str, Any] = table() | |||
_dependency, packages_by_name | package_specs = self._lock_packages(packages) | |||
): # type: (Dependency, Dict[str, List[Package]]) -> Optional[Package] | ||||
""" | ||||
Internal helper to identify corresponding locked package using dependenc | ||||
y | ||||
version constraints. | ||||
""" | ||||
for _package in packages_by_name.get(_dependency.name, []): | ||||
if _dependency.constraint.allows(_package.version): | ||||
return _package | ||||
return None | ||||
@classmethod | ||||
def __walk_dependency_level( | ||||
cls, | ||||
dependencies, | ||||
level, | ||||
pinned_versions, | ||||
packages_by_name, | ||||
project_level_dependencies, | ||||
nested_dependencies, | ||||
): # type: (List[Dependency], int, bool, Dict[str, List[Package]], Set[str | ||||
], Dict[Tuple[str, str], Dependency]) -> Dict[Tuple[str, str], Dependency] | ||||
if not dependencies: | ||||
return nested_dependencies | ||||
next_level_dependencies = [] | ||||
for requirement in dependencies: | ||||
key = (requirement.name, requirement.pretty_constraint) | ||||
locked_package = cls.__get_locked_package(requirement, packages_by_n | ||||
ame) | ||||
if locked_package: | ||||
# create dependency from locked package to retain dependency met | ||||
adata | ||||
# if this is not done, we can end-up with incorrect nested depen | ||||
dencies | ||||
marker = requirement.marker | ||||
requirement = locked_package.to_dependency() | ||||
requirement.marker = requirement.marker.intersect(marker) | ||||
key = (requirement.name, requirement.pretty_constraint) | ||||
if pinned_versions: | ||||
requirement.set_constraint( | ||||
locked_package.to_dependency().constraint | ||||
) | ||||
for require in locked_package.requires: | ||||
if require.marker.is_empty(): | ||||
require.marker = requirement.marker | ||||
else: | ||||
require.marker = require.marker.intersect(requirement.ma | ||||
rker) | ||||
require.marker = require.marker.intersect(locked_package.mar | ||||
ker) | ||||
if key not in nested_dependencies: | ||||
next_level_dependencies.append(require) | ||||
if requirement.name in project_level_dependencies and level == 0: | ||||
# project level dependencies take precedence | ||||
continue | ||||
if not locked_package: | ||||
# we make a copy to avoid any side-effects | ||||
requirement = deepcopy(requirement) | ||||
if key not in nested_dependencies: | ||||
nested_dependencies[key] = requirement | ||||
else: | ||||
nested_dependencies[key].marker = nested_dependencies[ | ||||
key | ||||
].marker.intersect(requirement.marker) | ||||
return cls.__walk_dependency_level( | ||||
dependencies=next_level_dependencies, | ||||
level=level + 1, | ||||
pinned_versions=pinned_versions, | ||||
packages_by_name=packages_by_name, | ||||
project_level_dependencies=project_level_dependencies, | ||||
nested_dependencies=nested_dependencies, | ||||
) | ||||
@classmethod | ||||
def get_project_dependencies( | ||||
cls, project_requires, locked_packages, pinned_versions=False, with_nest | ||||
ed=False | ||||
): # type: (List[Dependency], List[Package], bool, bool) -> Iterable[Depend | ||||
ency] | ||||
# group packages entries by name, this is required because requirement m | ||||
ight use different constraints | ||||
packages_by_name = {} | ||||
for pkg in locked_packages: | ||||
if pkg.name not in packages_by_name: | ||||
packages_by_name[pkg.name] = [] | ||||
packages_by_name[pkg.name].append(pkg) | ||||
project_level_dependencies = set() | ||||
dependencies = [] | ||||
for dependency in project_requires: | ||||
dependency = deepcopy(dependency) | ||||
locked_package = cls.__get_locked_package(dependency, packages_by_na | ||||
me) | ||||
if locked_package: | ||||
locked_dependency = locked_package.to_dependency() | ||||
locked_dependency.marker = dependency.marker.intersect( | ||||
locked_package.marker | ||||
) | ||||
if not pinned_versions: | ||||
locked_dependency.set_constraint(dependency.constraint) | ||||
dependency = locked_dependency | ||||
project_level_dependencies.add(dependency.name) | ||||
dependencies.append(dependency) | ||||
if not with_nested: | ||||
# return only with project level dependencies | ||||
return dependencies | ||||
nested_dependencies = cls.__walk_dependency_level( | ||||
dependencies=dependencies, | ||||
level=0, | ||||
pinned_versions=pinned_versions, | ||||
packages_by_name=packages_by_name, | ||||
project_level_dependencies=project_level_dependencies, | ||||
nested_dependencies=dict(), | ||||
) | ||||
# Merge same dependencies using marker union | ||||
for requirement in dependencies: | ||||
key = (requirement.name, requirement.pretty_constraint) | ||||
if key not in nested_dependencies: | ||||
nested_dependencies[key] = requirement | ||||
else: | ||||
nested_dependencies[key].marker = nested_dependencies[key].marke | ||||
r.union( | ||||
requirement.marker | ||||
) | ||||
return sorted(nested_dependencies.values(), key=lambda x: x.name.lower() | ||||
) | ||||
def get_project_dependency_packages( | ||||
self, project_requires, dev=False, extras=None | ||||
): # type: (List[Dependency], bool, Optional[Union[bool, Sequence[str]]]) - | ||||
> Iterator[DependencyPackage] | ||||
repository = self.locked_repository(with_dev_reqs=dev) | ||||
# Build a set of all packages required by our selected extras | ||||
extra_package_names = ( | ||||
None if (isinstance(extras, bool) and extras is True) else () | ||||
) | ||||
if extra_package_names is not None: | ||||
extra_package_names = set( | ||||
get_extra_package_names( | ||||
repository.packages, self.lock_data.get("extras", {}), extra | ||||
s or (), | ||||
) | ||||
) | ||||
# If a package is optional and we haven't opted in to it, do not select | ||||
selected = [] | ||||
for dependency in project_requires: | ||||
try: | ||||
package = repository.find_packages(dependency=dependency)[0] | ||||
except IndexError: | ||||
continue | ||||
if extra_package_names is not None and ( | ||||
package.optional and package.name not in extra_package_names | ||||
): | ||||
# a package is locked as optional, but is not activated via extr | ||||
as | ||||
continue | ||||
selected.append(dependency) | ||||
for dependency in self.get_project_dependencies( | ||||
project_requires=selected, | ||||
locked_packages=repository.packages, | ||||
with_nested=True, | ||||
): | ||||
try: | ||||
package = repository.find_packages(dependency=dependency)[0] | ||||
except IndexError: | ||||
continue | ||||
for extra in dependency.extras: | ||||
package.requires_extras.append(extra) | ||||
yield DependencyPackage(dependency=dependency, package=package) | ||||
def set_lock_data(self, root, packages): # type: (...) -> bool | ||||
files = table() | ||||
packages = self._lock_packages(packages) | ||||
# Retrieving hashes | # Retrieving hashes | |||
for package in packages: | for package in package_specs: | |||
if package["name"] not in files: | if package["name"] not in files: | |||
files[package["name"]] = [] | files[package["name"]] = [] | |||
for f in package["files"]: | for f in package["files"]: | |||
file_metadata = inline_table() | file_metadata = inline_table() | |||
for k, v in sorted(f.items()): | for k, v in sorted(f.items()): | |||
file_metadata[k] = v | file_metadata[k] = v | |||
files[package["name"]].append(file_metadata) | files[package["name"]].append(file_metadata) | |||
if files[package["name"]]: | if files[package["name"]]: | |||
files[package["name"]] = item(files[package["name"]]).multiline( | package_files = item(files[package["name"]]) | |||
True) | assert isinstance(package_files, Array) | |||
files[package["name"]] = package_files.multiline(True) | ||||
del package["files"] | del package["files"] | |||
lock = document() | lock = document() | |||
lock["package"] = packages | lock["package"] = package_specs | |||
if root.extras: | if root.extras: | |||
lock["extras"] = { | lock["extras"] = { | |||
extra: [dep.pretty_name for dep in deps] | extra: [dep.pretty_name for dep in deps] | |||
for extra, deps in sorted(root.extras.items()) | for extra, deps in sorted(root.extras.items()) | |||
} | } | |||
lock["metadata"] = OrderedDict( | lock["metadata"] = { | |||
[ | "lock-version": self._VERSION, | |||
("lock-version", self._VERSION), | "python-versions": root.python_versions, | |||
("python-versions", root.python_versions), | "content-hash": self._content_hash, | |||
("content-hash", self._content_hash), | "files": files, | |||
("files", files), | } | |||
] | ||||
) | ||||
if not self.is_locked() or lock != self.lock_data: | if not self.is_locked() or lock != self.lock_data: | |||
self._write_lock_data(lock) | self._write_lock_data(lock) | |||
return True | return True | |||
return False | return False | |||
def _write_lock_data(self, data): | def _write_lock_data(self, data: TOMLDocument) -> None: | |||
self.lock.write(data) | self.lock.write(data) | |||
# Checking lock file data consistency | # Checking lock file data consistency | |||
if data != self.lock.read(): | if data != self.lock.read(): | |||
raise RuntimeError("Inconsistent lock file data.") | raise RuntimeError("Inconsistent lock file data.") | |||
self._lock_data = None | self._lock_data = None | |||
def _get_content_hash(self): # type: () -> str | def _get_content_hash(self) -> str: | |||
""" | """ | |||
Returns the sha256 hash of the sorted content of the pyproject file. | Returns the sha256 hash of the sorted content of the pyproject file. | |||
""" | """ | |||
content = self._local_config | content = self._local_config | |||
relevant_content = {} | relevant_content = {} | |||
for key in self._relevant_keys: | for key in self._relevant_keys: | |||
relevant_content[key] = content.get(key) | data = content.get(key) | |||
content_hash = sha256( | if data is None and key not in self._legacy_keys: | |||
json.dumps(relevant_content, sort_keys=True).encode() | continue | |||
).hexdigest() | ||||
relevant_content[key] = data | ||||
return content_hash | return sha256(json.dumps(relevant_content, sort_keys=True).encode()).hex digest() | |||
def _get_lock_data(self): # type: () -> dict | def _get_lock_data(self) -> TOMLDocument: | |||
if not self._lock.exists(): | if not self._lock.exists(): | |||
raise RuntimeError("No lockfile found. Unable to read locked package s") | raise RuntimeError("No lockfile found. Unable to read locked package s") | |||
try: | try: | |||
lock_data = self._lock.read() | lock_data: TOMLDocument = self._lock.read() | |||
except TOMLKitError as e: | except TOMLKitError as e: | |||
raise RuntimeError("Unable to read the lock file ({}).".format(e)) | raise RuntimeError(f"Unable to read the lock file ({e}).") | |||
lock_version = Version.parse(lock_data["metadata"].get("lock-version", " | metadata = cast("Table", lock_data["metadata"]) | |||
1.0")) | lock_version = Version.parse(metadata.get("lock-version", "1.0")) | |||
current_version = Version.parse(self._VERSION) | current_version = Version.parse(self._VERSION) | |||
# We expect the locker to be able to read lock files | # We expect the locker to be able to read lock files | |||
# from the same semantic versioning range | # from the same semantic versioning range | |||
accepted_versions = parse_constraint( | accepted_versions = parse_constraint( | |||
"^{}".format(Version(current_version.major, 0)) | f"^{Version.from_parts(current_version.major, 0)}" | |||
) | ) | |||
lock_version_allowed = accepted_versions.allows(lock_version) | lock_version_allowed = accepted_versions.allows(lock_version) | |||
if lock_version_allowed and current_version < lock_version: | if lock_version_allowed and current_version < lock_version: | |||
logger.warning( | logger.warning( | |||
"The lock file might not be compatible with the current version | "The lock file might not be compatible with the current version | |||
of Poetry.\n" | of" | |||
"Upgrade Poetry to ensure the lock file is read properly or, alt | " Poetry.\nUpgrade Poetry to ensure the lock file is read proper | |||
ernatively, " | ly or," | |||
"regenerate the lock file with the `poetry lock` command." | " alternatively, regenerate the lock file with the `poetry lock` | |||
" | ||||
" command." | ||||
) | ) | |||
elif not lock_version_allowed: | elif not lock_version_allowed: | |||
raise RuntimeError( | raise RuntimeError( | |||
"The lock file is not compatible with the current version of Poe try.\n" | "The lock file is not compatible with the current version of Poe try.\n" | |||
"Upgrade Poetry to be able to read the lock file or, alternative ly, " | "Upgrade Poetry to be able to read the lock file or, alternative ly, " | |||
"regenerate the lock file with the `poetry lock` command." | "regenerate the lock file with the `poetry lock` command." | |||
) | ) | |||
return lock_data | return lock_data | |||
def _lock_packages( | def _lock_packages(self, packages: list[Package]) -> list[dict[str, Any]]: | |||
self, packages | ||||
): # type: (List['poetry.packages.Package']) -> list | ||||
locked = [] | locked = [] | |||
for package in sorted(packages, key=lambda x: x.name): | for package in sorted( | |||
packages, | ||||
key=lambda x: ( | ||||
x.name, | ||||
x.version, | ||||
x.source_type or "", | ||||
x.source_url or "", | ||||
x.source_subdirectory or "", | ||||
x.source_reference or "", | ||||
x.source_resolved_reference or "", | ||||
), | ||||
): | ||||
spec = self._dump_package(package) | spec = self._dump_package(package) | |||
locked.append(spec) | locked.append(spec) | |||
return locked | return locked | |||
def _dump_package(self, package): # type: (Package) -> dict | def _dump_package(self, package: Package) -> dict[str, Any]: | |||
dependencies = OrderedDict() | dependencies: dict[str, list[Any]] = {} | |||
for dependency in sorted(package.requires, key=lambda d: d.name): | for dependency in sorted( | |||
package.requires, | ||||
key=lambda d: d.name, | ||||
): | ||||
if dependency.pretty_name not in dependencies: | if dependency.pretty_name not in dependencies: | |||
dependencies[dependency.pretty_name] = [] | dependencies[dependency.pretty_name] = [] | |||
constraint = inline_table() | constraint = inline_table() | |||
if dependency.is_directory() or dependency.is_file(): | if dependency.is_directory(): | |||
dependency = cast("DirectoryDependency", dependency) | ||||
constraint["path"] = dependency.path.as_posix() | constraint["path"] = dependency.path.as_posix() | |||
if dependency.is_directory() and dependency.develop: | if dependency.develop: | |||
constraint["develop"] = True | constraint["develop"] = True | |||
elif dependency.is_file(): | ||||
dependency = cast("FileDependency", dependency) | ||||
constraint["path"] = dependency.path.as_posix() | ||||
elif dependency.is_url(): | elif dependency.is_url(): | |||
dependency = cast("URLDependency", dependency) | ||||
constraint["url"] = dependency.url | constraint["url"] = dependency.url | |||
elif dependency.is_vcs(): | elif dependency.is_vcs(): | |||
dependency = cast("VCSDependency", dependency) | ||||
constraint[dependency.vcs] = dependency.source | constraint[dependency.vcs] = dependency.source | |||
if dependency.branch: | if dependency.branch: | |||
constraint["branch"] = dependency.branch | constraint["branch"] = dependency.branch | |||
elif dependency.tag: | elif dependency.tag: | |||
constraint["tag"] = dependency.tag | constraint["tag"] = dependency.tag | |||
elif dependency.rev: | elif dependency.rev: | |||
constraint["rev"] = dependency.rev | constraint["rev"] = dependency.rev | |||
else: | else: | |||
constraint["version"] = str(dependency.pretty_constraint) | constraint["version"] = str(dependency.pretty_constraint) | |||
skipping to change at line 539 | skipping to change at line 376 | |||
if dependency.is_optional(): | if dependency.is_optional(): | |||
constraint["optional"] = True | constraint["optional"] = True | |||
if not dependency.marker.is_any(): | if not dependency.marker.is_any(): | |||
constraint["markers"] = str(dependency.marker) | constraint["markers"] = str(dependency.marker) | |||
dependencies[dependency.pretty_name].append(constraint) | dependencies[dependency.pretty_name].append(constraint) | |||
# All the constraints should have the same type, | # All the constraints should have the same type, | |||
# but we want to simplify them if it's possible | # but we want to simplify them if it's possible | |||
for dependency, constraints in tuple(dependencies.items()): | for dependency_name, constraints in dependencies.items(): | |||
if all( | if all( | |||
len(constraint) == 1 and "version" in constraint | len(constraint) == 1 and "version" in constraint | |||
for constraint in constraints | for constraint in constraints | |||
): | ): | |||
dependencies[dependency] = [ | dependencies[dependency_name] = [ | |||
constraint["version"] for constraint in constraints | constraint["version"] for constraint in constraints | |||
] | ] | |||
data = OrderedDict( | data: dict[str, Any] = { | |||
[ | "name": package.pretty_name, | |||
("name", package.pretty_name), | "version": package.pretty_version, | |||
("version", package.pretty_version), | "description": package.description or "", | |||
("description", package.description or ""), | "category": package.category, | |||
("category", package.category), | "optional": package.optional, | |||
("optional", package.optional), | "python-versions": package.python_versions, | |||
("python-versions", package.python_versions), | "files": sorted( | |||
("files", sorted(package.files, key=lambda x: x["file"])), | package.files, | |||
] | key=lambda x: x["file"], # type: ignore[no-any-return] | |||
) | ), | |||
} | ||||
if dependencies: | if dependencies: | |||
data["dependencies"] = table() | data["dependencies"] = table() | |||
for k, constraints in dependencies.items(): | for k, constraints in dependencies.items(): | |||
if len(constraints) == 1: | if len(constraints) == 1: | |||
data["dependencies"][k] = constraints[0] | data["dependencies"][k] = constraints[0] | |||
else: | else: | |||
data["dependencies"][k] = array().multiline(True) | data["dependencies"][k] = array().multiline(True) | |||
for constraint in constraints: | for constraint in constraints: | |||
data["dependencies"][k].append(constraint) | data["dependencies"][k].append(constraint) | |||
if package.extras: | if package.extras: | |||
extras = OrderedDict() | extras = {} | |||
for name, deps in sorted(package.extras.items()): | for name, deps in sorted(package.extras.items()): | |||
# TODO: This should use dep.to_pep_508() once this is fixed | extras[name] = sorted(dep.base_pep_508_name for dep in deps) | |||
# https://github.com/python-poetry/poetry-core/pull/102 | ||||
extras[name] = sorted( | ||||
dep.base_pep_508_name if not dep.constraint.is_any() else de | ||||
p.name | ||||
for dep in deps | ||||
) | ||||
data["extras"] = extras | data["extras"] = extras | |||
if package.source_url: | if package.source_url: | |||
url = package.source_url | url = package.source_url | |||
if package.source_type in ["file", "directory"]: | if package.source_type in ["file", "directory"]: | |||
# The lock file should only store paths relative to the root pro ject | # The lock file should only store paths relative to the root pro ject | |||
url = Path( | url = Path( | |||
os.path.relpath( | os.path.relpath( | |||
Path(url).as_posix(), self._lock.path.parent.as_posix() | Path(url).resolve(), | |||
Path(self._lock.path.parent).resolve(), | ||||
) | ) | |||
).as_posix() | ).as_posix() | |||
data["source"] = OrderedDict() | data["source"] = {} | |||
if package.source_type: | if package.source_type: | |||
data["source"]["type"] = package.source_type | data["source"]["type"] = package.source_type | |||
data["source"]["url"] = url | data["source"]["url"] = url | |||
if package.source_reference: | if package.source_reference: | |||
data["source"]["reference"] = package.source_reference | data["source"]["reference"] = package.source_reference | |||
if package.source_resolved_reference: | if package.source_resolved_reference: | |||
data["source"]["resolved_reference"] = package.source_resolved_r eference | data["source"]["resolved_reference"] = package.source_resolved_r eference | |||
if package.source_subdirectory: | ||||
data["source"]["subdirectory"] = package.source_subdirectory | ||||
if package.source_type in ["directory", "git"]: | if package.source_type in ["directory", "git"]: | |||
data["develop"] = package.develop | data["develop"] = package.develop | |||
return data | return data | |||
class NullLocker(Locker): | class NullLocker(Locker): | |||
def set_lock_data(self, root, packages): # type: (Package, List[Package]) - > None | def set_lock_data(self, root: Package, packages: list[Package]) -> bool: | |||
pass | pass | |||
End of changes. 63 change blocks. | ||||
323 lines changed or deleted | 142 lines changed or added |