env.py (poetry-1.1.15) | : | env.py (poetry-1.2.0) | ||
---|---|---|---|---|
from __future__ import annotations | ||||
import base64 | import base64 | |||
import contextlib | ||||
import hashlib | import hashlib | |||
import itertools | ||||
import json | import json | |||
import os | import os | |||
import platform | import platform | |||
import plistlib | ||||
import re | import re | |||
import shutil | import subprocess | |||
import sys | import sys | |||
import sysconfig | import sysconfig | |||
import textwrap | import warnings | |||
from contextlib import contextmanager | from contextlib import contextmanager | |||
from copy import deepcopy | from copy import deepcopy | |||
from pathlib import Path | ||||
from subprocess import CalledProcessError | ||||
from typing import TYPE_CHECKING | ||||
from typing import Any | from typing import Any | |||
from typing import Dict | ||||
from typing import List | ||||
from typing import Optional | ||||
from typing import Tuple | ||||
from typing import Union | ||||
import packaging.tags | import packaging.tags | |||
import tomlkit | import tomlkit | |||
import virtualenv | import virtualenv | |||
from clikit.api.io import IO | from cleo.io.outputs.output import Verbosity | |||
from packaging.tags import Tag | from packaging.tags import Tag | |||
from packaging.tags import interpreter_name | from packaging.tags import interpreter_name | |||
from packaging.tags import interpreter_version | from packaging.tags import interpreter_version | |||
from packaging.tags import sys_tags | from packaging.tags import sys_tags | |||
from poetry.core.semver.helpers import parse_constraint | ||||
from poetry.core.semver import parse_constraint | ||||
from poetry.core.semver.version import Version | from poetry.core.semver.version import Version | |||
from poetry.core.toml.file import TOMLFile | from poetry.core.toml.file import TOMLFile | |||
from poetry.core.version.markers import BaseMarker | from poetry.core.utils.helpers import temporary_directory | |||
from poetry.locations import CACHE_DIR | from virtualenv.seed.wheels.embed import get_embed_wheel | |||
from poetry.poetry import Poetry | ||||
from poetry.utils._compat import CalledProcessError | from poetry.utils._compat import WINDOWS | |||
from poetry.utils._compat import Path | ||||
from poetry.utils._compat import decode | from poetry.utils._compat import decode | |||
from poetry.utils._compat import encode | from poetry.utils._compat import encode | |||
from poetry.utils._compat import list_to_shell_command | from poetry.utils._compat import list_to_shell_command | |||
from poetry.utils._compat import subprocess | from poetry.utils._compat import metadata | |||
from poetry.utils.helpers import get_real_windows_path | ||||
from poetry.utils.helpers import is_dir_writable | from poetry.utils.helpers import is_dir_writable | |||
from poetry.utils.helpers import paths_csv | from poetry.utils.helpers import paths_csv | |||
from poetry.utils.helpers import remove_directory | ||||
if TYPE_CHECKING: | ||||
from collections.abc import Iterable | ||||
from collections.abc import Iterator | ||||
from cleo.io.io import IO | ||||
from poetry.core.poetry import Poetry as CorePoetry | ||||
from poetry.core.version.markers import BaseMarker | ||||
from virtualenv.seed.wheels.util import Wheel | ||||
from poetry.poetry import Poetry | ||||
GET_SYS_TAGS = f""" | ||||
import importlib.util | ||||
import json | ||||
import sys | ||||
from pathlib import Path | ||||
spec = importlib.util.spec_from_file_location( | ||||
"packaging", Path(r"{packaging.__file__}") | ||||
) | ||||
packaging = importlib.util.module_from_spec(spec) | ||||
sys.modules[spec.name] = packaging | ||||
spec = importlib.util.spec_from_file_location( | ||||
"packaging.tags", Path(r"{packaging.tags.__file__}") | ||||
) | ||||
packaging_tags = importlib.util.module_from_spec(spec) | ||||
spec.loader.exec_module(packaging_tags) | ||||
print( | ||||
json.dumps([(t.interpreter, t.abi, t.platform) for t in packaging_tags.sys_t | ||||
ags()]) | ||||
) | ||||
""" | ||||
GET_ENVIRONMENT_INFO = """\ | GET_ENVIRONMENT_INFO = """\ | |||
import json | import json | |||
import os | import os | |||
import platform | import platform | |||
import sys | import sys | |||
import sysconfig | import sysconfig | |||
INTERPRETER_SHORT_NAMES = { | INTERPRETER_SHORT_NAMES = { | |||
"python": "py", | "python": "py", | |||
skipping to change at line 71 | skipping to change at line 109 | |||
def interpreter_version(): | def interpreter_version(): | |||
version = sysconfig.get_config_var("interpreter_version") | version = sysconfig.get_config_var("interpreter_version") | |||
if version: | if version: | |||
version = str(version) | version = str(version) | |||
else: | else: | |||
version = _version_nodot(sys.version_info[:2]) | version = _version_nodot(sys.version_info[:2]) | |||
return version | return version | |||
def _version_nodot(version): | def _version_nodot(version): | |||
# type: (PythonVersion) -> str | ||||
if any(v >= 10 for v in version): | if any(v >= 10 for v in version): | |||
sep = "_" | sep = "_" | |||
else: | else: | |||
sep = "" | sep = "" | |||
return sep.join(map(str, version)) | return sep.join(map(str, version)) | |||
if hasattr(sys, "implementation"): | if hasattr(sys, "implementation"): | |||
info = sys.implementation.version | info = sys.implementation.version | |||
iver = "{0.major}.{0.minor}.{0.micro}".format(info) | iver = "{0.major}.{0.minor}.{0.micro}".format(info) | |||
skipping to change at line 105 | skipping to change at line 142 | |||
"platform_machine": platform.machine(), | "platform_machine": platform.machine(), | |||
"platform_release": platform.release(), | "platform_release": platform.release(), | |||
"platform_system": platform.system(), | "platform_system": platform.system(), | |||
"platform_version": platform.version(), | "platform_version": platform.version(), | |||
"python_full_version": platform.python_version(), | "python_full_version": platform.python_version(), | |||
"platform_python_implementation": platform.python_implementation(), | "platform_python_implementation": platform.python_implementation(), | |||
"python_version": ".".join(platform.python_version_tuple()[:2]), | "python_version": ".".join(platform.python_version_tuple()[:2]), | |||
"sys_platform": sys.platform, | "sys_platform": sys.platform, | |||
"version_info": tuple(sys.version_info), | "version_info": tuple(sys.version_info), | |||
# Extra information | # Extra information | |||
"interpreter_name": INTERPRETER_SHORT_NAMES.get(implementation_name, impleme | "interpreter_name": INTERPRETER_SHORT_NAMES.get( | |||
ntation_name), | implementation_name, implementation_name | |||
), | ||||
"interpreter_version": interpreter_version(), | "interpreter_version": interpreter_version(), | |||
} | } | |||
print(json.dumps(env)) | print(json.dumps(env)) | |||
""" | """ | |||
GET_BASE_PREFIX = """\ | GET_BASE_PREFIX = """\ | |||
import sys | import sys | |||
if hasattr(sys, "real_prefix"): | if hasattr(sys, "real_prefix"): | |||
skipping to change at line 129 | skipping to change at line 168 | |||
else: | else: | |||
print(sys.prefix) | print(sys.prefix) | |||
""" | """ | |||
GET_PYTHON_VERSION = """\ | GET_PYTHON_VERSION = """\ | |||
import sys | import sys | |||
print('.'.join([str(s) for s in sys.version_info[:3]])) | print('.'.join([str(s) for s in sys.version_info[:3]])) | |||
""" | """ | |||
GET_PYTHON_VERSION_ONELINER = ( | ||||
"\"import sys; print('.'.join([str(s) for s in sys.version_info[:3]]))\"" | ||||
) | ||||
GET_SYS_PATH = """\ | GET_SYS_PATH = """\ | |||
import json | import json | |||
import sys | import sys | |||
print(json.dumps(sys.path)) | print(json.dumps(sys.path)) | |||
""" | """ | |||
GET_PATHS = """\ | GET_PATHS = """\ | |||
import json | import json | |||
import sysconfig | import sysconfig | |||
skipping to change at line 152 | skipping to change at line 195 | |||
GET_PATHS_FOR_GENERIC_ENVS = """\ | GET_PATHS_FOR_GENERIC_ENVS = """\ | |||
# We can't use sysconfig.get_paths() because | # We can't use sysconfig.get_paths() because | |||
# on some distributions it does not return the proper paths | # on some distributions it does not return the proper paths | |||
# (those used by pip for instance). We go through distutils | # (those used by pip for instance). We go through distutils | |||
# to get the proper ones. | # to get the proper ones. | |||
import json | import json | |||
import site | import site | |||
import sysconfig | import sysconfig | |||
from distutils.command.install import SCHEME_KEYS # noqa | from distutils.command.install import SCHEME_KEYS | |||
from distutils.core import Distribution | from distutils.core import Distribution | |||
d = Distribution() | d = Distribution() | |||
d.parse_config_files() | d.parse_config_files() | |||
obj = d.get_command_obj("install", create=True) | obj = d.get_command_obj("install", create=True) | |||
obj.finalize_options() | obj.finalize_options() | |||
paths = sysconfig.get_paths().copy() | paths = sysconfig.get_paths().copy() | |||
for key in SCHEME_KEYS: | for key in SCHEME_KEYS: | |||
if key == "headers": | if key == "headers": | |||
skipping to change at line 177 | skipping to change at line 220 | |||
if site.check_enableusersite() and hasattr(obj, "install_usersite"): | if site.check_enableusersite() and hasattr(obj, "install_usersite"): | |||
paths["usersite"] = getattr(obj, "install_usersite") | paths["usersite"] = getattr(obj, "install_usersite") | |||
paths["userbase"] = getattr(obj, "install_userbase") | paths["userbase"] = getattr(obj, "install_userbase") | |||
print(json.dumps(paths)) | print(json.dumps(paths)) | |||
""" | """ | |||
class SitePackages: | class SitePackages: | |||
def __init__( | def __init__( | |||
self, path, fallbacks=None, skip_write_checks=False | self, | |||
): # type: (Path, List[Path], bool) -> None | purelib: Path, | |||
self._path = path | platlib: Path | None = None, | |||
fallbacks: list[Path] | None = None, | ||||
skip_write_checks: bool = False, | ||||
) -> None: | ||||
self._purelib = purelib | ||||
self._platlib = platlib or purelib | ||||
if platlib and platlib.resolve() == purelib.resolve(): | ||||
self._platlib = purelib | ||||
self._fallbacks = fallbacks or [] | self._fallbacks = fallbacks or [] | |||
self._skip_write_checks = skip_write_checks | self._skip_write_checks = skip_write_checks | |||
self._candidates = [self._path] + self._fallbacks | ||||
self._candidates: list[Path] = [] | ||||
for path in itertools.chain([self._purelib, self._platlib], self._fallba | ||||
cks): | ||||
if path not in self._candidates: | ||||
self._candidates.append(path) | ||||
self._writable_candidates = None if not skip_write_checks else self._can didates | self._writable_candidates = None if not skip_write_checks else self._can didates | |||
@property | @property | |||
def path(self): # type: () -> Path | def path(self) -> Path: | |||
return self._path | return self._purelib | |||
@property | ||||
def purelib(self) -> Path: | ||||
return self._purelib | ||||
@property | ||||
def platlib(self) -> Path: | ||||
return self._platlib | ||||
@property | @property | |||
def candidates(self): # type: () -> List[Path] | def candidates(self) -> list[Path]: | |||
return self._candidates | return self._candidates | |||
@property | @property | |||
def writable_candidates(self): # type: () -> List[Path] | def writable_candidates(self) -> list[Path]: | |||
if self._writable_candidates is not None: | if self._writable_candidates is not None: | |||
return self._writable_candidates | return self._writable_candidates | |||
self._writable_candidates = [] | self._writable_candidates = [] | |||
for candidate in self._candidates: | for candidate in self._candidates: | |||
if not is_dir_writable(path=candidate, create=True): | if not is_dir_writable(path=candidate, create=True): | |||
continue | continue | |||
self._writable_candidates.append(candidate) | self._writable_candidates.append(candidate) | |||
return self._writable_candidates | return self._writable_candidates | |||
def make_candidates( | def make_candidates( | |||
self, path, writable_only=False | self, path: Path, writable_only: bool = False, strict: bool = False | |||
): # type: (Path, bool) -> List[Path] | ) -> list[Path]: | |||
candidates = self._candidates if not writable_only else self.writable_ca ndidates | candidates = self._candidates if not writable_only else self.writable_ca ndidates | |||
if path.is_absolute(): | if path.is_absolute(): | |||
for candidate in candidates: | for candidate in candidates: | |||
try: | with contextlib.suppress(ValueError): | |||
path.relative_to(candidate) | path.relative_to(candidate) | |||
return [path] | return [path] | |||
except ValueError: | site_type = "writable " if writable_only else "" | |||
pass | raise ValueError( | |||
else: | f"{path} is not relative to any discovered {site_type}sites" | |||
raise ValueError( | ) | |||
"{} is not relative to any discovered {}sites".format( | ||||
path, "writable " if writable_only else "" | results = [candidate / path for candidate in candidates] | |||
if not results and strict: | ||||
raise RuntimeError( | ||||
f'Unable to find a suitable destination for "{path}" in' | ||||
f" {paths_csv(self._candidates)}" | ||||
) | ||||
return results | ||||
def distributions( | ||||
self, name: str | None = None, writable_only: bool = False | ||||
) -> Iterable[metadata.Distribution]: | ||||
path = list( | ||||
map( | ||||
str, self._candidates if not writable_only else self.writable_ca | ||||
ndidates | ||||
) | ||||
) | ||||
yield from metadata.PathDistribution.discover( # type: ignore[no-untype | ||||
d-call] | ||||
name=name, | ||||
path=path, | ||||
) | ||||
def find_distribution( | ||||
self, name: str, writable_only: bool = False | ||||
) -> metadata.Distribution | None: | ||||
for distribution in self.distributions(name=name, writable_only=writable | ||||
_only): | ||||
return distribution | ||||
return None | ||||
def find_distribution_files_with_suffix( | ||||
self, distribution_name: str, suffix: str, writable_only: bool = False | ||||
) -> Iterable[Path]: | ||||
for distribution in self.distributions( | ||||
name=distribution_name, writable_only=writable_only | ||||
): | ||||
assert distribution.files is not None | ||||
for file in distribution.files: | ||||
if file.name.endswith(suffix): | ||||
yield Path( | ||||
distribution.locate_file(file), # type: ignore[no-untyp | ||||
ed-call] | ||||
) | ) | |||
) | ||||
return [candidate / path for candidate in candidates if candidate] | def find_distribution_files_with_name( | |||
self, distribution_name: str, name: str, writable_only: bool = False | ||||
) -> Iterable[Path]: | ||||
for distribution in self.distributions( | ||||
name=distribution_name, writable_only=writable_only | ||||
): | ||||
assert distribution.files is not None | ||||
for file in distribution.files: | ||||
if file.name == name: | ||||
yield Path( | ||||
distribution.locate_file(file), # type: ignore[no-untyp | ||||
ed-call] | ||||
) | ||||
def _path_method_wrapper( | def find_distribution_nspkg_pth_files( | |||
self, path, method, *args, **kwargs | self, distribution_name: str, writable_only: bool = False | |||
): # type: (Path, str, *Any, **Any) -> Union[Tuple[Path, Any], List[Tuple[P | ) -> Iterable[Path]: | |||
ath, Any]]] | return self.find_distribution_files_with_suffix( | |||
distribution_name=distribution_name, | ||||
suffix="-nspkg.pth", | ||||
writable_only=writable_only, | ||||
) | ||||
# TODO: Move to parameters after dropping Python 2.7 | def find_distribution_direct_url_json_files( | |||
return_first = kwargs.pop("return_first", True) | self, distribution_name: str, writable_only: bool = False | |||
writable_only = kwargs.pop("writable_only", False) | ) -> Iterable[Path]: | |||
return self.find_distribution_files_with_name( | ||||
distribution_name=distribution_name, | ||||
name="direct_url.json", | ||||
writable_only=writable_only, | ||||
) | ||||
candidates = self.make_candidates(path, writable_only=writable_only) | def remove_distribution_files(self, distribution_name: str) -> list[Path]: | |||
paths = [] | ||||
if not candidates: | for distribution in self.distributions( | |||
raise RuntimeError( | name=distribution_name, writable_only=True | |||
'Unable to find a suitable destination for "{}" in {}'.format( | ): | |||
str(path), paths_csv(self._candidates) | assert distribution.files is not None | |||
for file in distribution.files: | ||||
path = Path( | ||||
distribution.locate_file(file), # type: ignore[no-untyped-c | ||||
all] | ||||
) | ) | |||
) | # We can't use unlink(missing_ok=True) because it's not always a | |||
vailable | ||||
if path.exists(): | ||||
path.unlink() | ||||
distribution_path: Path = distribution._path # type: ignore[attr-de | ||||
fined] | ||||
if distribution_path.exists(): | ||||
remove_directory(str(distribution_path), force=True) | ||||
paths.append(distribution_path) | ||||
return paths | ||||
def _path_method_wrapper( | ||||
self, | ||||
path: str | Path, | ||||
method: str, | ||||
*args: Any, | ||||
return_first: bool = True, | ||||
writable_only: bool = False, | ||||
**kwargs: Any, | ||||
) -> tuple[Path, Any] | list[tuple[Path, Any]]: | ||||
if isinstance(path, str): | ||||
path = Path(path) | ||||
candidates = self.make_candidates( | ||||
path, writable_only=writable_only, strict=True | ||||
) | ||||
results = [] | results = [] | |||
for candidate in candidates: | for candidate in candidates: | |||
try: | try: | |||
result = candidate, getattr(candidate, method)(*args, **kwargs) | result = candidate, getattr(candidate, method)(*args, **kwargs) | |||
if return_first: | if return_first: | |||
return result | return result | |||
else: | results.append(result) | |||
results.append(result) | except OSError: | |||
except (IOError, OSError): | ||||
# TODO: Replace with PermissionError | # TODO: Replace with PermissionError | |||
pass | pass | |||
if results: | if results: | |||
return results | return results | |||
raise OSError("Unable to access any of {}".format(paths_csv(candidates)) ) | raise OSError(f"Unable to access any of {paths_csv(candidates)}") | |||
def write_text(self, path, *args, **kwargs): # type: (Path, *Any, **Any) -> | def write_text(self, path: str | Path, *args: Any, **kwargs: Any) -> Path: | |||
Path | paths = self._path_method_wrapper(path, "write_text", *args, **kwargs) | |||
return self._path_method_wrapper(path, "write_text", *args, **kwargs)[0] | assert isinstance(paths, tuple) | |||
return paths[0] | ||||
def mkdir(self, path: str | Path, *args: Any, **kwargs: Any) -> Path: | ||||
paths = self._path_method_wrapper(path, "mkdir", *args, **kwargs) | ||||
assert isinstance(paths, tuple) | ||||
return paths[0] | ||||
def mkdir(self, path, *args, **kwargs): # type: (Path, *Any, **Any) -> Path | def exists(self, path: str | Path) -> bool: | |||
return self._path_method_wrapper(path, "mkdir", *args, **kwargs)[0] | ||||
def exists(self, path): # type: (Path) -> bool | ||||
return any( | return any( | |||
value[-1] | value[-1] | |||
for value in self._path_method_wrapper(path, "exists", return_first= False) | for value in self._path_method_wrapper(path, "exists", return_first= False) | |||
) | ) | |||
def find(self, path, writable_only=False): # type: (Path, bool) -> List[Pat | def find( | |||
h] | self, | |||
path: str | Path, | ||||
writable_only: bool = False, | ||||
) -> list[Path]: | ||||
return [ | return [ | |||
value[0] | value[0] | |||
for value in self._path_method_wrapper( | for value in self._path_method_wrapper( | |||
path, "exists", return_first=False, writable_only=writable_only | path, "exists", return_first=False, writable_only=writable_only | |||
) | ) | |||
if value[-1] is True | if value[-1] is True | |||
] | ] | |||
def __getattr__(self, item): | def __getattr__(self, item: str) -> Any: | |||
try: | try: | |||
return super(SitePackages, self).__getattribute__(item) | return super().__getattribute__(item) | |||
except AttributeError: | except AttributeError: | |||
return getattr(self.path, item) | return getattr(self.path, item) | |||
class EnvError(Exception): | class EnvError(Exception): | |||
pass | pass | |||
class EnvCommandError(EnvError): | class EnvCommandError(EnvError): | |||
def __init__(self, e, input=None): # type: (CalledProcessError) -> None | def __init__(self, e: CalledProcessError, input: str | None = None) -> None: | |||
self.e = e | self.e = e | |||
message = "Command {} errored with the following return code {}, and out | message = ( | |||
put: \n{}".format( | f"Command {e.cmd} errored with the following return code {e.returnco | |||
e.cmd, e.returncode, decode(e.output) | de}," | |||
f" and output: \n{decode(e.output)}" | ||||
) | ) | |||
if input: | if input: | |||
message += "input was : {}".format(input) | message += f"input was : {input}" | |||
super(EnvCommandError, self).__init__(message) | super().__init__(message) | |||
class NoCompatiblePythonVersionFound(EnvError): | class NoCompatiblePythonVersionFound(EnvError): | |||
def __init__(self, expected, given=None): | def __init__(self, expected: str, given: str | None = None) -> None: | |||
if given: | if given: | |||
message = ( | message = ( | |||
"The specified Python version ({}) " | f"The specified Python version ({given}) " | |||
"is not supported by the project ({}).\n" | f"is not supported by the project ({expected}).\n" | |||
"Please choose a compatible version " | "Please choose a compatible version " | |||
"or loosen the python constraint specified " | "or loosen the python constraint specified " | |||
"in the pyproject.toml file.".format(given, expected) | "in the pyproject.toml file." | |||
) | ) | |||
else: | else: | |||
message = ( | message = ( | |||
"Poetry was unable to find a compatible version. " | "Poetry was unable to find a compatible version. " | |||
"If you have one, you can explicitly use it " | "If you have one, you can explicitly use it " | |||
'via the "env use" command.' | 'via the "env use" command.' | |||
) | ) | |||
super(NoCompatiblePythonVersionFound, self).__init__(message) | super().__init__(message) | |||
class InvalidCurrentPythonVersionError(EnvError): | ||||
def __init__(self, expected: str, given: str) -> None: | ||||
message = ( | ||||
f"Current Python version ({given}) " | ||||
f"is not allowed by the project ({expected}).\n" | ||||
'Please change python executable via the "env use" command.' | ||||
) | ||||
super().__init__(message) | ||||
class EnvManager(object): | class EnvManager: | |||
""" | """ | |||
Environments manager | Environments manager | |||
""" | """ | |||
_env = None | _env = None | |||
ENVS_FILE = "envs.toml" | ENVS_FILE = "envs.toml" | |||
def __init__(self, poetry): # type: (Poetry) -> None | def __init__(self, poetry: Poetry) -> None: | |||
self._poetry = poetry | self._poetry = poetry | |||
def activate(self, python, io): # type: (str, IO) -> Env | def _full_python_path(self, python: str) -> str: | |||
venv_path = self._poetry.config.get("virtualenvs.path") | try: | |||
if venv_path is None: | executable = decode( | |||
venv_path = Path(CACHE_DIR) / "virtualenvs" | subprocess.check_output( | |||
else: | list_to_shell_command( | |||
venv_path = Path(venv_path) | [python, "-c", '"import sys; print(sys.executable)"'] | |||
), | ||||
shell=True, | ||||
).strip() | ||||
) | ||||
except CalledProcessError as e: | ||||
raise EnvCommandError(e) | ||||
return executable | ||||
def _detect_active_python(self, io: IO) -> str | None: | ||||
executable = None | ||||
try: | ||||
io.write_line( | ||||
"Trying to detect current active python executable as specified | ||||
in the" | ||||
" config.", | ||||
verbosity=Verbosity.VERBOSE, | ||||
) | ||||
executable = self._full_python_path("python") | ||||
io.write_line(f"Found: {executable}", verbosity=Verbosity.VERBOSE) | ||||
except CalledProcessError: | ||||
io.write_line( | ||||
"Unable to detect the current active python executable. Falling | ||||
back to" | ||||
" default.", | ||||
verbosity=Verbosity.VERBOSE, | ||||
) | ||||
return executable | ||||
def activate(self, python: str, io: IO) -> Env: | ||||
venv_path = self._poetry.config.virtualenvs_path | ||||
cwd = self._poetry.file.parent | cwd = self._poetry.file.parent | |||
envs_file = TOMLFile(venv_path / self.ENVS_FILE) | envs_file = TOMLFile(venv_path / self.ENVS_FILE) | |||
try: | try: | |||
python_version = Version.parse(python) | python_version = Version.parse(python) | |||
python = "python{}".format(python_version.major) | python = f"python{python_version.major}" | |||
if python_version.precision > 1: | if python_version.precision > 1: | |||
python += ".{}".format(python_version.minor) | python += f".{python_version.minor}" | |||
except ValueError: | except ValueError: | |||
# Executable in PATH or full executable path | # Executable in PATH or full executable path | |||
pass | pass | |||
python = self._full_python_path(python) | ||||
try: | try: | |||
python_version = decode( | python_version_string = decode( | |||
subprocess.check_output( | subprocess.check_output( | |||
list_to_shell_command( | list_to_shell_command([python, "-c", GET_PYTHON_VERSION_ONEL | |||
[ | INER]), | |||
python, | ||||
"-c", | ||||
"\"import sys; print('.'.join([str(s) for s in sys.v | ||||
ersion_info[:3]]))\"", | ||||
] | ||||
), | ||||
shell=True, | shell=True, | |||
) | ) | |||
) | ) | |||
except CalledProcessError as e: | except CalledProcessError as e: | |||
raise EnvCommandError(e) | raise EnvCommandError(e) | |||
python_version = Version.parse(python_version.strip()) | python_version = Version.parse(python_version_string.strip()) | |||
minor = "{}.{}".format(python_version.major, python_version.minor) | minor = f"{python_version.major}.{python_version.minor}" | |||
patch = python_version.text | patch = python_version.text | |||
create = False | create = False | |||
is_root_venv = self._poetry.config.get("virtualenvs.in-project") | is_root_venv = self._poetry.config.get("virtualenvs.in-project") | |||
# If we are required to create the virtual environment in the root folde r, | # If we are required to create the virtual environment in the root folde r, | |||
# create or recreate it if needed | # create or recreate it if needed | |||
if is_root_venv: | if is_root_venv: | |||
create = False | create = False | |||
venv = self._poetry.file.parent / ".venv" | venv = self._poetry.file.parent / ".venv" | |||
if venv.exists(): | if venv.exists(): | |||
skipping to change at line 406 | skipping to change at line 604 | |||
envs = envs_file.read() | envs = envs_file.read() | |||
current_env = envs.get(base_env_name) | current_env = envs.get(base_env_name) | |||
if current_env is not None: | if current_env is not None: | |||
current_minor = current_env["minor"] | current_minor = current_env["minor"] | |||
current_patch = current_env["patch"] | current_patch = current_env["patch"] | |||
if current_minor == minor and current_patch != patch: | if current_minor == minor and current_patch != patch: | |||
# We need to recreate | # We need to recreate | |||
create = True | create = True | |||
name = "{}-py{}".format(base_env_name, minor) | name = f"{base_env_name}-py{minor}" | |||
venv = venv_path / name | venv = venv_path / name | |||
# Create if needed | # Create if needed | |||
if not venv.exists() or venv.exists() and create: | if not venv.exists() or venv.exists() and create: | |||
in_venv = os.environ.get("VIRTUAL_ENV") is not None | in_venv = os.environ.get("VIRTUAL_ENV") is not None | |||
if in_venv or not venv.exists(): | if in_venv or not venv.exists(): | |||
create = True | create = True | |||
if venv.exists(): | if venv.exists(): | |||
# We need to check if the patch version is correct | # We need to check if the patch version is correct | |||
skipping to change at line 431 | skipping to change at line 629 | |||
create = True | create = True | |||
self.create_venv(io, executable=python, force=create) | self.create_venv(io, executable=python, force=create) | |||
# Activate | # Activate | |||
envs[base_env_name] = {"minor": minor, "patch": patch} | envs[base_env_name] = {"minor": minor, "patch": patch} | |||
envs_file.write(envs) | envs_file.write(envs) | |||
return self.get(reload=True) | return self.get(reload=True) | |||
def deactivate(self, io): # type: (IO) -> None | def deactivate(self, io: IO) -> None: | |||
venv_path = self._poetry.config.get("virtualenvs.path") | venv_path = self._poetry.config.virtualenvs_path | |||
if venv_path is None: | name = self.generate_env_name( | |||
venv_path = Path(CACHE_DIR) / "virtualenvs" | self._poetry.package.name, str(self._poetry.file.parent) | |||
else: | ) | |||
venv_path = Path(venv_path) | ||||
name = self._poetry.package.name | ||||
name = self.generate_env_name(name, str(self._poetry.file.parent)) | ||||
envs_file = TOMLFile(venv_path / self.ENVS_FILE) | envs_file = TOMLFile(venv_path / self.ENVS_FILE) | |||
if envs_file.exists(): | if envs_file.exists(): | |||
envs = envs_file.read() | envs = envs_file.read() | |||
env = envs.get(name) | env = envs.get(name) | |||
if env is not None: | if env is not None: | |||
io.write_line( | venv = venv_path / f"{name}-py{env['minor']}" | |||
"Deactivating virtualenv: <comment>{}</comment>".format( | io.write_line(f"Deactivating virtualenv: <comment>{venv}</commen | |||
venv_path / (name + "-py{}".format(env["minor"])) | t>") | |||
) | ||||
) | ||||
del envs[name] | del envs[name] | |||
envs_file.write(envs) | envs_file.write(envs) | |||
def get(self, reload=False): # type: (bool) -> Env | def get(self, reload: bool = False) -> Env: | |||
if self._env is not None and not reload: | if self._env is not None and not reload: | |||
return self._env | return self._env | |||
python_minor = ".".join([str(v) for v in sys.version_info[:2]]) | python_minor = ".".join([str(v) for v in sys.version_info[:2]]) | |||
venv_path = self._poetry.config.get("virtualenvs.path") | venv_path = self._poetry.config.virtualenvs_path | |||
if venv_path is None: | ||||
venv_path = Path(CACHE_DIR) / "virtualenvs" | ||||
else: | ||||
venv_path = Path(venv_path) | ||||
cwd = self._poetry.file.parent | cwd = self._poetry.file.parent | |||
envs_file = TOMLFile(venv_path / self.ENVS_FILE) | envs_file = TOMLFile(venv_path / self.ENVS_FILE) | |||
env = None | env = None | |||
base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) | base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) | |||
if envs_file.exists(): | if envs_file.exists(): | |||
envs = envs_file.read() | envs = envs_file.read() | |||
env = envs.get(base_env_name) | env = envs.get(base_env_name) | |||
if env: | if env: | |||
python_minor = env["minor"] | python_minor = env["minor"] | |||
skipping to change at line 488 | skipping to change at line 675 | |||
# Conda sets CONDA_PREFIX in its envs, see | # Conda sets CONDA_PREFIX in its envs, see | |||
# https://github.com/conda/conda/issues/2764 | # https://github.com/conda/conda/issues/2764 | |||
env_prefix = os.environ.get("VIRTUAL_ENV", os.environ.get("CONDA_PREFIX" )) | env_prefix = os.environ.get("VIRTUAL_ENV", os.environ.get("CONDA_PREFIX" )) | |||
conda_env_name = os.environ.get("CONDA_DEFAULT_ENV") | conda_env_name = os.environ.get("CONDA_DEFAULT_ENV") | |||
# It's probably not a good idea to pollute Conda's global "base" env, si nce | # It's probably not a good idea to pollute Conda's global "base" env, si nce | |||
# most users have it activated all the time. | # most users have it activated all the time. | |||
in_venv = env_prefix is not None and conda_env_name != "base" | in_venv = env_prefix is not None and conda_env_name != "base" | |||
if not in_venv or env is not None: | if not in_venv or env is not None: | |||
# Checking if a local virtualenv exists | # Checking if a local virtualenv exists | |||
if self._poetry.config.get("virtualenvs.in-project") is not False: | if ( | |||
if (cwd / ".venv").exists() and (cwd / ".venv").is_dir(): | self._poetry.config.get("virtualenvs.in-project") is not False | |||
venv = cwd / ".venv" | and (cwd / ".venv").exists() | |||
and (cwd / ".venv").is_dir() | ||||
): | ||||
venv = cwd / ".venv" | ||||
return VirtualEnv(venv) | return VirtualEnv(venv) | |||
create_venv = self._poetry.config.get("virtualenvs.create", True) | create_venv = self._poetry.config.get("virtualenvs.create", True) | |||
if not create_venv: | if not create_venv: | |||
return self.get_system_env() | return self.get_system_env() | |||
venv_path = self._poetry.config.get("virtualenvs.path") | venv_path = self._poetry.config.virtualenvs_path | |||
if venv_path is None: | ||||
venv_path = Path(CACHE_DIR) / "virtualenvs" | ||||
else: | ||||
venv_path = Path(venv_path) | ||||
name = "{}-py{}".format(base_env_name, python_minor.strip()) | name = f"{base_env_name}-py{python_minor.strip()}" | |||
venv = venv_path / name | venv = venv_path / name | |||
if not venv.exists(): | if not venv.exists(): | |||
return self.get_system_env() | return self.get_system_env() | |||
return VirtualEnv(venv) | return VirtualEnv(venv) | |||
if env_prefix is not None: | if env_prefix is not None: | |||
prefix = Path(env_prefix) | prefix = Path(env_prefix) | |||
base_prefix = None | base_prefix = None | |||
else: | else: | |||
prefix = Path(sys.prefix) | prefix = Path(sys.prefix) | |||
base_prefix = self.get_base_prefix() | base_prefix = self.get_base_prefix() | |||
return VirtualEnv(prefix, base_prefix) | return VirtualEnv(prefix, base_prefix) | |||
def list(self, name=None): # type: (Optional[str]) -> List[VirtualEnv] | def list(self, name: str | None = None) -> list[VirtualEnv]: | |||
if name is None: | if name is None: | |||
name = self._poetry.package.name | name = self._poetry.package.name | |||
venv_name = self.generate_env_name(name, str(self._poetry.file.parent)) | venv_name = self.generate_env_name(name, str(self._poetry.file.parent)) | |||
venv_path = self._poetry.config.virtualenvs_path | ||||
venv_path = self._poetry.config.get("virtualenvs.path") | ||||
if venv_path is None: | ||||
venv_path = Path(CACHE_DIR) / "virtualenvs" | ||||
else: | ||||
venv_path = Path(venv_path) | ||||
env_list = [ | env_list = [ | |||
VirtualEnv(Path(p)) | VirtualEnv(Path(p)) for p in sorted(venv_path.glob(f"{venv_name}-py* | |||
for p in sorted(venv_path.glob("{}-py*".format(venv_name))) | ")) | |||
] | ] | |||
venv = self._poetry.file.parent / ".venv" | venv = self._poetry.file.parent / ".venv" | |||
if ( | if ( | |||
self._poetry.config.get("virtualenvs.in-project") | self._poetry.config.get("virtualenvs.in-project") | |||
and venv.exists() | and venv.exists() | |||
and venv.is_dir() | and venv.is_dir() | |||
): | ): | |||
env_list.insert(0, VirtualEnv(venv)) | env_list.insert(0, VirtualEnv(venv)) | |||
return env_list | return env_list | |||
def remove(self, python): # type: (str) -> Env | def remove(self, python: str) -> Env: | |||
venv_path = self._poetry.config.get("virtualenvs.path") | venv_path = self._poetry.config.virtualenvs_path | |||
if venv_path is None: | ||||
venv_path = Path(CACHE_DIR) / "virtualenvs" | ||||
else: | ||||
venv_path = Path(venv_path) | ||||
cwd = self._poetry.file.parent | cwd = self._poetry.file.parent | |||
envs_file = TOMLFile(venv_path / self.ENVS_FILE) | envs_file = TOMLFile(venv_path / self.ENVS_FILE) | |||
base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) | base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) | |||
if python.startswith(base_env_name): | if python.startswith(base_env_name): | |||
venvs = self.list() | venvs = self.list() | |||
for venv in venvs: | for venv in venvs: | |||
if venv.path.name == python: | if venv.path.name == python: | |||
# Exact virtualenv name | # Exact virtualenv name | |||
skipping to change at line 589 | skipping to change at line 764 | |||
if current_env["minor"] == venv_minor: | if current_env["minor"] == venv_minor: | |||
del envs[base_env_name] | del envs[base_env_name] | |||
envs_file.write(envs) | envs_file.write(envs) | |||
self.remove_venv(venv.path) | self.remove_venv(venv.path) | |||
return venv | return venv | |||
raise ValueError( | raise ValueError( | |||
'<warning>Environment "{}" does not exist.</warning>'.format(pyt hon) | f'<warning>Environment "{python}" does not exist.</warning>' | |||
) | ) | |||
try: | try: | |||
python_version = Version.parse(python) | python_version = Version.parse(python) | |||
python = "python{}".format(python_version.major) | python = f"python{python_version.major}" | |||
if python_version.precision > 1: | if python_version.precision > 1: | |||
python += ".{}".format(python_version.minor) | python += f".{python_version.minor}" | |||
except ValueError: | except ValueError: | |||
# Executable in PATH or full executable path | # Executable in PATH or full executable path | |||
pass | pass | |||
try: | try: | |||
python_version = decode( | python_version_string = decode( | |||
subprocess.check_output( | subprocess.check_output( | |||
list_to_shell_command( | list_to_shell_command([python, "-c", GET_PYTHON_VERSION_ONEL | |||
[ | INER]), | |||
python, | ||||
"-c", | ||||
"\"import sys; print('.'.join([str(s) for s in sys.v | ||||
ersion_info[:3]]))\"", | ||||
] | ||||
), | ||||
shell=True, | shell=True, | |||
) | ) | |||
) | ) | |||
except CalledProcessError as e: | except CalledProcessError as e: | |||
raise EnvCommandError(e) | raise EnvCommandError(e) | |||
python_version = Version.parse(python_version.strip()) | python_version = Version.parse(python_version_string.strip()) | |||
minor = "{}.{}".format(python_version.major, python_version.minor) | minor = f"{python_version.major}.{python_version.minor}" | |||
name = "{}-py{}".format(base_env_name, minor) | name = f"{base_env_name}-py{minor}" | |||
venv = venv_path / name | venv_path = venv_path / name | |||
if not venv.exists(): | if not venv_path.exists(): | |||
raise ValueError( | raise ValueError(f'<warning>Environment "{name}" does not exist.</wa | |||
'<warning>Environment "{}" does not exist.</warning>'.format(nam | rning>') | |||
e) | ||||
) | ||||
if envs_file.exists(): | if envs_file.exists(): | |||
envs = envs_file.read() | envs = envs_file.read() | |||
current_env = envs.get(base_env_name) | current_env = envs.get(base_env_name) | |||
if current_env is not None: | if current_env is not None: | |||
current_minor = current_env["minor"] | current_minor = current_env["minor"] | |||
if current_minor == minor: | if current_minor == minor: | |||
del envs[base_env_name] | del envs[base_env_name] | |||
envs_file.write(envs) | envs_file.write(envs) | |||
self.remove_venv(venv) | self.remove_venv(venv_path) | |||
return VirtualEnv(venv, venv) | return VirtualEnv(venv_path, venv_path) | |||
def create_venv( | def create_venv( | |||
self, io, name=None, executable=None, force=False | self, | |||
): # type: (IO, Optional[str], Optional[str], bool) -> Env | io: IO, | |||
name: str | None = None, | ||||
executable: str | None = None, | ||||
force: bool = False, | ||||
) -> Env: | ||||
if self._env is not None and not force: | if self._env is not None and not force: | |||
return self._env | return self._env | |||
cwd = self._poetry.file.parent | cwd = self._poetry.file.parent | |||
env = self.get(reload=True) | env = self.get(reload=True) | |||
if not env.is_sane(): | if not env.is_sane(): | |||
force = True | force = True | |||
if env.is_venv() and not force: | if env.is_venv() and not force: | |||
# Already inside a virtualenv. | # Already inside a virtualenv. | |||
current_python = Version.parse( | ||||
".".join(str(c) for c in env.version_info[:3]) | ||||
) | ||||
if not self._poetry.package.python_constraint.allows(current_python) | ||||
: | ||||
raise InvalidCurrentPythonVersionError( | ||||
self._poetry.package.python_versions, str(current_python) | ||||
) | ||||
return env | return env | |||
create_venv = self._poetry.config.get("virtualenvs.create") | create_venv = self._poetry.config.get("virtualenvs.create") | |||
root_venv = self._poetry.config.get("virtualenvs.in-project") | root_venv = self._poetry.config.get("virtualenvs.in-project") | |||
prefer_active_python = self._poetry.config.get( | ||||
"virtualenvs.prefer-active-python" | ||||
) | ||||
venv_prompt = self._poetry.config.get("virtualenvs.prompt") | ||||
venv_path = self._poetry.config.get("virtualenvs.path") | if not executable and prefer_active_python: | |||
if root_venv: | executable = self._detect_active_python(io) | |||
venv_path = cwd / ".venv" | ||||
elif venv_path is None: | ||||
venv_path = Path(CACHE_DIR) / "virtualenvs" | ||||
else: | ||||
venv_path = Path(venv_path) | ||||
venv_path = cwd / ".venv" if root_venv else self._poetry.config.virtuale nvs_path | ||||
if not name: | if not name: | |||
name = self._poetry.package.name | name = self._poetry.package.name | |||
assert name is not None | ||||
python_patch = ".".join([str(v) for v in sys.version_info[:3]]) | python_patch = ".".join([str(v) for v in sys.version_info[:3]]) | |||
python_minor = ".".join([str(v) for v in sys.version_info[:2]]) | python_minor = ".".join([str(v) for v in sys.version_info[:2]]) | |||
if executable: | if executable: | |||
python_patch = decode( | python_patch = decode( | |||
subprocess.check_output( | subprocess.check_output( | |||
list_to_shell_command( | list_to_shell_command( | |||
[ | [executable, "-c", GET_PYTHON_VERSION_ONELINER] | |||
executable, | ||||
"-c", | ||||
"\"import sys; print('.'.join([str(s) for s in sys.v | ||||
ersion_info[:3]]))\"", | ||||
] | ||||
), | ), | |||
shell=True, | shell=True, | |||
).strip() | ).strip() | |||
) | ) | |||
python_minor = ".".join(python_patch.split(".")[:2]) | python_minor = ".".join(python_patch.split(".")[:2]) | |||
supported_python = self._poetry.package.python_constraint | supported_python = self._poetry.package.python_constraint | |||
if not supported_python.allows(Version.parse(python_patch)): | if not supported_python.allows(Version.parse(python_patch)): | |||
# The currently activated or chosen Python version | # The currently activated or chosen Python version | |||
# is not compatible with the Python constraint specified | # is not compatible with the Python constraint specified | |||
# for the project. | # for the project. | |||
# If an executable has been specified, we stop there | # If an executable has been specified, we stop there | |||
# and notify the user of the incompatibility. | # and notify the user of the incompatibility. | |||
# Otherwise, we try to find a compatible Python version. | # Otherwise, we try to find a compatible Python version. | |||
if executable: | if executable and not prefer_active_python: | |||
raise NoCompatiblePythonVersionFound( | raise NoCompatiblePythonVersionFound( | |||
self._poetry.package.python_versions, python_patch | self._poetry.package.python_versions, python_patch | |||
) | ) | |||
io.write_line( | io.write_error_line( | |||
"<warning>The currently activated Python version {} " | f"<warning>The currently activated Python version {python_patch} | |||
"is not supported by the project ({}).\n" | is not" | |||
"Trying to find and use a compatible version.</warning> ".format | f" supported by the project ({self._poetry.package.python_versio | |||
( | ns}).\n" | |||
python_patch, self._poetry.package.python_versions | "Trying to find and use a compatible version.</warning> " | |||
) | ||||
) | ) | |||
for python_to_try in reversed( | for python_to_try in sorted( | |||
sorted( | self._poetry.package.AVAILABLE_PYTHONS, | |||
self._poetry.package.AVAILABLE_PYTHONS, | key=lambda v: (v.startswith("3"), -len(v), v), | |||
key=lambda v: (v.startswith("3"), -len(v), v), | reverse=True, | |||
) | ||||
): | ): | |||
if len(python_to_try) == 1: | if len(python_to_try) == 1: | |||
if not parse_constraint("^{}.0".format(python_to_try)).allow s_any( | if not parse_constraint(f"^{python_to_try}.0").allows_any( | |||
supported_python | supported_python | |||
): | ): | |||
continue | continue | |||
elif not supported_python.allows_all( | elif not supported_python.allows_any( | |||
parse_constraint(python_to_try + ".*") | parse_constraint(python_to_try + ".*") | |||
): | ): | |||
continue | continue | |||
python = "python" + python_to_try | python = "python" + python_to_try | |||
if io.is_debug(): | if io.is_debug(): | |||
io.write_line("<debug>Trying {}</debug>".format(python)) | io.write_line(f"<debug>Trying {python}</debug>") | |||
try: | try: | |||
python_patch = decode( | python_patch = decode( | |||
subprocess.check_output( | subprocess.check_output( | |||
list_to_shell_command( | list_to_shell_command( | |||
[ | [python, "-c", GET_PYTHON_VERSION_ONELINER] | |||
python, | ||||
"-c", | ||||
"\"import sys; print('.'.join([str(s) for s | ||||
in sys.version_info[:3]]))\"", | ||||
] | ||||
), | ), | |||
stderr=subprocess.STDOUT, | stderr=subprocess.STDOUT, | |||
shell=True, | shell=True, | |||
).strip() | ).strip() | |||
) | ) | |||
except CalledProcessError: | except CalledProcessError: | |||
continue | continue | |||
if not python_patch: | if not python_patch: | |||
continue | continue | |||
if supported_python.allows(Version.parse(python_patch)): | if supported_python.allows(Version.parse(python_patch)): | |||
io.write_line("Using <c1>{}</c1> ({})".format(python, python _patch)) | io.write_line(f"Using <c1>{python}</c1> ({python_patch})") | |||
executable = python | executable = python | |||
python_minor = ".".join(python_patch.split(".")[:2]) | python_minor = ".".join(python_patch.split(".")[:2]) | |||
break | break | |||
if not executable: | if not executable: | |||
raise NoCompatiblePythonVersionFound( | raise NoCompatiblePythonVersionFound( | |||
self._poetry.package.python_versions | self._poetry.package.python_versions | |||
) | ) | |||
if root_venv: | if root_venv: | |||
venv = venv_path | venv = venv_path | |||
else: | else: | |||
name = self.generate_env_name(name, str(cwd)) | name = self.generate_env_name(name, str(cwd)) | |||
name = "{}-py{}".format(name, python_minor.strip()) | name = f"{name}-py{python_minor.strip()}" | |||
venv = venv_path / name | venv = venv_path / name | |||
if venv_prompt is not None: | ||||
venv_prompt = venv_prompt.format( | ||||
project_name=self._poetry.package.name or "virtualenv", | ||||
python_version=python_minor, | ||||
) | ||||
if not venv.exists(): | if not venv.exists(): | |||
if create_venv is False: | if create_venv is False: | |||
io.write_line( | io.write_line( | |||
"<fg=black;bg=yellow>" | "<fg=black;bg=yellow>" | |||
"Skipping virtualenv creation, " | "Skipping virtualenv creation, " | |||
"as specified in config file." | "as specified in config file." | |||
"</>" | "</>" | |||
) | ) | |||
return self.get_system_env() | return self.get_system_env() | |||
io.write_line( | io.write_line( | |||
"Creating virtualenv <c1>{}</> in {}".format(name, str(venv_path | f"Creating virtualenv <c1>{name}</> in" | |||
)) | f" {venv_path if not WINDOWS else get_real_windows_path(venv_pat | |||
h)!s}" | ||||
) | ) | |||
self.build_venv(venv, executable=executable) | ||||
else: | else: | |||
create_venv = False | ||||
if force: | if force: | |||
if not env.is_sane(): | if not env.is_sane(): | |||
io.write_line( | io.write_error_line( | |||
"<warning>The virtual environment found in {} seems to b | f"<warning>The virtual environment found in {env.path} s | |||
e broken.</warning>".format( | eems to" | |||
env.path | " be broken.</warning>" | |||
) | ||||
) | ) | |||
io.write_line( | io.write_line(f"Recreating virtualenv <c1>{name}</> in {venv!s}" | |||
"Recreating virtualenv <c1>{}</> in {}".format(name, str(ven | ) | |||
v)) | ||||
) | ||||
self.remove_venv(venv) | self.remove_venv(venv) | |||
self.build_venv(venv, executable=executable) | create_venv = True | |||
elif io.is_very_verbose(): | elif io.is_very_verbose(): | |||
io.write_line("Virtualenv <c1>{}</> already exists.".format(name | io.write_line(f"Virtualenv <c1>{name}</> already exists.") | |||
)) | ||||
if create_venv: | ||||
self.build_venv( | ||||
venv, | ||||
executable=executable, | ||||
flags=self._poetry.config.get("virtualenvs.options"), | ||||
prompt=venv_prompt, | ||||
) | ||||
# venv detection: | # venv detection: | |||
# stdlib venv may symlink sys.executable, so we can't use realpath. | # stdlib venv may symlink sys.executable, so we can't use realpath. | |||
# but others can symlink *to* the venv Python, | # but others can symlink *to* the venv Python, | |||
# so we can't just use sys.executable. | # so we can't just use sys.executable. | |||
# So we just check every item in the symlink tree (generally <= 3) | # So we just check every item in the symlink tree (generally <= 3) | |||
p = os.path.normcase(sys.executable) | p = os.path.normcase(sys.executable) | |||
paths = [p] | paths = [p] | |||
while os.path.islink(p): | while os.path.islink(p): | |||
p = os.path.normcase(os.path.join(os.path.dirname(p), os.readlink(p) )) | p = os.path.normcase(os.path.join(os.path.dirname(p), os.readlink(p) )) | |||
paths.append(p) | paths.append(p) | |||
p_venv = os.path.normcase(str(venv)) | p_venv = os.path.normcase(str(venv)) | |||
if any(p.startswith(p_venv) for p in paths): | if any(p.startswith(p_venv) for p in paths): | |||
# Running properly in the virtualenv, don't need to do anything | # Running properly in the virtualenv, don't need to do anything | |||
return SystemEnv(Path(sys.prefix), Path(self.get_base_prefix())) | return self.get_system_env() | |||
return VirtualEnv(venv) | return VirtualEnv(venv) | |||
@classmethod | @classmethod | |||
def build_venv( | def build_venv( | |||
cls, path, executable=None | cls, | |||
): # type: (Union[Path,str], Optional[Union[str, Path]]) -> virtualenv.run. | path: Path | str, | |||
session.Session | executable: str | Path | None = None, | |||
flags: dict[str, bool] | None = None, | ||||
with_pip: bool | None = None, | ||||
with_wheel: bool | None = None, | ||||
with_setuptools: bool | None = None, | ||||
prompt: str | None = None, | ||||
) -> virtualenv.run.session.Session: | ||||
if WINDOWS: | ||||
path = get_real_windows_path(path) | ||||
executable = get_real_windows_path(executable) if executable else No | ||||
ne | ||||
flags = flags or {} | ||||
flags["no-pip"] = ( | ||||
not with_pip if with_pip is not None else flags.pop("no-pip", True) | ||||
) | ||||
flags["no-setuptools"] = ( | ||||
not with_setuptools | ||||
if with_setuptools is not None | ||||
else flags.pop("no-setuptools", True) | ||||
) | ||||
# we want wheels to be enabled when pip is required and it has not been | ||||
# explicitly disabled | ||||
flags["no-wheel"] = ( | ||||
not with_wheel | ||||
if with_wheel is not None | ||||
else flags.pop("no-wheel", flags["no-pip"]) | ||||
) | ||||
if isinstance(executable, Path): | if isinstance(executable, Path): | |||
executable = executable.resolve().as_posix() | executable = executable.resolve().as_posix() | |||
return virtualenv.cli_run( | ||||
[ | args = [ | |||
"--no-download", | "--no-download", | |||
"--no-periodic-update", | "--no-periodic-update", | |||
"--python", | "--python", | |||
executable or sys.executable, | executable or sys.executable, | |||
] | ||||
if prompt is not None: | ||||
args.extend(["--prompt", prompt]) | ||||
for flag, value in flags.items(): | ||||
if value is True: | ||||
args.append(f"--{flag}") | ||||
args.append(str(path)) | ||||
cli_result = virtualenv.cli_run(args) | ||||
# Exclude the venv folder from from macOS Time Machine backups | ||||
# TODO: Add backup-ignore markers for other platforms too | ||||
if sys.platform == "darwin": | ||||
import xattr | ||||
xattr.setxattr( | ||||
str(path), | str(path), | |||
] | "com.apple.metadata:com_apple_backup_excludeItem", | |||
) | plistlib.dumps("com.apple.backupd", fmt=plistlib.FMT_BINARY), | |||
) | ||||
return cli_result | ||||
@classmethod | @classmethod | |||
def remove_venv(cls, path): # type: (Union[Path,str]) -> None | def remove_venv(cls, path: Path | str) -> None: | |||
if isinstance(path, str): | if isinstance(path, str): | |||
path = Path(path) | path = Path(path) | |||
assert path.is_dir() | assert path.is_dir() | |||
try: | try: | |||
shutil.rmtree(str(path)) | remove_directory(path) | |||
return | return | |||
except OSError as e: | except OSError as e: | |||
# Continue only if e.errno == 16 | # Continue only if e.errno == 16 | |||
if e.errno != 16: # ERRNO 16: Device or resource busy | if e.errno != 16: # ERRNO 16: Device or resource busy | |||
raise e | raise e | |||
# Delete all files and folders but the toplevel one. This is because som etimes | # Delete all files and folders but the toplevel one. This is because som etimes | |||
# the venv folder is mounted by the OS, such as in a docker volume. In s uch | # the venv folder is mounted by the OS, such as in a docker volume. In s uch | |||
# cases, an attempt to delete the folder itself will result in an `OSErr or`. | # cases, an attempt to delete the folder itself will result in an `OSErr or`. | |||
# See https://github.com/python-poetry/poetry/pull/2064 | # See https://github.com/python-poetry/poetry/pull/2064 | |||
for file_path in path.iterdir(): | for file_path in path.iterdir(): | |||
if file_path.is_file() or file_path.is_symlink(): | if file_path.is_file() or file_path.is_symlink(): | |||
file_path.unlink() | file_path.unlink() | |||
elif file_path.is_dir(): | elif file_path.is_dir(): | |||
shutil.rmtree(str(file_path)) | remove_directory(file_path, force=True) | |||
@classmethod | @classmethod | |||
def get_system_env( | def get_system_env(cls, naive: bool = False) -> Env: | |||
cls, naive=False | ||||
): # type: (bool) -> Union["SystemEnv", "GenericEnv"] | ||||
""" | """ | |||
Retrieve the current Python environment. | Retrieve the current Python environment. | |||
This can be the base Python environment or an activated virtual environm ent. | This can be the base Python environment or an activated virtual environm ent. | |||
This method also works around the issue that the virtual environment | ||||
This method also workaround the issue that the virtual environment | ||||
used by Poetry internally (when installed via the custom installer) | used by Poetry internally (when installed via the custom installer) | |||
is incorrectly detected as the system environment. Note that this workar ound | is incorrectly detected as the system environment. Note that this workar ound | |||
happens only when `naive` is False since there are times where we actual ly | happens only when `naive` is False since there are times where we actual ly | |||
want to retrieve Poetry's custom virtual environment | want to retrieve Poetry's custom virtual environment | |||
(e.g. plugin installation or self update). | (e.g. plugin installation or self update). | |||
""" | """ | |||
prefix, base_prefix = Path(sys.prefix), Path(cls.get_base_prefix()) | prefix, base_prefix = Path(sys.prefix), Path(cls.get_base_prefix()) | |||
env = SystemEnv(prefix) | env: Env = SystemEnv(prefix) | |||
if not naive: | if not naive: | |||
if prefix.joinpath("poetry_env").exists(): | if prefix.joinpath("poetry_env").exists(): | |||
env = GenericEnv(base_prefix, child_env=env) | env = GenericEnv(base_prefix, child_env=env) | |||
else: | else: | |||
from poetry.locations import data_dir | from poetry.locations import data_dir | |||
try: | try: | |||
prefix.relative_to(data_dir()) | prefix.relative_to(data_dir()) | |||
except ValueError: | except ValueError: | |||
pass | pass | |||
else: | else: | |||
env = GenericEnv(base_prefix, child_env=env) | env = GenericEnv(base_prefix, child_env=env) | |||
return env | return env | |||
@classmethod | @classmethod | |||
def get_base_prefix(cls): # type: () -> str | def get_base_prefix(cls) -> Path: | |||
if hasattr(sys, "real_prefix"): | real_prefix = getattr(sys, "real_prefix", None) | |||
return sys.real_prefix | if real_prefix is not None: | |||
return Path(real_prefix) | ||||
if hasattr(sys, "base_prefix"): | ||||
return sys.base_prefix | base_prefix = getattr(sys, "base_prefix", None) | |||
if base_prefix is not None: | ||||
return Path(base_prefix) | ||||
return sys.prefix | return Path(sys.prefix) | |||
@classmethod | @classmethod | |||
def generate_env_name(cls, name, cwd): # type: (str, str) -> str | def generate_env_name(cls, name: str, cwd: str) -> str: | |||
name = name.lower() | name = name.lower() | |||
sanitized_name = re.sub(r'[ $`!*@"\\\r\n\t]', "_", name)[:42] | sanitized_name = re.sub(r'[ $`!*@"\\\r\n\t]', "_", name)[:42] | |||
h = hashlib.sha256(encode(cwd)).digest() | normalized_cwd = os.path.normcase(os.path.realpath(cwd)) | |||
h = base64.urlsafe_b64encode(h).decode()[:8] | h_bytes = hashlib.sha256(encode(normalized_cwd)).digest() | |||
h_str = base64.urlsafe_b64encode(h_bytes).decode()[:8] | ||||
return "{}-{}".format(sanitized_name, h) | return f"{sanitized_name}-{h_str}" | |||
class Env(object): | class Env: | |||
""" | """ | |||
An abstract Python environment. | An abstract Python environment. | |||
""" | """ | |||
def __init__(self, path, base=None): # type: (Path, Optional[Path]) -> None | def __init__(self, path: Path, base: Path | None = None) -> None: | |||
self._is_windows = sys.platform == "win32" | self._is_windows = sys.platform == "win32" | |||
self._is_mingw = sysconfig.get_platform().startswith("mingw") | self._is_mingw = sysconfig.get_platform().startswith("mingw") | |||
self._is_conda = bool(os.environ.get("CONDA_DEFAULT_ENV")) | self._is_conda = bool(os.environ.get("CONDA_DEFAULT_ENV")) | |||
if self._is_windows: | ||||
path = get_real_windows_path(path) | ||||
base = get_real_windows_path(base) if base else None | ||||
if not self._is_windows or self._is_mingw: | if not self._is_windows or self._is_mingw: | |||
bin_dir = "bin" | bin_dir = "bin" | |||
else: | else: | |||
bin_dir = "Scripts" | bin_dir = "Scripts" | |||
self._path = path | self._path = path | |||
self._bin_dir = self._path / bin_dir | self._bin_dir = self._path / bin_dir | |||
self._base = base or path | ||||
self._executable = "python" | self._executable = "python" | |||
self._pip_executable = "pip" | self._pip_executable = "pip" | |||
self.find_executables() | self.find_executables() | |||
self._marker_env = None | self._base = base or path | |||
self._pip_version = None | ||||
self._site_packages = None | self._marker_env: dict[str, Any] | None = None | |||
self._paths = None | self._pip_version: Version | None = None | |||
self._supported_tags = None | self._site_packages: SitePackages | None = None | |||
self._purelib = None | self._paths: dict[str, str] | None = None | |||
self._platlib = None | self._supported_tags: list[Tag] | None = None | |||
self._script_dirs = None | self._purelib: Path | None = None | |||
self._platlib: Path | None = None | ||||
self._script_dirs: list[Path] | None = None | ||||
self._embedded_pip_path: str | None = None | ||||
@property | @property | |||
def path(self): # type: () -> Path | def path(self) -> Path: | |||
return self._path | return self._path | |||
@property | @property | |||
def base(self): # type: () -> Path | def base(self) -> Path: | |||
return self._base | return self._base | |||
@property | @property | |||
def version_info(self): # type: () -> Tuple[int] | def version_info(self) -> tuple[Any, ...]: | |||
return tuple(self.marker_env["version_info"]) | return tuple(self.marker_env["version_info"]) | |||
@property | @property | |||
def python_implementation(self): # type: () -> str | def python_implementation(self) -> str: | |||
return self.marker_env["platform_python_implementation"] | implementation: str = self.marker_env["platform_python_implementation"] | |||
return implementation | ||||
@property | @property | |||
def python(self): # type: () -> str | def python(self) -> str: | |||
""" | """ | |||
Path to current python executable | Path to current python executable | |||
""" | """ | |||
return self._bin(self._executable) | return self._bin(self._executable) | |||
@property | @property | |||
def marker_env(self): | def marker_env(self) -> dict[str, Any]: | |||
if self._marker_env is None: | if self._marker_env is None: | |||
self._marker_env = self.get_marker_env() | self._marker_env = self.get_marker_env() | |||
return self._marker_env | return self._marker_env | |||
@property | @property | |||
def parent_env(self): # type: () -> GenericEnv | def parent_env(self) -> GenericEnv: | |||
return GenericEnv(self.base, child_env=self) | return GenericEnv(self.base, child_env=self) | |||
def _find_python_executable(self) -> None: | ||||
bin_dir = self._bin_dir | ||||
if self._is_windows and self._is_conda: | ||||
bin_dir = self._path | ||||
python_executables = sorted( | ||||
p.name | ||||
for p in bin_dir.glob("python*") | ||||
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | ||||
) | ||||
if python_executables: | ||||
executable = python_executables[0] | ||||
if executable.endswith(".exe"): | ||||
executable = executable[:-4] | ||||
self._executable = executable | ||||
def _find_pip_executable(self) -> None: | ||||
pip_executables = sorted( | ||||
p.name | ||||
for p in self._bin_dir.glob("pip*") | ||||
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | ||||
) | ||||
if pip_executables: | ||||
pip_executable = pip_executables[0] | ||||
if pip_executable.endswith(".exe"): | ||||
pip_executable = pip_executable[:-4] | ||||
self._pip_executable = pip_executable | ||||
def find_executables(self) -> None: | ||||
self._find_python_executable() | ||||
self._find_pip_executable() | ||||
def get_embedded_wheel(self, distribution: str) -> Path: | ||||
wheel: Wheel = get_embed_wheel( | ||||
distribution, f"{self.version_info[0]}.{self.version_info[1]}" | ||||
) | ||||
path: Path = wheel.path | ||||
return path | ||||
@property | ||||
def pip_embedded(self) -> str: | ||||
if self._embedded_pip_path is None: | ||||
self._embedded_pip_path = str(self.get_embedded_wheel("pip") / "pip" | ||||
) | ||||
return self._embedded_pip_path | ||||
@property | @property | |||
def pip(self): # type: () -> str | def pip(self) -> str: | |||
""" | """ | |||
Path to current pip executable | Path to current pip executable | |||
""" | """ | |||
return self._bin(self._pip_executable) | # we do not use as_posix() here due to issues with windows pathlib2 | |||
# implementation | ||||
path = self._bin(self._pip_executable) | ||||
if not Path(path).exists(): | ||||
return str(self.pip_embedded) | ||||
return path | ||||
@property | @property | |||
def platform(self): # type: () -> str | def platform(self) -> str: | |||
return sys.platform | return sys.platform | |||
@property | @property | |||
def os(self): # type: () -> str | def os(self) -> str: | |||
return os.name | return os.name | |||
@property | @property | |||
def pip_version(self): | def pip_version(self) -> Version: | |||
if self._pip_version is None: | if self._pip_version is None: | |||
self._pip_version = self.get_pip_version() | self._pip_version = self.get_pip_version() | |||
return self._pip_version | return self._pip_version | |||
@property | @property | |||
def site_packages(self): # type: () -> SitePackages | def site_packages(self) -> SitePackages: | |||
if self._site_packages is None: | if self._site_packages is None: | |||
# we disable write checks if no user site exist | # we disable write checks if no user site exist | |||
fallbacks = [self.usersite] if self.usersite else [] | fallbacks = [self.usersite] if self.usersite else [] | |||
self._site_packages = SitePackages( | self._site_packages = SitePackages( | |||
self.purelib, fallbacks, skip_write_checks=False if fallbacks el | self.purelib, | |||
se True | self.platlib, | |||
fallbacks, | ||||
skip_write_checks=not fallbacks, | ||||
) | ) | |||
return self._site_packages | return self._site_packages | |||
@property | @property | |||
def usersite(self): # type: () -> Optional[Path] | def usersite(self) -> Path | None: | |||
if "usersite" in self.paths: | if "usersite" in self.paths: | |||
return Path(self.paths["usersite"]) | return Path(self.paths["usersite"]) | |||
return None | ||||
@property | @property | |||
def userbase(self): # type: () -> Optional[Path] | def userbase(self) -> Path | None: | |||
if "userbase" in self.paths: | if "userbase" in self.paths: | |||
return Path(self.paths["userbase"]) | return Path(self.paths["userbase"]) | |||
return None | ||||
@property | @property | |||
def purelib(self): # type: () -> Path | def purelib(self) -> Path: | |||
if self._purelib is None: | if self._purelib is None: | |||
self._purelib = Path(self.paths["purelib"]) | self._purelib = Path(self.paths["purelib"]) | |||
return self._purelib | return self._purelib | |||
@property | @property | |||
def platlib(self): # type: () -> Path | def platlib(self) -> Path: | |||
if self._platlib is None: | if self._platlib is None: | |||
if "platlib" in self.paths: | if "platlib" in self.paths: | |||
self._platlib = Path(self.paths["platlib"]) | self._platlib = Path(self.paths["platlib"]) | |||
else: | else: | |||
self._platlib = self.purelib | self._platlib = self.purelib | |||
return self._platlib | return self._platlib | |||
def is_path_relative_to_lib(self, path): # type: (Path) -> bool | def is_path_relative_to_lib(self, path: Path) -> bool: | |||
for lib_path in [self.purelib, self.platlib]: | for lib_path in [self.purelib, self.platlib]: | |||
try: | with contextlib.suppress(ValueError): | |||
path.relative_to(lib_path) | path.relative_to(lib_path) | |||
return True | return True | |||
except ValueError: | ||||
pass | ||||
return False | return False | |||
@property | @property | |||
def sys_path(self): # type: () -> List[str] | def sys_path(self) -> list[str]: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
@property | @property | |||
def paths(self): # type: () -> Dict[str, str] | def paths(self) -> dict[str, str]: | |||
if self._paths is None: | if self._paths is None: | |||
self._paths = self.get_paths() | self._paths = self.get_paths() | |||
return self._paths | return self._paths | |||
@property | @property | |||
def supported_tags(self): # type: () -> List[Tag] | def supported_tags(self) -> list[Tag]: | |||
if self._supported_tags is None: | if self._supported_tags is None: | |||
self._supported_tags = self.get_supported_tags() | self._supported_tags = self.get_supported_tags() | |||
return self._supported_tags | return self._supported_tags | |||
@classmethod | @classmethod | |||
def get_base_prefix(cls): # type: () -> str | def get_base_prefix(cls) -> Path: | |||
if hasattr(sys, "real_prefix"): | real_prefix = getattr(sys, "real_prefix", None) | |||
return sys.real_prefix | if real_prefix is not None: | |||
return Path(real_prefix) | ||||
if hasattr(sys, "base_prefix"): | ||||
return sys.base_prefix | base_prefix = getattr(sys, "base_prefix", None) | |||
if base_prefix is not None: | ||||
return sys.prefix | return Path(base_prefix) | |||
def _find_python_executable(self): # type: () -> None | ||||
bin_dir = self._bin_dir | ||||
if self._is_windows and self._is_conda: | ||||
bin_dir = self._path | ||||
python_executables = sorted( | ||||
p.name | ||||
for p in bin_dir.glob("python*") | ||||
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | ||||
) | ||||
if python_executables: | ||||
executable = python_executables[0] | ||||
if executable.endswith(".exe"): | ||||
executable = executable[:-4] | ||||
self._executable = executable | ||||
def _find_pip_executable(self): # type: () -> None | ||||
pip_executables = sorted( | ||||
p.name | ||||
for p in self._bin_dir.glob("pip*") | ||||
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | ||||
) | ||||
if pip_executables: | ||||
pip_executable = pip_executables[0] | ||||
if pip_executable.endswith(".exe"): | ||||
pip_executable = pip_executable[:-4] | ||||
self._pip_executable = pip_executable | return Path(sys.prefix) | |||
def find_executables(self): # type: () -> None | def get_version_info(self) -> tuple[Any, ...]: | |||
self._find_python_executable() | ||||
self._find_pip_executable() | ||||
def get_version_info(self): # type: () -> Tuple[int] | ||||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_python_implementation(self): # type: () -> str | def get_python_implementation(self) -> str: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_marker_env(self): # type: () -> Dict[str, Any] | def get_marker_env(self) -> dict[str, Any]: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_pip_command(self): # type: () -> List[str] | def get_pip_command(self, embedded: bool = False) -> list[str]: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_supported_tags(self): # type: () -> List[Tag] | def get_supported_tags(self) -> list[Tag]: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_pip_version(self): # type: () -> Version | def get_pip_version(self) -> Version: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def get_paths(self): # type: () -> Dict[str, str] | def get_paths(self) -> dict[str, str]: | |||
raise NotImplementedError() | raise NotImplementedError() | |||
def is_valid_for_marker(self, marker): # type: (BaseMarker) -> bool | def is_valid_for_marker(self, marker: BaseMarker) -> bool: | |||
return marker.validate(self.marker_env) | valid: bool = marker.validate(self.marker_env) | |||
return valid | ||||
def is_sane(self): # type: () -> bool | def is_sane(self) -> bool: | |||
""" | """ | |||
Checks whether the current environment is sane or not. | Checks whether the current environment is sane or not. | |||
""" | """ | |||
return True | return True | |||
def run(self, bin, *args, **kwargs): | def get_command_from_bin(self, bin: str) -> list[str]: | |||
bin = self._bin(bin) | if bin == "pip": | |||
cmd = [bin] + list(args) | # when pip is required we need to ensure that we fallback to | |||
return self._run(cmd, **kwargs) | # embedded pip when pip is not available in the environment | |||
return self.get_pip_command() | ||||
def run_python(self, *args, **kwargs): | return [self._bin(bin)] | |||
return self.run(self._executable, *args, **kwargs) | ||||
def run_pip(self, *args, **kwargs): | def run(self, bin: str, *args: str, **kwargs: Any) -> str | int: | |||
pip = self.get_pip_command() | cmd = self.get_command_from_bin(bin) + list(args) | |||
return self._run(cmd, **kwargs) | ||||
def run_pip(self, *args: str, **kwargs: Any) -> int | str: | ||||
pip = self.get_pip_command(embedded=True) | ||||
cmd = pip + list(args) | cmd = pip + list(args) | |||
return self._run(cmd, **kwargs) | return self._run(cmd, **kwargs) | |||
def run_python_script(self, content, **kwargs): # type: (str, Any) -> str | def run_python_script(self, content: str, **kwargs: Any) -> int | str: | |||
return self.run(self._executable, "-W", "ignore", "-", input_=content, * *kwargs) | return self.run(self._executable, "-W", "ignore", "-", input_=content, * *kwargs) | |||
def _run(self, cmd, **kwargs): | def _run(self, cmd: list[str], **kwargs: Any) -> int | str: | |||
""" | """ | |||
Run a command inside the Python environment. | Run a command inside the Python environment. | |||
""" | """ | |||
call = kwargs.pop("call", False) | call = kwargs.pop("call", False) | |||
input_ = kwargs.pop("input_", None) | input_ = kwargs.pop("input_", None) | |||
env = kwargs.pop("env", dict(os.environ)) | ||||
try: | try: | |||
if self._is_windows: | if self._is_windows: | |||
kwargs["shell"] = True | kwargs["shell"] = True | |||
command: str | list[str] | ||||
if kwargs.get("shell", False): | if kwargs.get("shell", False): | |||
cmd = list_to_shell_command(cmd) | command = list_to_shell_command(cmd) | |||
else: | ||||
command = cmd | ||||
if input_: | if input_: | |||
output = subprocess.run( | output = subprocess.run( | |||
cmd, | command, | |||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | |||
stderr=subprocess.STDOUT, | stderr=subprocess.STDOUT, | |||
input=encode(input_), | input=encode(input_), | |||
check=True, | check=True, | |||
**kwargs | **kwargs, | |||
).stdout | ).stdout | |||
elif call: | elif call: | |||
return subprocess.call(cmd, stderr=subprocess.STDOUT, **kwargs) | return subprocess.call( | |||
command, stderr=subprocess.STDOUT, env=env, **kwargs | ||||
) | ||||
else: | else: | |||
output = subprocess.check_output( | output = subprocess.check_output( | |||
cmd, stderr=subprocess.STDOUT, **kwargs | command, stderr=subprocess.STDOUT, env=env, **kwargs | |||
) | ) | |||
except CalledProcessError as e: | except CalledProcessError as e: | |||
raise EnvCommandError(e, input=input_) | raise EnvCommandError(e, input=input_) | |||
return decode(output) | return decode(output) | |||
def execute(self, bin, *args, **kwargs): | def execute(self, bin: str, *args: str, **kwargs: Any) -> int: | |||
bin = self._bin(bin) | command = self.get_command_from_bin(bin) + list(args) | |||
env = kwargs.pop("env", {k: v for k, v in os.environ.items()}) | env = kwargs.pop("env", dict(os.environ)) | |||
if not self._is_windows: | if not self._is_windows: | |||
args = [bin] + list(args) | return os.execvpe(command[0], command, env=env) | |||
return os.execvpe(bin, args, env=env) | ||||
else: | ||||
exe = subprocess.Popen([bin] + list(args), env=env, **kwargs) | ||||
exe.communicate() | ||||
return exe.returncode | ||||
def is_venv(self): # type: () -> bool | kwargs["shell"] = True | |||
exe = subprocess.Popen([command[0]] + command[1:], env=env, **kwargs) | ||||
exe.communicate() | ||||
return exe.returncode | ||||
def is_venv(self) -> bool: | ||||
raise NotImplementedError() | raise NotImplementedError() | |||
@property | @property | |||
def script_dirs(self): # type: () -> List[Path] | def script_dirs(self) -> list[Path]: | |||
if self._script_dirs is None: | if self._script_dirs is None: | |||
self._script_dirs = ( | scripts = self.paths.get("scripts") | |||
[Path(self.paths["scripts"])] | self._script_dirs = [ | |||
if "scripts" in self.paths | Path(scripts) if scripts is not None else self._bin_dir | |||
else self._bin_dir | ] | |||
) | ||||
if self.userbase: | if self.userbase: | |||
self._script_dirs.append(self.userbase / self._script_dirs[0].na me) | self._script_dirs.append(self.userbase / self._script_dirs[0].na me) | |||
return self._script_dirs | return self._script_dirs | |||
def _bin(self, bin): # type: (str) -> str | def _bin(self, bin: str) -> str: | |||
""" | """ | |||
Return path to the given executable. | Return path to the given executable. | |||
""" | """ | |||
if self._is_windows and not bin.endswith(".exe"): | if self._is_windows and not bin.endswith(".exe"): | |||
bin_path = self._bin_dir / (bin + ".exe") | bin_path = self._bin_dir / (bin + ".exe") | |||
else: | else: | |||
bin_path = self._bin_dir / bin | bin_path = self._bin_dir / bin | |||
if not bin_path.exists(): | if not bin_path.exists(): | |||
# On Windows, some executables can be in the base path | # On Windows, some executables can be in the base path | |||
# This is especially true when installing Python with | # This is especially true when installing Python with | |||
# the official installer, where python.exe will be at | # the official installer, where python.exe will be at | |||
# the root of the env path. | # the root of the env path. | |||
# This is an edge case and should not be encountered | ||||
# in normal uses but this happens in the sonnet script | ||||
# that creates a fake virtual environment pointing to | ||||
# a base Python install. | ||||
if self._is_windows: | if self._is_windows: | |||
if not bin.endswith(".exe"): | if not bin.endswith(".exe"): | |||
bin_path = self._path / (bin + ".exe") | bin_path = self._path / (bin + ".exe") | |||
else: | else: | |||
bin_path = self._path / bin | bin_path = self._path / bin | |||
if bin_path.exists(): | if bin_path.exists(): | |||
return str(bin_path) | return str(bin_path) | |||
return bin | return bin | |||
return str(bin_path) | return str(bin_path) | |||
def __eq__(self, other): # type: (Env) -> bool | def __eq__(self, other: object) -> bool: | |||
if not isinstance(other, Env): | ||||
return False | ||||
return other.__class__ == self.__class__ and other.path == self.path | return other.__class__ == self.__class__ and other.path == self.path | |||
def __repr__(self): | def __repr__(self) -> str: | |||
return '{}("{}")'.format(self.__class__.__name__, self._path) | return f'{self.__class__.__name__}("{self._path}")' | |||
class SystemEnv(Env): | class SystemEnv(Env): | |||
""" | """ | |||
A system (i.e. not a virtualenv) Python environment. | A system (i.e. not a virtualenv) Python environment. | |||
""" | """ | |||
@property | @property | |||
def python(self): # type: () -> str | def python(self) -> str: | |||
return sys.executable | return sys.executable | |||
@property | @property | |||
def sys_path(self): # type: () -> List[str] | def sys_path(self) -> list[str]: | |||
return sys.path | return sys.path | |||
def get_version_info(self): # type: () -> Tuple[int] | def get_version_info(self) -> tuple[Any, ...]: | |||
return sys.version_info | return tuple(sys.version_info) | |||
def get_python_implementation(self): # type: () -> str | def get_python_implementation(self) -> str: | |||
return platform.python_implementation() | return platform.python_implementation() | |||
def get_pip_command(self): # type: () -> List[str] | def get_pip_command(self, embedded: bool = False) -> list[str]: | |||
# If we're not in a venv, assume the interpreter we're running on | # If we're not in a venv, assume the interpreter we're running on | |||
# has a pip and use that | # has a pip and use that | |||
return [sys.executable, "-m", "pip"] | return [sys.executable, self.pip_embedded if embedded else self.pip] | |||
def get_paths(self): # type: () -> Dict[str, str] | def get_paths(self) -> dict[str, str]: | |||
# We can't use sysconfig.get_paths() because | # We can't use sysconfig.get_paths() because | |||
# on some distributions it does not return the proper paths | # on some distributions it does not return the proper paths | |||
# (those used by pip for instance). We go through distutils | # (those used by pip for instance). We go through distutils | |||
# to get the proper ones. | # to get the proper ones. | |||
import site | import site | |||
from distutils.command.install import SCHEME_KEYS # noqa | from distutils.command.install import SCHEME_KEYS | |||
from distutils.core import Distribution | from distutils.core import Distribution | |||
d = Distribution() | d = Distribution() | |||
d.parse_config_files() | d.parse_config_files() | |||
obj = d.get_command_obj("install", create=True) | with warnings.catch_warnings(): | |||
warnings.filterwarnings("ignore", "setup.py install is deprecated") | ||||
obj = d.get_command_obj("install", create=True) | ||||
assert obj is not None | ||||
obj.finalize_options() | obj.finalize_options() | |||
paths = sysconfig.get_paths().copy() | paths = sysconfig.get_paths().copy() | |||
for key in SCHEME_KEYS: | for key in SCHEME_KEYS: | |||
if key == "headers": | if key == "headers": | |||
# headers is not a path returned by sysconfig.get_paths() | # headers is not a path returned by sysconfig.get_paths() | |||
continue | continue | |||
paths[key] = getattr(obj, "install_{}".format(key)) | paths[key] = getattr(obj, f"install_{key}") | |||
if site.check_enableusersite() and hasattr(obj, "install_usersite"): | if site.check_enableusersite(): | |||
paths["usersite"] = getattr(obj, "install_usersite") | usersite = getattr(obj, "install_usersite", None) | |||
paths["userbase"] = getattr(obj, "install_userbase") | userbase = getattr(obj, "install_userbase", None) | |||
if usersite is not None and userbase is not None: | ||||
paths["usersite"] = usersite | ||||
paths["userbase"] = userbase | ||||
return paths | return paths | |||
def get_supported_tags(self): # type: () -> List[Tag] | def get_supported_tags(self) -> list[Tag]: | |||
return list(sys_tags()) | return list(sys_tags()) | |||
def get_marker_env(self): # type: () -> Dict[str, Any] | def get_marker_env(self) -> dict[str, Any]: | |||
if hasattr(sys, "implementation"): | if hasattr(sys, "implementation"): | |||
info = sys.implementation.version | info = sys.implementation.version | |||
iver = "{0.major}.{0.minor}.{0.micro}".format(info) | iver = f"{info.major}.{info.minor}.{info.micro}" | |||
kind = info.releaselevel | kind = info.releaselevel | |||
if kind != "final": | if kind != "final": | |||
iver += kind[0] + str(info.serial) | iver += kind[0] + str(info.serial) | |||
implementation_name = sys.implementation.name | implementation_name = sys.implementation.name | |||
else: | else: | |||
iver = "0" | iver = "0" | |||
implementation_name = "" | implementation_name = "" | |||
return { | return { | |||
"implementation_name": implementation_name, | "implementation_name": implementation_name, | |||
"implementation_version": iver, | "implementation_version": iver, | |||
"os_name": os.name, | "os_name": os.name, | |||
"platform_machine": platform.machine(), | "platform_machine": platform.machine(), | |||
"platform_release": platform.release(), | "platform_release": platform.release(), | |||
"platform_system": platform.system(), | "platform_system": platform.system(), | |||
"platform_version": platform.version(), | "platform_version": platform.version(), | |||
"python_full_version": platform.python_version(), | "python_full_version": platform.python_version(), | |||
"platform_python_implementation": platform.python_implementation(), | "platform_python_implementation": platform.python_implementation(), | |||
"python_version": ".".join( | "python_version": ".".join(platform.python_version().split(".")[:2]) | |||
v for v in platform.python_version().split(".")[:2] | , | |||
), | ||||
"sys_platform": sys.platform, | "sys_platform": sys.platform, | |||
"version_info": sys.version_info, | "version_info": sys.version_info, | |||
# Extra information | ||||
"interpreter_name": interpreter_name(), | "interpreter_name": interpreter_name(), | |||
"interpreter_version": interpreter_version(), | "interpreter_version": interpreter_version(), | |||
} | } | |||
def get_pip_version(self): # type: () -> Version | def get_pip_version(self) -> Version: | |||
from pip import __version__ | from pip import __version__ | |||
return Version.parse(__version__) | return Version.parse(__version__) | |||
def is_venv(self): # type: () -> bool | def is_venv(self) -> bool: | |||
return self._path != self._base | return self._path != self._base | |||
class VirtualEnv(Env): | class VirtualEnv(Env): | |||
""" | """ | |||
A virtual Python environment. | A virtual Python environment. | |||
""" | """ | |||
def __init__(self, path, base=None): # type: (Path, Optional[Path]) -> None | def __init__(self, path: Path, base: Path | None = None) -> None: | |||
super(VirtualEnv, self).__init__(path, base) | super().__init__(path, base) | |||
# If base is None, it probably means this is | # If base is None, it probably means this is | |||
# a virtualenv created from VIRTUAL_ENV. | # a virtualenv created from VIRTUAL_ENV. | |||
# In this case we need to get sys.base_prefix | # In this case we need to get sys.base_prefix | |||
# from inside the virtualenv. | # from inside the virtualenv. | |||
if base is None: | if base is None: | |||
self._base = Path(self.run_python_script(GET_BASE_PREFIX).strip()) | output = self.run_python_script(GET_BASE_PREFIX) | |||
assert isinstance(output, str) | ||||
self._base = Path(output.strip()) | ||||
@property | @property | |||
def sys_path(self): # type: () -> List[str] | def sys_path(self) -> list[str]: | |||
output = self.run_python_script(GET_SYS_PATH) | output = self.run_python_script(GET_SYS_PATH) | |||
assert isinstance(output, str) | ||||
paths: list[str] = json.loads(output) | ||||
return paths | ||||
return json.loads(output) | def get_version_info(self) -> tuple[Any, ...]: | |||
def get_version_info(self): # type: () -> Tuple[int] | ||||
output = self.run_python_script(GET_PYTHON_VERSION) | output = self.run_python_script(GET_PYTHON_VERSION) | |||
assert isinstance(output, str) | ||||
return tuple([int(s) for s in output.strip().split(".")]) | return tuple(int(s) for s in output.strip().split(".")) | |||
def get_python_implementation(self): # type: () -> str | def get_python_implementation(self) -> str: | |||
return self.marker_env["platform_python_implementation"] | implementation: str = self.marker_env["platform_python_implementation"] | |||
return implementation | ||||
def get_pip_command(self): # type: () -> List[str] | def get_pip_command(self, embedded: bool = False) -> list[str]: | |||
# We're in a virtualenv that is known to be sane, | # We're in a virtualenv that is known to be sane, | |||
# so assume that we have a functional pip | # so assume that we have a functional pip | |||
return [self._bin(self._pip_executable)] | return [ | |||
self._bin(self._executable), | ||||
def get_supported_tags(self): # type: () -> List[Tag] | self.pip_embedded if embedded else self.pip, | |||
file_path = Path(packaging.tags.__file__) | ] | |||
if file_path.suffix == ".pyc": | ||||
# Python 2 | ||||
file_path = file_path.with_suffix(".py") | ||||
with file_path.open(encoding="utf-8") as f: | ||||
script = decode(f.read()) | ||||
script = script.replace( | ||||
"from ._typing import TYPE_CHECKING, cast", | ||||
"TYPE_CHECKING = False\ncast = lambda type_, value: value", | ||||
) | ||||
script = script.replace( | ||||
"from ._typing import MYPY_CHECK_RUNNING, cast", | ||||
"MYPY_CHECK_RUNNING = False\ncast = lambda type_, value: value", | ||||
) | ||||
script += textwrap.dedent( | ||||
""" | ||||
import json | ||||
print(json.dumps([(t.interpreter, t.abi, t.platform) for t in sys_ta | ||||
gs()])) | ||||
""" | ||||
) | ||||
output = self.run_python_script(script) | def get_supported_tags(self) -> list[Tag]: | |||
output = self.run_python_script(GET_SYS_TAGS) | ||||
assert isinstance(output, str) | ||||
return [Tag(*t) for t in json.loads(output)] | return [Tag(*t) for t in json.loads(output)] | |||
def get_marker_env(self): # type: () -> Dict[str, Any] | def get_marker_env(self) -> dict[str, Any]: | |||
output = self.run(self._executable, "-", input_=GET_ENVIRONMENT_INFO) | output = self.run_python_script(GET_ENVIRONMENT_INFO) | |||
assert isinstance(output, str) | ||||
env: dict[str, Any] = json.loads(output) | ||||
return env | ||||
return json.loads(output) | def get_pip_version(self) -> Version: | |||
output = self.run_pip("--version") | ||||
assert isinstance(output, str) | ||||
output = output.strip() | ||||
def get_pip_version(self): # type: () -> Version | ||||
output = self.run_pip("--version").strip() | ||||
m = re.match("pip (.+?)(?: from .+)?$", output) | m = re.match("pip (.+?)(?: from .+)?$", output) | |||
if not m: | if not m: | |||
return Version.parse("0.0") | return Version.parse("0.0") | |||
return Version.parse(m.group(1)) | return Version.parse(m.group(1)) | |||
def get_paths(self): # type: () -> Dict[str, str] | def get_paths(self) -> dict[str, str]: | |||
output = self.run_python_script(GET_PATHS) | output = self.run_python_script(GET_PATHS) | |||
assert isinstance(output, str) | ||||
paths: dict[str, str] = json.loads(output) | ||||
return paths | ||||
return json.loads(output) | def is_venv(self) -> bool: | |||
def is_venv(self): # type: () -> bool | ||||
return True | return True | |||
def is_sane(self): | def is_sane(self) -> bool: | |||
# A virtualenv is considered sane if both "python" and "pip" exist. | # A virtualenv is considered sane if "python" exists. | |||
return os.path.exists(self.python) and os.path.exists(self._bin("pip")) | return os.path.exists(self.python) | |||
def _run(self, cmd, **kwargs): | def _run(self, cmd: list[str], **kwargs: Any) -> int | str: | |||
kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) | kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) | |||
return super(VirtualEnv, self)._run(cmd, **kwargs) | return super()._run(cmd, **kwargs) | |||
def get_temp_environ( | def get_temp_environ( | |||
self, environ=None, exclude=None, **kwargs | self, | |||
): # type: (Optional[Dict[str, str]], Optional[List[str]], **str) -> Dict[s | environ: dict[str, str] | None = None, | |||
tr, str] | exclude: list[str] | None = None, | |||
**kwargs: str, | ||||
) -> dict[str, str]: | ||||
exclude = exclude or [] | exclude = exclude or [] | |||
exclude.extend(["PYTHONHOME", "__PYVENV_LAUNCHER__"]) | exclude.extend(["PYTHONHOME", "__PYVENV_LAUNCHER__"]) | |||
if environ: | if environ: | |||
environ = deepcopy(environ) | environ = deepcopy(environ) | |||
for key in exclude: | for key in exclude: | |||
environ.pop(key, None) | environ.pop(key, None) | |||
else: | else: | |||
environ = {k: v for k, v in os.environ.items() if k not in exclude} | environ = {k: v for k, v in os.environ.items() if k not in exclude} | |||
environ.update(kwargs) | environ.update(kwargs) | |||
environ["PATH"] = self._updated_path() | environ["PATH"] = self._updated_path() | |||
environ["VIRTUAL_ENV"] = str(self._path) | environ["VIRTUAL_ENV"] = str(self._path) | |||
return environ | return environ | |||
def execute(self, bin, *args, **kwargs): | def execute(self, bin: str, *args: str, **kwargs: Any) -> int: | |||
kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) | kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) | |||
return super(VirtualEnv, self).execute(bin, *args, **kwargs) | return super().execute(bin, *args, **kwargs) | |||
@contextmanager | @contextmanager | |||
def temp_environ(self): | def temp_environ(self) -> Iterator[None]: | |||
environ = dict(os.environ) | environ = dict(os.environ) | |||
try: | try: | |||
yield | yield | |||
finally: | finally: | |||
os.environ.clear() | os.environ.clear() | |||
os.environ.update(environ) | os.environ.update(environ) | |||
def _updated_path(self): | def _updated_path(self) -> str: | |||
return os.pathsep.join([str(self._bin_dir), os.environ.get("PATH", "")]) | return os.pathsep.join([str(self._bin_dir), os.environ.get("PATH", "")]) | |||
class GenericEnv(VirtualEnv): | class GenericEnv(VirtualEnv): | |||
def __init__( | def __init__( | |||
self, path, base=None, child_env=None | self, path: Path, base: Path | None = None, child_env: Env | None = None | |||
): # type: (Path, Optional[Path], Optional[Env]) -> None | ) -> None: | |||
self._child_env = child_env | self._child_env = child_env | |||
super(GenericEnv, self).__init__(path, base=base) | super().__init__(path, base=base) | |||
def find_executables(self): # type: () -> None | def find_executables(self) -> None: | |||
patterns = [("python*", "pip*")] | patterns = [("python*", "pip*")] | |||
if self._child_env: | if self._child_env: | |||
minor_version = "{}.{}".format( | minor_version = ( | |||
self._child_env.version_info[0], self._child_env.version_info[1] | f"{self._child_env.version_info[0]}.{self._child_env.version_inf | |||
o[1]}" | ||||
) | ) | |||
major_version = "{}".format(self._child_env.version_info[0]) | major_version = f"{self._child_env.version_info[0]}" | |||
patterns = [ | patterns = [ | |||
("python{}".format(minor_version), "pip{}".format(minor_version) | (f"python{minor_version}", f"pip{minor_version}"), | |||
), | (f"python{major_version}", f"pip{major_version}"), | |||
("python{}".format(major_version), "pip{}".format(major_version) | ||||
), | ||||
] | ] | |||
python_executable = None | python_executable = None | |||
pip_executable = None | pip_executable = None | |||
for python_pattern, pip_pattern in patterns: | for python_pattern, pip_pattern in patterns: | |||
if python_executable and pip_executable: | if python_executable and pip_executable: | |||
break | break | |||
if not python_executable: | if not python_executable: | |||
python_executables = sorted( | python_executables = sorted( | |||
[ | p.name | |||
p.name | for p in self._bin_dir.glob(python_pattern) | |||
for p in self._bin_dir.glob(python_pattern) | if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | |||
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.na | ||||
me) | ||||
] | ||||
) | ) | |||
if python_executables: | if python_executables: | |||
executable = python_executables[0] | executable = python_executables[0] | |||
if executable.endswith(".exe"): | if executable.endswith(".exe"): | |||
executable = executable[:-4] | executable = executable[:-4] | |||
python_executable = executable | python_executable = executable | |||
if not pip_executable: | if not pip_executable: | |||
pip_executables = sorted( | pip_executables = sorted( | |||
[ | p.name | |||
p.name | for p in self._bin_dir.glob(pip_pattern) | |||
for p in self._bin_dir.glob(pip_pattern) | if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | |||
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name) | ||||
] | ||||
) | ) | |||
if pip_executables: | if pip_executables: | |||
pip_executable = pip_executables[0] | pip_executable = pip_executables[0] | |||
if pip_executable.endswith(".exe"): | if pip_executable.endswith(".exe"): | |||
pip_executable = pip_executable[:-4] | pip_executable = pip_executable[:-4] | |||
pip_executable = pip_executable | ||||
if python_executable: | if python_executable: | |||
self._executable = python_executable | self._executable = python_executable | |||
if pip_executable: | if pip_executable: | |||
self._pip_executable = pip_executable | self._pip_executable = pip_executable | |||
def get_paths(self): # type: () -> Dict[str, str] | def get_paths(self) -> dict[str, str]: | |||
output = self.run_python_script(GET_PATHS_FOR_GENERIC_ENVS) | output = self.run_python_script(GET_PATHS_FOR_GENERIC_ENVS) | |||
assert isinstance(output, str) | ||||
paths: dict[str, str] = json.loads(output) | ||||
return paths | ||||
def execute(self, bin: str, *args: str, **kwargs: Any) -> int: | ||||
command = self.get_command_from_bin(bin) + list(args) | ||||
env = kwargs.pop("env", dict(os.environ)) | ||||
if not self._is_windows: | ||||
return os.execvpe(command[0], command, env=env) | ||||
return json.loads(output) | exe = subprocess.Popen([command[0]] + command[1:], env=env, **kwargs) | |||
exe.communicate() | ||||
def execute(self, bin, *args, **kwargs): # type: (str, str, Any) -> Optiona | return exe.returncode | |||
l[int] | ||||
return super(VirtualEnv, self).execute(bin, *args, **kwargs) | ||||
def _run(self, cmd, **kwargs): # type: (List[str], Any) -> Optional[int] | def _run(self, cmd: list[str], **kwargs: Any) -> int | str: | |||
return super(VirtualEnv, self)._run(cmd, **kwargs) | return super(VirtualEnv, self)._run(cmd, **kwargs) | |||
def is_venv(self): # type: () -> bool | def is_venv(self) -> bool: | |||
return self._path != self._base | return self._path != self._base | |||
class NullEnv(SystemEnv): | class NullEnv(SystemEnv): | |||
def __init__(self, path=None, base=None, execute=False): | def __init__( | |||
self, path: Path | None = None, base: Path | None = None, execute: bool | ||||
= False | ||||
) -> None: | ||||
if path is None: | if path is None: | |||
path = Path(sys.prefix) | path = Path(sys.prefix) | |||
super(NullEnv, self).__init__(path, base=base) | super().__init__(path, base=base) | |||
self._execute = execute | self._execute = execute | |||
self.executed = [] | self.executed: list[list[str]] = [] | |||
def get_pip_command(self): # type: () -> List[str] | def get_pip_command(self, embedded: bool = False) -> list[str]: | |||
return [self._bin("python"), "-m", "pip"] | return [ | |||
self._bin(self._executable), | ||||
self.pip_embedded if embedded else self.pip, | ||||
] | ||||
def _run(self, cmd, **kwargs): | def _run(self, cmd: list[str], **kwargs: Any) -> int | str: | |||
self.executed.append(cmd) | self.executed.append(cmd) | |||
if self._execute: | if self._execute: | |||
return super(NullEnv, self)._run(cmd, **kwargs) | return super()._run(cmd, **kwargs) | |||
return 0 | ||||
def execute(self, bin, *args, **kwargs): | def execute(self, bin: str, *args: str, **kwargs: Any) -> int: | |||
self.executed.append([bin] + list(args)) | self.executed.append([bin] + list(args)) | |||
if self._execute: | if self._execute: | |||
return super(NullEnv, self).execute(bin, *args, **kwargs) | return super().execute(bin, *args, **kwargs) | |||
return 0 | ||||
def _bin(self, bin): | def _bin(self, bin: str) -> str: | |||
return bin | return bin | |||
@contextmanager | ||||
def ephemeral_environment( | ||||
executable: str | Path | None = None, | ||||
flags: dict[str, bool] | None = None, | ||||
) -> Iterator[VirtualEnv]: | ||||
with temporary_directory() as tmp_dir: | ||||
# TODO: cache PEP 517 build environment corresponding to each project ve | ||||
nv | ||||
venv_dir = Path(tmp_dir) / ".venv" | ||||
EnvManager.build_venv( | ||||
path=venv_dir.as_posix(), | ||||
executable=executable, | ||||
flags=flags, | ||||
) | ||||
yield VirtualEnv(venv_dir, venv_dir) | ||||
@contextmanager | ||||
def build_environment( | ||||
poetry: CorePoetry, env: Env | None = None, io: IO | None = None | ||||
) -> Iterator[Env]: | ||||
""" | ||||
If a build script is specified for the project, there could be additional bu | ||||
ild | ||||
time dependencies, eg: cython, setuptools etc. In these cases, we create an | ||||
ephemeral build environment with all requirements specified under | ||||
`build-system.requires` and return this. Otherwise, the given default projec | ||||
t | ||||
environment is returned. | ||||
""" | ||||
if not env or poetry.package.build_script: | ||||
with ephemeral_environment(executable=env.python if env else None) as ve | ||||
nv: | ||||
overwrite = ( | ||||
io is not None and io.output.is_decorated() and not io.is_debug( | ||||
) | ||||
) | ||||
if io: | ||||
if not overwrite: | ||||
io.write_line("") | ||||
requires = [ | ||||
f"<c1>{requirement}</c1>" | ||||
for requirement in poetry.pyproject.build_system.requires | ||||
] | ||||
io.overwrite( | ||||
"<b>Preparing</b> build environment with build-system requir | ||||
ements" | ||||
f" {', '.join(requires)}" | ||||
) | ||||
venv.run_pip( | ||||
"install", | ||||
"--disable-pip-version-check", | ||||
"--ignore-installed", | ||||
*poetry.pyproject.build_system.requires, | ||||
) | ||||
if overwrite: | ||||
assert io is not None | ||||
io.write_line("") | ||||
yield venv | ||||
else: | ||||
yield env | ||||
class MockEnv(NullEnv): | class MockEnv(NullEnv): | |||
def __init__( | def __init__( | |||
self, | self, | |||
version_info=(3, 7, 0), | version_info: tuple[int, int, int] = (3, 7, 0), | |||
python_implementation="CPython", | python_implementation: str = "CPython", | |||
platform="darwin", | platform: str = "darwin", | |||
os_name="posix", | os_name: str = "posix", | |||
is_venv=False, | is_venv: bool = False, | |||
pip_version="19.1", | pip_version: str = "19.1", | |||
sys_path=None, | sys_path: list[str] | None = None, | |||
marker_env=None, | marker_env: dict[str, Any] | None = None, | |||
supported_tags=None, | supported_tags: list[Tag] | None = None, | |||
**kwargs | **kwargs: Any, | |||
): | ) -> None: | |||
super(MockEnv, self).__init__(**kwargs) | super().__init__(**kwargs) | |||
self._version_info = version_info | self._version_info = version_info | |||
self._python_implementation = python_implementation | self._python_implementation = python_implementation | |||
self._platform = platform | self._platform = platform | |||
self._os_name = os_name | self._os_name = os_name | |||
self._is_venv = is_venv | self._is_venv = is_venv | |||
self._pip_version = Version.parse(pip_version) | self._pip_version: Version = Version.parse(pip_version) | |||
self._sys_path = sys_path | self._sys_path = sys_path | |||
self._mock_marker_env = marker_env | self._mock_marker_env = marker_env | |||
self._supported_tags = supported_tags | self._supported_tags = supported_tags | |||
@property | @property | |||
def platform(self): # type: () -> str | def platform(self) -> str: | |||
return self._platform | return self._platform | |||
@property | @property | |||
def os(self): # type: () -> str | def os(self) -> str: | |||
return self._os_name | return self._os_name | |||
@property | @property | |||
def pip_version(self): | def pip_version(self) -> Version: | |||
return self._pip_version | return self._pip_version | |||
@property | @property | |||
def sys_path(self): | def sys_path(self) -> list[str]: | |||
if self._sys_path is None: | if self._sys_path is None: | |||
return super(MockEnv, self).sys_path | return super().sys_path | |||
return self._sys_path | return self._sys_path | |||
def get_marker_env(self): # type: () -> Dict[str, Any] | def get_marker_env(self) -> dict[str, Any]: | |||
if self._mock_marker_env is not None: | if self._mock_marker_env is not None: | |||
return self._mock_marker_env | return self._mock_marker_env | |||
marker_env = super(MockEnv, self).get_marker_env() | marker_env = super().get_marker_env() | |||
marker_env["python_implementation"] = self._python_implementation | marker_env["python_implementation"] = self._python_implementation | |||
marker_env["version_info"] = self._version_info | marker_env["version_info"] = self._version_info | |||
marker_env["python_version"] = ".".join(str(v) for v in self._version_in fo[:2]) | marker_env["python_version"] = ".".join(str(v) for v in self._version_in fo[:2]) | |||
marker_env["python_full_version"] = ".".join(str(v) for v in self._versi on_info) | marker_env["python_full_version"] = ".".join(str(v) for v in self._versi on_info) | |||
marker_env["sys_platform"] = self._platform | marker_env["sys_platform"] = self._platform | |||
marker_env["interpreter_name"] = self._python_implementation.lower() | marker_env["interpreter_name"] = self._python_implementation.lower() | |||
marker_env["interpreter_version"] = "cp" + "".join( | marker_env["interpreter_version"] = "cp" + "".join( | |||
str(v) for v in self._version_info[:2] | str(v) for v in self._version_info[:2] | |||
) | ) | |||
return marker_env | return marker_env | |||
def is_venv(self): # type: () -> bool | def is_venv(self) -> bool: | |||
return self._is_venv | return self._is_venv | |||
End of changes. 263 change blocks. | ||||
533 lines changed or deleted | 887 lines changed or added |