"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/utils/env.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

env.py  (poetry-1.1.15):env.py  (poetry-1.2.0)
from __future__ import annotations
import base64 import base64
import contextlib
import hashlib import hashlib
import itertools
import json import json
import os import os
import platform import platform
import plistlib
import re import re
import shutil import subprocess
import sys import sys
import sysconfig import sysconfig
import textwrap import warnings
from contextlib import contextmanager from contextlib import contextmanager
from copy import deepcopy from copy import deepcopy
from pathlib import Path
from subprocess import CalledProcessError
from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import packaging.tags import packaging.tags
import tomlkit import tomlkit
import virtualenv import virtualenv
from clikit.api.io import IO from cleo.io.outputs.output import Verbosity
from packaging.tags import Tag from packaging.tags import Tag
from packaging.tags import interpreter_name from packaging.tags import interpreter_name
from packaging.tags import interpreter_version from packaging.tags import interpreter_version
from packaging.tags import sys_tags from packaging.tags import sys_tags
from poetry.core.semver.helpers import parse_constraint
from poetry.core.semver import parse_constraint
from poetry.core.semver.version import Version from poetry.core.semver.version import Version
from poetry.core.toml.file import TOMLFile from poetry.core.toml.file import TOMLFile
from poetry.core.version.markers import BaseMarker from poetry.core.utils.helpers import temporary_directory
from poetry.locations import CACHE_DIR from virtualenv.seed.wheels.embed import get_embed_wheel
from poetry.poetry import Poetry
from poetry.utils._compat import CalledProcessError from poetry.utils._compat import WINDOWS
from poetry.utils._compat import Path
from poetry.utils._compat import decode from poetry.utils._compat import decode
from poetry.utils._compat import encode from poetry.utils._compat import encode
from poetry.utils._compat import list_to_shell_command from poetry.utils._compat import list_to_shell_command
from poetry.utils._compat import subprocess from poetry.utils._compat import metadata
from poetry.utils.helpers import get_real_windows_path
from poetry.utils.helpers import is_dir_writable from poetry.utils.helpers import is_dir_writable
from poetry.utils.helpers import paths_csv from poetry.utils.helpers import paths_csv
from poetry.utils.helpers import remove_directory
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Iterator
from cleo.io.io import IO
from poetry.core.poetry import Poetry as CorePoetry
from poetry.core.version.markers import BaseMarker
from virtualenv.seed.wheels.util import Wheel
from poetry.poetry import Poetry
GET_SYS_TAGS = f"""
import importlib.util
import json
import sys
from pathlib import Path
spec = importlib.util.spec_from_file_location(
"packaging", Path(r"{packaging.__file__}")
)
packaging = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = packaging
spec = importlib.util.spec_from_file_location(
"packaging.tags", Path(r"{packaging.tags.__file__}")
)
packaging_tags = importlib.util.module_from_spec(spec)
spec.loader.exec_module(packaging_tags)
print(
json.dumps([(t.interpreter, t.abi, t.platform) for t in packaging_tags.sys_t
ags()])
)
"""
GET_ENVIRONMENT_INFO = """\ GET_ENVIRONMENT_INFO = """\
import json import json
import os import os
import platform import platform
import sys import sys
import sysconfig import sysconfig
INTERPRETER_SHORT_NAMES = { INTERPRETER_SHORT_NAMES = {
"python": "py", "python": "py",
skipping to change at line 71 skipping to change at line 109
def interpreter_version(): def interpreter_version():
version = sysconfig.get_config_var("interpreter_version") version = sysconfig.get_config_var("interpreter_version")
if version: if version:
version = str(version) version = str(version)
else: else:
version = _version_nodot(sys.version_info[:2]) version = _version_nodot(sys.version_info[:2])
return version return version
def _version_nodot(version): def _version_nodot(version):
# type: (PythonVersion) -> str
if any(v >= 10 for v in version): if any(v >= 10 for v in version):
sep = "_" sep = "_"
else: else:
sep = "" sep = ""
return sep.join(map(str, version)) return sep.join(map(str, version))
if hasattr(sys, "implementation"): if hasattr(sys, "implementation"):
info = sys.implementation.version info = sys.implementation.version
iver = "{0.major}.{0.minor}.{0.micro}".format(info) iver = "{0.major}.{0.minor}.{0.micro}".format(info)
skipping to change at line 105 skipping to change at line 142
"platform_machine": platform.machine(), "platform_machine": platform.machine(),
"platform_release": platform.release(), "platform_release": platform.release(),
"platform_system": platform.system(), "platform_system": platform.system(),
"platform_version": platform.version(), "platform_version": platform.version(),
"python_full_version": platform.python_version(), "python_full_version": platform.python_version(),
"platform_python_implementation": platform.python_implementation(), "platform_python_implementation": platform.python_implementation(),
"python_version": ".".join(platform.python_version_tuple()[:2]), "python_version": ".".join(platform.python_version_tuple()[:2]),
"sys_platform": sys.platform, "sys_platform": sys.platform,
"version_info": tuple(sys.version_info), "version_info": tuple(sys.version_info),
# Extra information # Extra information
"interpreter_name": INTERPRETER_SHORT_NAMES.get(implementation_name, impleme "interpreter_name": INTERPRETER_SHORT_NAMES.get(
ntation_name), implementation_name, implementation_name
),
"interpreter_version": interpreter_version(), "interpreter_version": interpreter_version(),
} }
print(json.dumps(env)) print(json.dumps(env))
""" """
GET_BASE_PREFIX = """\ GET_BASE_PREFIX = """\
import sys import sys
if hasattr(sys, "real_prefix"): if hasattr(sys, "real_prefix"):
skipping to change at line 129 skipping to change at line 168
else: else:
print(sys.prefix) print(sys.prefix)
""" """
GET_PYTHON_VERSION = """\ GET_PYTHON_VERSION = """\
import sys import sys
print('.'.join([str(s) for s in sys.version_info[:3]])) print('.'.join([str(s) for s in sys.version_info[:3]]))
""" """
GET_PYTHON_VERSION_ONELINER = (
"\"import sys; print('.'.join([str(s) for s in sys.version_info[:3]]))\""
)
GET_SYS_PATH = """\ GET_SYS_PATH = """\
import json import json
import sys import sys
print(json.dumps(sys.path)) print(json.dumps(sys.path))
""" """
GET_PATHS = """\ GET_PATHS = """\
import json import json
import sysconfig import sysconfig
skipping to change at line 152 skipping to change at line 195
GET_PATHS_FOR_GENERIC_ENVS = """\ GET_PATHS_FOR_GENERIC_ENVS = """\
# We can't use sysconfig.get_paths() because # We can't use sysconfig.get_paths() because
# on some distributions it does not return the proper paths # on some distributions it does not return the proper paths
# (those used by pip for instance). We go through distutils # (those used by pip for instance). We go through distutils
# to get the proper ones. # to get the proper ones.
import json import json
import site import site
import sysconfig import sysconfig
from distutils.command.install import SCHEME_KEYS # noqa from distutils.command.install import SCHEME_KEYS
from distutils.core import Distribution from distutils.core import Distribution
d = Distribution() d = Distribution()
d.parse_config_files() d.parse_config_files()
obj = d.get_command_obj("install", create=True) obj = d.get_command_obj("install", create=True)
obj.finalize_options() obj.finalize_options()
paths = sysconfig.get_paths().copy() paths = sysconfig.get_paths().copy()
for key in SCHEME_KEYS: for key in SCHEME_KEYS:
if key == "headers": if key == "headers":
skipping to change at line 177 skipping to change at line 220
if site.check_enableusersite() and hasattr(obj, "install_usersite"): if site.check_enableusersite() and hasattr(obj, "install_usersite"):
paths["usersite"] = getattr(obj, "install_usersite") paths["usersite"] = getattr(obj, "install_usersite")
paths["userbase"] = getattr(obj, "install_userbase") paths["userbase"] = getattr(obj, "install_userbase")
print(json.dumps(paths)) print(json.dumps(paths))
""" """
class SitePackages: class SitePackages:
def __init__( def __init__(
self, path, fallbacks=None, skip_write_checks=False self,
): # type: (Path, List[Path], bool) -> None purelib: Path,
self._path = path platlib: Path | None = None,
fallbacks: list[Path] | None = None,
skip_write_checks: bool = False,
) -> None:
self._purelib = purelib
self._platlib = platlib or purelib
if platlib and platlib.resolve() == purelib.resolve():
self._platlib = purelib
self._fallbacks = fallbacks or [] self._fallbacks = fallbacks or []
self._skip_write_checks = skip_write_checks self._skip_write_checks = skip_write_checks
self._candidates = [self._path] + self._fallbacks
self._candidates: list[Path] = []
for path in itertools.chain([self._purelib, self._platlib], self._fallba
cks):
if path not in self._candidates:
self._candidates.append(path)
self._writable_candidates = None if not skip_write_checks else self._can didates self._writable_candidates = None if not skip_write_checks else self._can didates
@property @property
def path(self): # type: () -> Path def path(self) -> Path:
return self._path return self._purelib
@property
def purelib(self) -> Path:
return self._purelib
@property
def platlib(self) -> Path:
return self._platlib
@property @property
def candidates(self): # type: () -> List[Path] def candidates(self) -> list[Path]:
return self._candidates return self._candidates
@property @property
def writable_candidates(self): # type: () -> List[Path] def writable_candidates(self) -> list[Path]:
if self._writable_candidates is not None: if self._writable_candidates is not None:
return self._writable_candidates return self._writable_candidates
self._writable_candidates = [] self._writable_candidates = []
for candidate in self._candidates: for candidate in self._candidates:
if not is_dir_writable(path=candidate, create=True): if not is_dir_writable(path=candidate, create=True):
continue continue
self._writable_candidates.append(candidate) self._writable_candidates.append(candidate)
return self._writable_candidates return self._writable_candidates
def make_candidates( def make_candidates(
self, path, writable_only=False self, path: Path, writable_only: bool = False, strict: bool = False
): # type: (Path, bool) -> List[Path] ) -> list[Path]:
candidates = self._candidates if not writable_only else self.writable_ca ndidates candidates = self._candidates if not writable_only else self.writable_ca ndidates
if path.is_absolute(): if path.is_absolute():
for candidate in candidates: for candidate in candidates:
try: with contextlib.suppress(ValueError):
path.relative_to(candidate) path.relative_to(candidate)
return [path] return [path]
except ValueError: site_type = "writable " if writable_only else ""
pass raise ValueError(
else: f"{path} is not relative to any discovered {site_type}sites"
raise ValueError( )
"{} is not relative to any discovered {}sites".format(
path, "writable " if writable_only else "" results = [candidate / path for candidate in candidates]
if not results and strict:
raise RuntimeError(
f'Unable to find a suitable destination for "{path}" in'
f" {paths_csv(self._candidates)}"
)
return results
def distributions(
self, name: str | None = None, writable_only: bool = False
) -> Iterable[metadata.Distribution]:
path = list(
map(
str, self._candidates if not writable_only else self.writable_ca
ndidates
)
)
yield from metadata.PathDistribution.discover( # type: ignore[no-untype
d-call]
name=name,
path=path,
)
def find_distribution(
self, name: str, writable_only: bool = False
) -> metadata.Distribution | None:
for distribution in self.distributions(name=name, writable_only=writable
_only):
return distribution
return None
def find_distribution_files_with_suffix(
self, distribution_name: str, suffix: str, writable_only: bool = False
) -> Iterable[Path]:
for distribution in self.distributions(
name=distribution_name, writable_only=writable_only
):
assert distribution.files is not None
for file in distribution.files:
if file.name.endswith(suffix):
yield Path(
distribution.locate_file(file), # type: ignore[no-untyp
ed-call]
) )
)
return [candidate / path for candidate in candidates if candidate] def find_distribution_files_with_name(
self, distribution_name: str, name: str, writable_only: bool = False
) -> Iterable[Path]:
for distribution in self.distributions(
name=distribution_name, writable_only=writable_only
):
assert distribution.files is not None
for file in distribution.files:
if file.name == name:
yield Path(
distribution.locate_file(file), # type: ignore[no-untyp
ed-call]
)
def _path_method_wrapper( def find_distribution_nspkg_pth_files(
self, path, method, *args, **kwargs self, distribution_name: str, writable_only: bool = False
): # type: (Path, str, *Any, **Any) -> Union[Tuple[Path, Any], List[Tuple[P ) -> Iterable[Path]:
ath, Any]]] return self.find_distribution_files_with_suffix(
distribution_name=distribution_name,
suffix="-nspkg.pth",
writable_only=writable_only,
)
# TODO: Move to parameters after dropping Python 2.7 def find_distribution_direct_url_json_files(
return_first = kwargs.pop("return_first", True) self, distribution_name: str, writable_only: bool = False
writable_only = kwargs.pop("writable_only", False) ) -> Iterable[Path]:
return self.find_distribution_files_with_name(
distribution_name=distribution_name,
name="direct_url.json",
writable_only=writable_only,
)
candidates = self.make_candidates(path, writable_only=writable_only) def remove_distribution_files(self, distribution_name: str) -> list[Path]:
paths = []
if not candidates: for distribution in self.distributions(
raise RuntimeError( name=distribution_name, writable_only=True
'Unable to find a suitable destination for "{}" in {}'.format( ):
str(path), paths_csv(self._candidates) assert distribution.files is not None
for file in distribution.files:
path = Path(
distribution.locate_file(file), # type: ignore[no-untyped-c
all]
) )
) # We can't use unlink(missing_ok=True) because it's not always a
vailable
if path.exists():
path.unlink()
distribution_path: Path = distribution._path # type: ignore[attr-de
fined]
if distribution_path.exists():
remove_directory(str(distribution_path), force=True)
paths.append(distribution_path)
return paths
def _path_method_wrapper(
self,
path: str | Path,
method: str,
*args: Any,
return_first: bool = True,
writable_only: bool = False,
**kwargs: Any,
) -> tuple[Path, Any] | list[tuple[Path, Any]]:
if isinstance(path, str):
path = Path(path)
candidates = self.make_candidates(
path, writable_only=writable_only, strict=True
)
results = [] results = []
for candidate in candidates: for candidate in candidates:
try: try:
result = candidate, getattr(candidate, method)(*args, **kwargs) result = candidate, getattr(candidate, method)(*args, **kwargs)
if return_first: if return_first:
return result return result
else: results.append(result)
results.append(result) except OSError:
except (IOError, OSError):
# TODO: Replace with PermissionError # TODO: Replace with PermissionError
pass pass
if results: if results:
return results return results
raise OSError("Unable to access any of {}".format(paths_csv(candidates)) ) raise OSError(f"Unable to access any of {paths_csv(candidates)}")
def write_text(self, path, *args, **kwargs): # type: (Path, *Any, **Any) -> def write_text(self, path: str | Path, *args: Any, **kwargs: Any) -> Path:
Path paths = self._path_method_wrapper(path, "write_text", *args, **kwargs)
return self._path_method_wrapper(path, "write_text", *args, **kwargs)[0] assert isinstance(paths, tuple)
return paths[0]
def mkdir(self, path: str | Path, *args: Any, **kwargs: Any) -> Path:
paths = self._path_method_wrapper(path, "mkdir", *args, **kwargs)
assert isinstance(paths, tuple)
return paths[0]
def mkdir(self, path, *args, **kwargs): # type: (Path, *Any, **Any) -> Path def exists(self, path: str | Path) -> bool:
return self._path_method_wrapper(path, "mkdir", *args, **kwargs)[0]
def exists(self, path): # type: (Path) -> bool
return any( return any(
value[-1] value[-1]
for value in self._path_method_wrapper(path, "exists", return_first= False) for value in self._path_method_wrapper(path, "exists", return_first= False)
) )
def find(self, path, writable_only=False): # type: (Path, bool) -> List[Pat def find(
h] self,
path: str | Path,
writable_only: bool = False,
) -> list[Path]:
return [ return [
value[0] value[0]
for value in self._path_method_wrapper( for value in self._path_method_wrapper(
path, "exists", return_first=False, writable_only=writable_only path, "exists", return_first=False, writable_only=writable_only
) )
if value[-1] is True if value[-1] is True
] ]
def __getattr__(self, item): def __getattr__(self, item: str) -> Any:
try: try:
return super(SitePackages, self).__getattribute__(item) return super().__getattribute__(item)
except AttributeError: except AttributeError:
return getattr(self.path, item) return getattr(self.path, item)
class EnvError(Exception): class EnvError(Exception):
pass pass
class EnvCommandError(EnvError): class EnvCommandError(EnvError):
def __init__(self, e, input=None): # type: (CalledProcessError) -> None def __init__(self, e: CalledProcessError, input: str | None = None) -> None:
self.e = e self.e = e
message = "Command {} errored with the following return code {}, and out message = (
put: \n{}".format( f"Command {e.cmd} errored with the following return code {e.returnco
e.cmd, e.returncode, decode(e.output) de},"
f" and output: \n{decode(e.output)}"
) )
if input: if input:
message += "input was : {}".format(input) message += f"input was : {input}"
super(EnvCommandError, self).__init__(message) super().__init__(message)
class NoCompatiblePythonVersionFound(EnvError): class NoCompatiblePythonVersionFound(EnvError):
def __init__(self, expected, given=None): def __init__(self, expected: str, given: str | None = None) -> None:
if given: if given:
message = ( message = (
"The specified Python version ({}) " f"The specified Python version ({given}) "
"is not supported by the project ({}).\n" f"is not supported by the project ({expected}).\n"
"Please choose a compatible version " "Please choose a compatible version "
"or loosen the python constraint specified " "or loosen the python constraint specified "
"in the pyproject.toml file.".format(given, expected) "in the pyproject.toml file."
) )
else: else:
message = ( message = (
"Poetry was unable to find a compatible version. " "Poetry was unable to find a compatible version. "
"If you have one, you can explicitly use it " "If you have one, you can explicitly use it "
'via the "env use" command.' 'via the "env use" command.'
) )
super(NoCompatiblePythonVersionFound, self).__init__(message) super().__init__(message)
class InvalidCurrentPythonVersionError(EnvError):
def __init__(self, expected: str, given: str) -> None:
message = (
f"Current Python version ({given}) "
f"is not allowed by the project ({expected}).\n"
'Please change python executable via the "env use" command.'
)
super().__init__(message)
class EnvManager(object): class EnvManager:
""" """
Environments manager Environments manager
""" """
_env = None _env = None
ENVS_FILE = "envs.toml" ENVS_FILE = "envs.toml"
def __init__(self, poetry): # type: (Poetry) -> None def __init__(self, poetry: Poetry) -> None:
self._poetry = poetry self._poetry = poetry
def activate(self, python, io): # type: (str, IO) -> Env def _full_python_path(self, python: str) -> str:
venv_path = self._poetry.config.get("virtualenvs.path") try:
if venv_path is None: executable = decode(
venv_path = Path(CACHE_DIR) / "virtualenvs" subprocess.check_output(
else: list_to_shell_command(
venv_path = Path(venv_path) [python, "-c", '"import sys; print(sys.executable)"']
),
shell=True,
).strip()
)
except CalledProcessError as e:
raise EnvCommandError(e)
return executable
def _detect_active_python(self, io: IO) -> str | None:
executable = None
try:
io.write_line(
"Trying to detect current active python executable as specified
in the"
" config.",
verbosity=Verbosity.VERBOSE,
)
executable = self._full_python_path("python")
io.write_line(f"Found: {executable}", verbosity=Verbosity.VERBOSE)
except CalledProcessError:
io.write_line(
"Unable to detect the current active python executable. Falling
back to"
" default.",
verbosity=Verbosity.VERBOSE,
)
return executable
def activate(self, python: str, io: IO) -> Env:
venv_path = self._poetry.config.virtualenvs_path
cwd = self._poetry.file.parent cwd = self._poetry.file.parent
envs_file = TOMLFile(venv_path / self.ENVS_FILE) envs_file = TOMLFile(venv_path / self.ENVS_FILE)
try: try:
python_version = Version.parse(python) python_version = Version.parse(python)
python = "python{}".format(python_version.major) python = f"python{python_version.major}"
if python_version.precision > 1: if python_version.precision > 1:
python += ".{}".format(python_version.minor) python += f".{python_version.minor}"
except ValueError: except ValueError:
# Executable in PATH or full executable path # Executable in PATH or full executable path
pass pass
python = self._full_python_path(python)
try: try:
python_version = decode( python_version_string = decode(
subprocess.check_output( subprocess.check_output(
list_to_shell_command( list_to_shell_command([python, "-c", GET_PYTHON_VERSION_ONEL
[ INER]),
python,
"-c",
"\"import sys; print('.'.join([str(s) for s in sys.v
ersion_info[:3]]))\"",
]
),
shell=True, shell=True,
) )
) )
except CalledProcessError as e: except CalledProcessError as e:
raise EnvCommandError(e) raise EnvCommandError(e)
python_version = Version.parse(python_version.strip()) python_version = Version.parse(python_version_string.strip())
minor = "{}.{}".format(python_version.major, python_version.minor) minor = f"{python_version.major}.{python_version.minor}"
patch = python_version.text patch = python_version.text
create = False create = False
is_root_venv = self._poetry.config.get("virtualenvs.in-project") is_root_venv = self._poetry.config.get("virtualenvs.in-project")
# If we are required to create the virtual environment in the root folde r, # If we are required to create the virtual environment in the root folde r,
# create or recreate it if needed # create or recreate it if needed
if is_root_venv: if is_root_venv:
create = False create = False
venv = self._poetry.file.parent / ".venv" venv = self._poetry.file.parent / ".venv"
if venv.exists(): if venv.exists():
skipping to change at line 406 skipping to change at line 604
envs = envs_file.read() envs = envs_file.read()
current_env = envs.get(base_env_name) current_env = envs.get(base_env_name)
if current_env is not None: if current_env is not None:
current_minor = current_env["minor"] current_minor = current_env["minor"]
current_patch = current_env["patch"] current_patch = current_env["patch"]
if current_minor == minor and current_patch != patch: if current_minor == minor and current_patch != patch:
# We need to recreate # We need to recreate
create = True create = True
name = "{}-py{}".format(base_env_name, minor) name = f"{base_env_name}-py{minor}"
venv = venv_path / name venv = venv_path / name
# Create if needed # Create if needed
if not venv.exists() or venv.exists() and create: if not venv.exists() or venv.exists() and create:
in_venv = os.environ.get("VIRTUAL_ENV") is not None in_venv = os.environ.get("VIRTUAL_ENV") is not None
if in_venv or not venv.exists(): if in_venv or not venv.exists():
create = True create = True
if venv.exists(): if venv.exists():
# We need to check if the patch version is correct # We need to check if the patch version is correct
skipping to change at line 431 skipping to change at line 629
create = True create = True
self.create_venv(io, executable=python, force=create) self.create_venv(io, executable=python, force=create)
# Activate # Activate
envs[base_env_name] = {"minor": minor, "patch": patch} envs[base_env_name] = {"minor": minor, "patch": patch}
envs_file.write(envs) envs_file.write(envs)
return self.get(reload=True) return self.get(reload=True)
def deactivate(self, io): # type: (IO) -> None def deactivate(self, io: IO) -> None:
venv_path = self._poetry.config.get("virtualenvs.path") venv_path = self._poetry.config.virtualenvs_path
if venv_path is None: name = self.generate_env_name(
venv_path = Path(CACHE_DIR) / "virtualenvs" self._poetry.package.name, str(self._poetry.file.parent)
else: )
venv_path = Path(venv_path)
name = self._poetry.package.name
name = self.generate_env_name(name, str(self._poetry.file.parent))
envs_file = TOMLFile(venv_path / self.ENVS_FILE) envs_file = TOMLFile(venv_path / self.ENVS_FILE)
if envs_file.exists(): if envs_file.exists():
envs = envs_file.read() envs = envs_file.read()
env = envs.get(name) env = envs.get(name)
if env is not None: if env is not None:
io.write_line( venv = venv_path / f"{name}-py{env['minor']}"
"Deactivating virtualenv: <comment>{}</comment>".format( io.write_line(f"Deactivating virtualenv: <comment>{venv}</commen
venv_path / (name + "-py{}".format(env["minor"])) t>")
)
)
del envs[name] del envs[name]
envs_file.write(envs) envs_file.write(envs)
def get(self, reload=False): # type: (bool) -> Env def get(self, reload: bool = False) -> Env:
if self._env is not None and not reload: if self._env is not None and not reload:
return self._env return self._env
python_minor = ".".join([str(v) for v in sys.version_info[:2]]) python_minor = ".".join([str(v) for v in sys.version_info[:2]])
venv_path = self._poetry.config.get("virtualenvs.path") venv_path = self._poetry.config.virtualenvs_path
if venv_path is None:
venv_path = Path(CACHE_DIR) / "virtualenvs"
else:
venv_path = Path(venv_path)
cwd = self._poetry.file.parent cwd = self._poetry.file.parent
envs_file = TOMLFile(venv_path / self.ENVS_FILE) envs_file = TOMLFile(venv_path / self.ENVS_FILE)
env = None env = None
base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d))
if envs_file.exists(): if envs_file.exists():
envs = envs_file.read() envs = envs_file.read()
env = envs.get(base_env_name) env = envs.get(base_env_name)
if env: if env:
python_minor = env["minor"] python_minor = env["minor"]
skipping to change at line 488 skipping to change at line 675
# Conda sets CONDA_PREFIX in its envs, see # Conda sets CONDA_PREFIX in its envs, see
# https://github.com/conda/conda/issues/2764 # https://github.com/conda/conda/issues/2764
env_prefix = os.environ.get("VIRTUAL_ENV", os.environ.get("CONDA_PREFIX" )) env_prefix = os.environ.get("VIRTUAL_ENV", os.environ.get("CONDA_PREFIX" ))
conda_env_name = os.environ.get("CONDA_DEFAULT_ENV") conda_env_name = os.environ.get("CONDA_DEFAULT_ENV")
# It's probably not a good idea to pollute Conda's global "base" env, si nce # It's probably not a good idea to pollute Conda's global "base" env, si nce
# most users have it activated all the time. # most users have it activated all the time.
in_venv = env_prefix is not None and conda_env_name != "base" in_venv = env_prefix is not None and conda_env_name != "base"
if not in_venv or env is not None: if not in_venv or env is not None:
# Checking if a local virtualenv exists # Checking if a local virtualenv exists
if self._poetry.config.get("virtualenvs.in-project") is not False: if (
if (cwd / ".venv").exists() and (cwd / ".venv").is_dir(): self._poetry.config.get("virtualenvs.in-project") is not False
venv = cwd / ".venv" and (cwd / ".venv").exists()
and (cwd / ".venv").is_dir()
):
venv = cwd / ".venv"
return VirtualEnv(venv) return VirtualEnv(venv)
create_venv = self._poetry.config.get("virtualenvs.create", True) create_venv = self._poetry.config.get("virtualenvs.create", True)
if not create_venv: if not create_venv:
return self.get_system_env() return self.get_system_env()
venv_path = self._poetry.config.get("virtualenvs.path") venv_path = self._poetry.config.virtualenvs_path
if venv_path is None:
venv_path = Path(CACHE_DIR) / "virtualenvs"
else:
venv_path = Path(venv_path)
name = "{}-py{}".format(base_env_name, python_minor.strip()) name = f"{base_env_name}-py{python_minor.strip()}"
venv = venv_path / name venv = venv_path / name
if not venv.exists(): if not venv.exists():
return self.get_system_env() return self.get_system_env()
return VirtualEnv(venv) return VirtualEnv(venv)
if env_prefix is not None: if env_prefix is not None:
prefix = Path(env_prefix) prefix = Path(env_prefix)
base_prefix = None base_prefix = None
else: else:
prefix = Path(sys.prefix) prefix = Path(sys.prefix)
base_prefix = self.get_base_prefix() base_prefix = self.get_base_prefix()
return VirtualEnv(prefix, base_prefix) return VirtualEnv(prefix, base_prefix)
def list(self, name=None): # type: (Optional[str]) -> List[VirtualEnv] def list(self, name: str | None = None) -> list[VirtualEnv]:
if name is None: if name is None:
name = self._poetry.package.name name = self._poetry.package.name
venv_name = self.generate_env_name(name, str(self._poetry.file.parent)) venv_name = self.generate_env_name(name, str(self._poetry.file.parent))
venv_path = self._poetry.config.virtualenvs_path
venv_path = self._poetry.config.get("virtualenvs.path")
if venv_path is None:
venv_path = Path(CACHE_DIR) / "virtualenvs"
else:
venv_path = Path(venv_path)
env_list = [ env_list = [
VirtualEnv(Path(p)) VirtualEnv(Path(p)) for p in sorted(venv_path.glob(f"{venv_name}-py*
for p in sorted(venv_path.glob("{}-py*".format(venv_name))) "))
] ]
venv = self._poetry.file.parent / ".venv" venv = self._poetry.file.parent / ".venv"
if ( if (
self._poetry.config.get("virtualenvs.in-project") self._poetry.config.get("virtualenvs.in-project")
and venv.exists() and venv.exists()
and venv.is_dir() and venv.is_dir()
): ):
env_list.insert(0, VirtualEnv(venv)) env_list.insert(0, VirtualEnv(venv))
return env_list return env_list
def remove(self, python): # type: (str) -> Env def remove(self, python: str) -> Env:
venv_path = self._poetry.config.get("virtualenvs.path") venv_path = self._poetry.config.virtualenvs_path
if venv_path is None:
venv_path = Path(CACHE_DIR) / "virtualenvs"
else:
venv_path = Path(venv_path)
cwd = self._poetry.file.parent cwd = self._poetry.file.parent
envs_file = TOMLFile(venv_path / self.ENVS_FILE) envs_file = TOMLFile(venv_path / self.ENVS_FILE)
base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d)) base_env_name = self.generate_env_name(self._poetry.package.name, str(cw d))
if python.startswith(base_env_name): if python.startswith(base_env_name):
venvs = self.list() venvs = self.list()
for venv in venvs: for venv in venvs:
if venv.path.name == python: if venv.path.name == python:
# Exact virtualenv name # Exact virtualenv name
skipping to change at line 589 skipping to change at line 764
if current_env["minor"] == venv_minor: if current_env["minor"] == venv_minor:
del envs[base_env_name] del envs[base_env_name]
envs_file.write(envs) envs_file.write(envs)
self.remove_venv(venv.path) self.remove_venv(venv.path)
return venv return venv
raise ValueError( raise ValueError(
'<warning>Environment "{}" does not exist.</warning>'.format(pyt hon) f'<warning>Environment "{python}" does not exist.</warning>'
) )
try: try:
python_version = Version.parse(python) python_version = Version.parse(python)
python = "python{}".format(python_version.major) python = f"python{python_version.major}"
if python_version.precision > 1: if python_version.precision > 1:
python += ".{}".format(python_version.minor) python += f".{python_version.minor}"
except ValueError: except ValueError:
# Executable in PATH or full executable path # Executable in PATH or full executable path
pass pass
try: try:
python_version = decode( python_version_string = decode(
subprocess.check_output( subprocess.check_output(
list_to_shell_command( list_to_shell_command([python, "-c", GET_PYTHON_VERSION_ONEL
[ INER]),
python,
"-c",
"\"import sys; print('.'.join([str(s) for s in sys.v
ersion_info[:3]]))\"",
]
),
shell=True, shell=True,
) )
) )
except CalledProcessError as e: except CalledProcessError as e:
raise EnvCommandError(e) raise EnvCommandError(e)
python_version = Version.parse(python_version.strip()) python_version = Version.parse(python_version_string.strip())
minor = "{}.{}".format(python_version.major, python_version.minor) minor = f"{python_version.major}.{python_version.minor}"
name = "{}-py{}".format(base_env_name, minor) name = f"{base_env_name}-py{minor}"
venv = venv_path / name venv_path = venv_path / name
if not venv.exists(): if not venv_path.exists():
raise ValueError( raise ValueError(f'<warning>Environment "{name}" does not exist.</wa
'<warning>Environment "{}" does not exist.</warning>'.format(nam rning>')
e)
)
if envs_file.exists(): if envs_file.exists():
envs = envs_file.read() envs = envs_file.read()
current_env = envs.get(base_env_name) current_env = envs.get(base_env_name)
if current_env is not None: if current_env is not None:
current_minor = current_env["minor"] current_minor = current_env["minor"]
if current_minor == minor: if current_minor == minor:
del envs[base_env_name] del envs[base_env_name]
envs_file.write(envs) envs_file.write(envs)
self.remove_venv(venv) self.remove_venv(venv_path)
return VirtualEnv(venv, venv) return VirtualEnv(venv_path, venv_path)
def create_venv( def create_venv(
self, io, name=None, executable=None, force=False self,
): # type: (IO, Optional[str], Optional[str], bool) -> Env io: IO,
name: str | None = None,
executable: str | None = None,
force: bool = False,
) -> Env:
if self._env is not None and not force: if self._env is not None and not force:
return self._env return self._env
cwd = self._poetry.file.parent cwd = self._poetry.file.parent
env = self.get(reload=True) env = self.get(reload=True)
if not env.is_sane(): if not env.is_sane():
force = True force = True
if env.is_venv() and not force: if env.is_venv() and not force:
# Already inside a virtualenv. # Already inside a virtualenv.
current_python = Version.parse(
".".join(str(c) for c in env.version_info[:3])
)
if not self._poetry.package.python_constraint.allows(current_python)
:
raise InvalidCurrentPythonVersionError(
self._poetry.package.python_versions, str(current_python)
)
return env return env
create_venv = self._poetry.config.get("virtualenvs.create") create_venv = self._poetry.config.get("virtualenvs.create")
root_venv = self._poetry.config.get("virtualenvs.in-project") root_venv = self._poetry.config.get("virtualenvs.in-project")
prefer_active_python = self._poetry.config.get(
"virtualenvs.prefer-active-python"
)
venv_prompt = self._poetry.config.get("virtualenvs.prompt")
venv_path = self._poetry.config.get("virtualenvs.path") if not executable and prefer_active_python:
if root_venv: executable = self._detect_active_python(io)
venv_path = cwd / ".venv"
elif venv_path is None:
venv_path = Path(CACHE_DIR) / "virtualenvs"
else:
venv_path = Path(venv_path)
venv_path = cwd / ".venv" if root_venv else self._poetry.config.virtuale nvs_path
if not name: if not name:
name = self._poetry.package.name name = self._poetry.package.name
assert name is not None
python_patch = ".".join([str(v) for v in sys.version_info[:3]]) python_patch = ".".join([str(v) for v in sys.version_info[:3]])
python_minor = ".".join([str(v) for v in sys.version_info[:2]]) python_minor = ".".join([str(v) for v in sys.version_info[:2]])
if executable: if executable:
python_patch = decode( python_patch = decode(
subprocess.check_output( subprocess.check_output(
list_to_shell_command( list_to_shell_command(
[ [executable, "-c", GET_PYTHON_VERSION_ONELINER]
executable,
"-c",
"\"import sys; print('.'.join([str(s) for s in sys.v
ersion_info[:3]]))\"",
]
), ),
shell=True, shell=True,
).strip() ).strip()
) )
python_minor = ".".join(python_patch.split(".")[:2]) python_minor = ".".join(python_patch.split(".")[:2])
supported_python = self._poetry.package.python_constraint supported_python = self._poetry.package.python_constraint
if not supported_python.allows(Version.parse(python_patch)): if not supported_python.allows(Version.parse(python_patch)):
# The currently activated or chosen Python version # The currently activated or chosen Python version
# is not compatible with the Python constraint specified # is not compatible with the Python constraint specified
# for the project. # for the project.
# If an executable has been specified, we stop there # If an executable has been specified, we stop there
# and notify the user of the incompatibility. # and notify the user of the incompatibility.
# Otherwise, we try to find a compatible Python version. # Otherwise, we try to find a compatible Python version.
if executable: if executable and not prefer_active_python:
raise NoCompatiblePythonVersionFound( raise NoCompatiblePythonVersionFound(
self._poetry.package.python_versions, python_patch self._poetry.package.python_versions, python_patch
) )
io.write_line( io.write_error_line(
"<warning>The currently activated Python version {} " f"<warning>The currently activated Python version {python_patch}
"is not supported by the project ({}).\n" is not"
"Trying to find and use a compatible version.</warning> ".format f" supported by the project ({self._poetry.package.python_versio
( ns}).\n"
python_patch, self._poetry.package.python_versions "Trying to find and use a compatible version.</warning> "
)
) )
for python_to_try in reversed( for python_to_try in sorted(
sorted( self._poetry.package.AVAILABLE_PYTHONS,
self._poetry.package.AVAILABLE_PYTHONS, key=lambda v: (v.startswith("3"), -len(v), v),
key=lambda v: (v.startswith("3"), -len(v), v), reverse=True,
)
): ):
if len(python_to_try) == 1: if len(python_to_try) == 1:
if not parse_constraint("^{}.0".format(python_to_try)).allow s_any( if not parse_constraint(f"^{python_to_try}.0").allows_any(
supported_python supported_python
): ):
continue continue
elif not supported_python.allows_all( elif not supported_python.allows_any(
parse_constraint(python_to_try + ".*") parse_constraint(python_to_try + ".*")
): ):
continue continue
python = "python" + python_to_try python = "python" + python_to_try
if io.is_debug(): if io.is_debug():
io.write_line("<debug>Trying {}</debug>".format(python)) io.write_line(f"<debug>Trying {python}</debug>")
try: try:
python_patch = decode( python_patch = decode(
subprocess.check_output( subprocess.check_output(
list_to_shell_command( list_to_shell_command(
[ [python, "-c", GET_PYTHON_VERSION_ONELINER]
python,
"-c",
"\"import sys; print('.'.join([str(s) for s
in sys.version_info[:3]]))\"",
]
), ),
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
shell=True, shell=True,
).strip() ).strip()
) )
except CalledProcessError: except CalledProcessError:
continue continue
if not python_patch: if not python_patch:
continue continue
if supported_python.allows(Version.parse(python_patch)): if supported_python.allows(Version.parse(python_patch)):
io.write_line("Using <c1>{}</c1> ({})".format(python, python _patch)) io.write_line(f"Using <c1>{python}</c1> ({python_patch})")
executable = python executable = python
python_minor = ".".join(python_patch.split(".")[:2]) python_minor = ".".join(python_patch.split(".")[:2])
break break
if not executable: if not executable:
raise NoCompatiblePythonVersionFound( raise NoCompatiblePythonVersionFound(
self._poetry.package.python_versions self._poetry.package.python_versions
) )
if root_venv: if root_venv:
venv = venv_path venv = venv_path
else: else:
name = self.generate_env_name(name, str(cwd)) name = self.generate_env_name(name, str(cwd))
name = "{}-py{}".format(name, python_minor.strip()) name = f"{name}-py{python_minor.strip()}"
venv = venv_path / name venv = venv_path / name
if venv_prompt is not None:
venv_prompt = venv_prompt.format(
project_name=self._poetry.package.name or "virtualenv",
python_version=python_minor,
)
if not venv.exists(): if not venv.exists():
if create_venv is False: if create_venv is False:
io.write_line( io.write_line(
"<fg=black;bg=yellow>" "<fg=black;bg=yellow>"
"Skipping virtualenv creation, " "Skipping virtualenv creation, "
"as specified in config file." "as specified in config file."
"</>" "</>"
) )
return self.get_system_env() return self.get_system_env()
io.write_line( io.write_line(
"Creating virtualenv <c1>{}</> in {}".format(name, str(venv_path f"Creating virtualenv <c1>{name}</> in"
)) f" {venv_path if not WINDOWS else get_real_windows_path(venv_pat
h)!s}"
) )
self.build_venv(venv, executable=executable)
else: else:
create_venv = False
if force: if force:
if not env.is_sane(): if not env.is_sane():
io.write_line( io.write_error_line(
"<warning>The virtual environment found in {} seems to b f"<warning>The virtual environment found in {env.path} s
e broken.</warning>".format( eems to"
env.path " be broken.</warning>"
)
) )
io.write_line( io.write_line(f"Recreating virtualenv <c1>{name}</> in {venv!s}"
"Recreating virtualenv <c1>{}</> in {}".format(name, str(ven )
v))
)
self.remove_venv(venv) self.remove_venv(venv)
self.build_venv(venv, executable=executable) create_venv = True
elif io.is_very_verbose(): elif io.is_very_verbose():
io.write_line("Virtualenv <c1>{}</> already exists.".format(name io.write_line(f"Virtualenv <c1>{name}</> already exists.")
))
if create_venv:
self.build_venv(
venv,
executable=executable,
flags=self._poetry.config.get("virtualenvs.options"),
prompt=venv_prompt,
)
# venv detection: # venv detection:
# stdlib venv may symlink sys.executable, so we can't use realpath. # stdlib venv may symlink sys.executable, so we can't use realpath.
# but others can symlink *to* the venv Python, # but others can symlink *to* the venv Python,
# so we can't just use sys.executable. # so we can't just use sys.executable.
# So we just check every item in the symlink tree (generally <= 3) # So we just check every item in the symlink tree (generally <= 3)
p = os.path.normcase(sys.executable) p = os.path.normcase(sys.executable)
paths = [p] paths = [p]
while os.path.islink(p): while os.path.islink(p):
p = os.path.normcase(os.path.join(os.path.dirname(p), os.readlink(p) )) p = os.path.normcase(os.path.join(os.path.dirname(p), os.readlink(p) ))
paths.append(p) paths.append(p)
p_venv = os.path.normcase(str(venv)) p_venv = os.path.normcase(str(venv))
if any(p.startswith(p_venv) for p in paths): if any(p.startswith(p_venv) for p in paths):
# Running properly in the virtualenv, don't need to do anything # Running properly in the virtualenv, don't need to do anything
return SystemEnv(Path(sys.prefix), Path(self.get_base_prefix())) return self.get_system_env()
return VirtualEnv(venv) return VirtualEnv(venv)
@classmethod @classmethod
def build_venv( def build_venv(
cls, path, executable=None cls,
): # type: (Union[Path,str], Optional[Union[str, Path]]) -> virtualenv.run. path: Path | str,
session.Session executable: str | Path | None = None,
flags: dict[str, bool] | None = None,
with_pip: bool | None = None,
with_wheel: bool | None = None,
with_setuptools: bool | None = None,
prompt: str | None = None,
) -> virtualenv.run.session.Session:
if WINDOWS:
path = get_real_windows_path(path)
executable = get_real_windows_path(executable) if executable else No
ne
flags = flags or {}
flags["no-pip"] = (
not with_pip if with_pip is not None else flags.pop("no-pip", True)
)
flags["no-setuptools"] = (
not with_setuptools
if with_setuptools is not None
else flags.pop("no-setuptools", True)
)
# we want wheels to be enabled when pip is required and it has not been
# explicitly disabled
flags["no-wheel"] = (
not with_wheel
if with_wheel is not None
else flags.pop("no-wheel", flags["no-pip"])
)
if isinstance(executable, Path): if isinstance(executable, Path):
executable = executable.resolve().as_posix() executable = executable.resolve().as_posix()
return virtualenv.cli_run(
[ args = [
"--no-download", "--no-download",
"--no-periodic-update", "--no-periodic-update",
"--python", "--python",
executable or sys.executable, executable or sys.executable,
]
if prompt is not None:
args.extend(["--prompt", prompt])
for flag, value in flags.items():
if value is True:
args.append(f"--{flag}")
args.append(str(path))
cli_result = virtualenv.cli_run(args)
# Exclude the venv folder from from macOS Time Machine backups
# TODO: Add backup-ignore markers for other platforms too
if sys.platform == "darwin":
import xattr
xattr.setxattr(
str(path), str(path),
] "com.apple.metadata:com_apple_backup_excludeItem",
) plistlib.dumps("com.apple.backupd", fmt=plistlib.FMT_BINARY),
)
return cli_result
@classmethod @classmethod
def remove_venv(cls, path): # type: (Union[Path,str]) -> None def remove_venv(cls, path: Path | str) -> None:
if isinstance(path, str): if isinstance(path, str):
path = Path(path) path = Path(path)
assert path.is_dir() assert path.is_dir()
try: try:
shutil.rmtree(str(path)) remove_directory(path)
return return
except OSError as e: except OSError as e:
# Continue only if e.errno == 16 # Continue only if e.errno == 16
if e.errno != 16: # ERRNO 16: Device or resource busy if e.errno != 16: # ERRNO 16: Device or resource busy
raise e raise e
# Delete all files and folders but the toplevel one. This is because som etimes # Delete all files and folders but the toplevel one. This is because som etimes
# the venv folder is mounted by the OS, such as in a docker volume. In s uch # the venv folder is mounted by the OS, such as in a docker volume. In s uch
# cases, an attempt to delete the folder itself will result in an `OSErr or`. # cases, an attempt to delete the folder itself will result in an `OSErr or`.
# See https://github.com/python-poetry/poetry/pull/2064 # See https://github.com/python-poetry/poetry/pull/2064
for file_path in path.iterdir(): for file_path in path.iterdir():
if file_path.is_file() or file_path.is_symlink(): if file_path.is_file() or file_path.is_symlink():
file_path.unlink() file_path.unlink()
elif file_path.is_dir(): elif file_path.is_dir():
shutil.rmtree(str(file_path)) remove_directory(file_path, force=True)
@classmethod @classmethod
def get_system_env( def get_system_env(cls, naive: bool = False) -> Env:
cls, naive=False
): # type: (bool) -> Union["SystemEnv", "GenericEnv"]
""" """
Retrieve the current Python environment. Retrieve the current Python environment.
This can be the base Python environment or an activated virtual environm ent. This can be the base Python environment or an activated virtual environm ent.
This method also works around the issue that the virtual environment
This method also workaround the issue that the virtual environment
used by Poetry internally (when installed via the custom installer) used by Poetry internally (when installed via the custom installer)
is incorrectly detected as the system environment. Note that this workar ound is incorrectly detected as the system environment. Note that this workar ound
happens only when `naive` is False since there are times where we actual ly happens only when `naive` is False since there are times where we actual ly
want to retrieve Poetry's custom virtual environment want to retrieve Poetry's custom virtual environment
(e.g. plugin installation or self update). (e.g. plugin installation or self update).
""" """
prefix, base_prefix = Path(sys.prefix), Path(cls.get_base_prefix()) prefix, base_prefix = Path(sys.prefix), Path(cls.get_base_prefix())
env = SystemEnv(prefix) env: Env = SystemEnv(prefix)
if not naive: if not naive:
if prefix.joinpath("poetry_env").exists(): if prefix.joinpath("poetry_env").exists():
env = GenericEnv(base_prefix, child_env=env) env = GenericEnv(base_prefix, child_env=env)
else: else:
from poetry.locations import data_dir from poetry.locations import data_dir
try: try:
prefix.relative_to(data_dir()) prefix.relative_to(data_dir())
except ValueError: except ValueError:
pass pass
else: else:
env = GenericEnv(base_prefix, child_env=env) env = GenericEnv(base_prefix, child_env=env)
return env return env
@classmethod @classmethod
def get_base_prefix(cls): # type: () -> str def get_base_prefix(cls) -> Path:
if hasattr(sys, "real_prefix"): real_prefix = getattr(sys, "real_prefix", None)
return sys.real_prefix if real_prefix is not None:
return Path(real_prefix)
if hasattr(sys, "base_prefix"):
return sys.base_prefix base_prefix = getattr(sys, "base_prefix", None)
if base_prefix is not None:
return Path(base_prefix)
return sys.prefix return Path(sys.prefix)
@classmethod @classmethod
def generate_env_name(cls, name, cwd): # type: (str, str) -> str def generate_env_name(cls, name: str, cwd: str) -> str:
name = name.lower() name = name.lower()
sanitized_name = re.sub(r'[ $`!*@"\\\r\n\t]', "_", name)[:42] sanitized_name = re.sub(r'[ $`!*@"\\\r\n\t]', "_", name)[:42]
h = hashlib.sha256(encode(cwd)).digest() normalized_cwd = os.path.normcase(os.path.realpath(cwd))
h = base64.urlsafe_b64encode(h).decode()[:8] h_bytes = hashlib.sha256(encode(normalized_cwd)).digest()
h_str = base64.urlsafe_b64encode(h_bytes).decode()[:8]
return "{}-{}".format(sanitized_name, h) return f"{sanitized_name}-{h_str}"
class Env(object): class Env:
""" """
An abstract Python environment. An abstract Python environment.
""" """
def __init__(self, path, base=None): # type: (Path, Optional[Path]) -> None def __init__(self, path: Path, base: Path | None = None) -> None:
self._is_windows = sys.platform == "win32" self._is_windows = sys.platform == "win32"
self._is_mingw = sysconfig.get_platform().startswith("mingw") self._is_mingw = sysconfig.get_platform().startswith("mingw")
self._is_conda = bool(os.environ.get("CONDA_DEFAULT_ENV")) self._is_conda = bool(os.environ.get("CONDA_DEFAULT_ENV"))
if self._is_windows:
path = get_real_windows_path(path)
base = get_real_windows_path(base) if base else None
if not self._is_windows or self._is_mingw: if not self._is_windows or self._is_mingw:
bin_dir = "bin" bin_dir = "bin"
else: else:
bin_dir = "Scripts" bin_dir = "Scripts"
self._path = path self._path = path
self._bin_dir = self._path / bin_dir self._bin_dir = self._path / bin_dir
self._base = base or path
self._executable = "python" self._executable = "python"
self._pip_executable = "pip" self._pip_executable = "pip"
self.find_executables() self.find_executables()
self._marker_env = None self._base = base or path
self._pip_version = None
self._site_packages = None self._marker_env: dict[str, Any] | None = None
self._paths = None self._pip_version: Version | None = None
self._supported_tags = None self._site_packages: SitePackages | None = None
self._purelib = None self._paths: dict[str, str] | None = None
self._platlib = None self._supported_tags: list[Tag] | None = None
self._script_dirs = None self._purelib: Path | None = None
self._platlib: Path | None = None
self._script_dirs: list[Path] | None = None
self._embedded_pip_path: str | None = None
@property @property
def path(self): # type: () -> Path def path(self) -> Path:
return self._path return self._path
@property @property
def base(self): # type: () -> Path def base(self) -> Path:
return self._base return self._base
@property @property
def version_info(self): # type: () -> Tuple[int] def version_info(self) -> tuple[Any, ...]:
return tuple(self.marker_env["version_info"]) return tuple(self.marker_env["version_info"])
@property @property
def python_implementation(self): # type: () -> str def python_implementation(self) -> str:
return self.marker_env["platform_python_implementation"] implementation: str = self.marker_env["platform_python_implementation"]
return implementation
@property @property
def python(self): # type: () -> str def python(self) -> str:
""" """
Path to current python executable Path to current python executable
""" """
return self._bin(self._executable) return self._bin(self._executable)
@property @property
def marker_env(self): def marker_env(self) -> dict[str, Any]:
if self._marker_env is None: if self._marker_env is None:
self._marker_env = self.get_marker_env() self._marker_env = self.get_marker_env()
return self._marker_env return self._marker_env
@property @property
def parent_env(self): # type: () -> GenericEnv def parent_env(self) -> GenericEnv:
return GenericEnv(self.base, child_env=self) return GenericEnv(self.base, child_env=self)
def _find_python_executable(self) -> None:
bin_dir = self._bin_dir
if self._is_windows and self._is_conda:
bin_dir = self._path
python_executables = sorted(
p.name
for p in bin_dir.glob("python*")
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
)
if python_executables:
executable = python_executables[0]
if executable.endswith(".exe"):
executable = executable[:-4]
self._executable = executable
def _find_pip_executable(self) -> None:
pip_executables = sorted(
p.name
for p in self._bin_dir.glob("pip*")
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
)
if pip_executables:
pip_executable = pip_executables[0]
if pip_executable.endswith(".exe"):
pip_executable = pip_executable[:-4]
self._pip_executable = pip_executable
def find_executables(self) -> None:
self._find_python_executable()
self._find_pip_executable()
def get_embedded_wheel(self, distribution: str) -> Path:
wheel: Wheel = get_embed_wheel(
distribution, f"{self.version_info[0]}.{self.version_info[1]}"
)
path: Path = wheel.path
return path
@property
def pip_embedded(self) -> str:
if self._embedded_pip_path is None:
self._embedded_pip_path = str(self.get_embedded_wheel("pip") / "pip"
)
return self._embedded_pip_path
@property @property
def pip(self): # type: () -> str def pip(self) -> str:
""" """
Path to current pip executable Path to current pip executable
""" """
return self._bin(self._pip_executable) # we do not use as_posix() here due to issues with windows pathlib2
# implementation
path = self._bin(self._pip_executable)
if not Path(path).exists():
return str(self.pip_embedded)
return path
@property @property
def platform(self): # type: () -> str def platform(self) -> str:
return sys.platform return sys.platform
@property @property
def os(self): # type: () -> str def os(self) -> str:
return os.name return os.name
@property @property
def pip_version(self): def pip_version(self) -> Version:
if self._pip_version is None: if self._pip_version is None:
self._pip_version = self.get_pip_version() self._pip_version = self.get_pip_version()
return self._pip_version return self._pip_version
@property @property
def site_packages(self): # type: () -> SitePackages def site_packages(self) -> SitePackages:
if self._site_packages is None: if self._site_packages is None:
# we disable write checks if no user site exist # we disable write checks if no user site exist
fallbacks = [self.usersite] if self.usersite else [] fallbacks = [self.usersite] if self.usersite else []
self._site_packages = SitePackages( self._site_packages = SitePackages(
self.purelib, fallbacks, skip_write_checks=False if fallbacks el self.purelib,
se True self.platlib,
fallbacks,
skip_write_checks=not fallbacks,
) )
return self._site_packages return self._site_packages
@property @property
def usersite(self): # type: () -> Optional[Path] def usersite(self) -> Path | None:
if "usersite" in self.paths: if "usersite" in self.paths:
return Path(self.paths["usersite"]) return Path(self.paths["usersite"])
return None
@property @property
def userbase(self): # type: () -> Optional[Path] def userbase(self) -> Path | None:
if "userbase" in self.paths: if "userbase" in self.paths:
return Path(self.paths["userbase"]) return Path(self.paths["userbase"])
return None
@property @property
def purelib(self): # type: () -> Path def purelib(self) -> Path:
if self._purelib is None: if self._purelib is None:
self._purelib = Path(self.paths["purelib"]) self._purelib = Path(self.paths["purelib"])
return self._purelib return self._purelib
@property @property
def platlib(self): # type: () -> Path def platlib(self) -> Path:
if self._platlib is None: if self._platlib is None:
if "platlib" in self.paths: if "platlib" in self.paths:
self._platlib = Path(self.paths["platlib"]) self._platlib = Path(self.paths["platlib"])
else: else:
self._platlib = self.purelib self._platlib = self.purelib
return self._platlib return self._platlib
def is_path_relative_to_lib(self, path): # type: (Path) -> bool def is_path_relative_to_lib(self, path: Path) -> bool:
for lib_path in [self.purelib, self.platlib]: for lib_path in [self.purelib, self.platlib]:
try: with contextlib.suppress(ValueError):
path.relative_to(lib_path) path.relative_to(lib_path)
return True return True
except ValueError:
pass
return False return False
@property @property
def sys_path(self): # type: () -> List[str] def sys_path(self) -> list[str]:
raise NotImplementedError() raise NotImplementedError()
@property @property
def paths(self): # type: () -> Dict[str, str] def paths(self) -> dict[str, str]:
if self._paths is None: if self._paths is None:
self._paths = self.get_paths() self._paths = self.get_paths()
return self._paths return self._paths
@property @property
def supported_tags(self): # type: () -> List[Tag] def supported_tags(self) -> list[Tag]:
if self._supported_tags is None: if self._supported_tags is None:
self._supported_tags = self.get_supported_tags() self._supported_tags = self.get_supported_tags()
return self._supported_tags return self._supported_tags
@classmethod @classmethod
def get_base_prefix(cls): # type: () -> str def get_base_prefix(cls) -> Path:
if hasattr(sys, "real_prefix"): real_prefix = getattr(sys, "real_prefix", None)
return sys.real_prefix if real_prefix is not None:
return Path(real_prefix)
if hasattr(sys, "base_prefix"):
return sys.base_prefix base_prefix = getattr(sys, "base_prefix", None)
if base_prefix is not None:
return sys.prefix return Path(base_prefix)
def _find_python_executable(self): # type: () -> None
bin_dir = self._bin_dir
if self._is_windows and self._is_conda:
bin_dir = self._path
python_executables = sorted(
p.name
for p in bin_dir.glob("python*")
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
)
if python_executables:
executable = python_executables[0]
if executable.endswith(".exe"):
executable = executable[:-4]
self._executable = executable
def _find_pip_executable(self): # type: () -> None
pip_executables = sorted(
p.name
for p in self._bin_dir.glob("pip*")
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
)
if pip_executables:
pip_executable = pip_executables[0]
if pip_executable.endswith(".exe"):
pip_executable = pip_executable[:-4]
self._pip_executable = pip_executable return Path(sys.prefix)
def find_executables(self): # type: () -> None def get_version_info(self) -> tuple[Any, ...]:
self._find_python_executable()
self._find_pip_executable()
def get_version_info(self): # type: () -> Tuple[int]
raise NotImplementedError() raise NotImplementedError()
def get_python_implementation(self): # type: () -> str def get_python_implementation(self) -> str:
raise NotImplementedError() raise NotImplementedError()
def get_marker_env(self): # type: () -> Dict[str, Any] def get_marker_env(self) -> dict[str, Any]:
raise NotImplementedError() raise NotImplementedError()
def get_pip_command(self): # type: () -> List[str] def get_pip_command(self, embedded: bool = False) -> list[str]:
raise NotImplementedError() raise NotImplementedError()
def get_supported_tags(self): # type: () -> List[Tag] def get_supported_tags(self) -> list[Tag]:
raise NotImplementedError() raise NotImplementedError()
def get_pip_version(self): # type: () -> Version def get_pip_version(self) -> Version:
raise NotImplementedError() raise NotImplementedError()
def get_paths(self): # type: () -> Dict[str, str] def get_paths(self) -> dict[str, str]:
raise NotImplementedError() raise NotImplementedError()
def is_valid_for_marker(self, marker): # type: (BaseMarker) -> bool def is_valid_for_marker(self, marker: BaseMarker) -> bool:
return marker.validate(self.marker_env) valid: bool = marker.validate(self.marker_env)
return valid
def is_sane(self): # type: () -> bool def is_sane(self) -> bool:
""" """
Checks whether the current environment is sane or not. Checks whether the current environment is sane or not.
""" """
return True return True
def run(self, bin, *args, **kwargs): def get_command_from_bin(self, bin: str) -> list[str]:
bin = self._bin(bin) if bin == "pip":
cmd = [bin] + list(args) # when pip is required we need to ensure that we fallback to
return self._run(cmd, **kwargs) # embedded pip when pip is not available in the environment
return self.get_pip_command()
def run_python(self, *args, **kwargs): return [self._bin(bin)]
return self.run(self._executable, *args, **kwargs)
def run_pip(self, *args, **kwargs): def run(self, bin: str, *args: str, **kwargs: Any) -> str | int:
pip = self.get_pip_command() cmd = self.get_command_from_bin(bin) + list(args)
return self._run(cmd, **kwargs)
def run_pip(self, *args: str, **kwargs: Any) -> int | str:
pip = self.get_pip_command(embedded=True)
cmd = pip + list(args) cmd = pip + list(args)
return self._run(cmd, **kwargs) return self._run(cmd, **kwargs)
def run_python_script(self, content, **kwargs): # type: (str, Any) -> str def run_python_script(self, content: str, **kwargs: Any) -> int | str:
return self.run(self._executable, "-W", "ignore", "-", input_=content, * *kwargs) return self.run(self._executable, "-W", "ignore", "-", input_=content, * *kwargs)
def _run(self, cmd, **kwargs): def _run(self, cmd: list[str], **kwargs: Any) -> int | str:
""" """
Run a command inside the Python environment. Run a command inside the Python environment.
""" """
call = kwargs.pop("call", False) call = kwargs.pop("call", False)
input_ = kwargs.pop("input_", None) input_ = kwargs.pop("input_", None)
env = kwargs.pop("env", dict(os.environ))
try: try:
if self._is_windows: if self._is_windows:
kwargs["shell"] = True kwargs["shell"] = True
command: str | list[str]
if kwargs.get("shell", False): if kwargs.get("shell", False):
cmd = list_to_shell_command(cmd) command = list_to_shell_command(cmd)
else:
command = cmd
if input_: if input_:
output = subprocess.run( output = subprocess.run(
cmd, command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
input=encode(input_), input=encode(input_),
check=True, check=True,
**kwargs **kwargs,
).stdout ).stdout
elif call: elif call:
return subprocess.call(cmd, stderr=subprocess.STDOUT, **kwargs) return subprocess.call(
command, stderr=subprocess.STDOUT, env=env, **kwargs
)
else: else:
output = subprocess.check_output( output = subprocess.check_output(
cmd, stderr=subprocess.STDOUT, **kwargs command, stderr=subprocess.STDOUT, env=env, **kwargs
) )
except CalledProcessError as e: except CalledProcessError as e:
raise EnvCommandError(e, input=input_) raise EnvCommandError(e, input=input_)
return decode(output) return decode(output)
def execute(self, bin, *args, **kwargs): def execute(self, bin: str, *args: str, **kwargs: Any) -> int:
bin = self._bin(bin) command = self.get_command_from_bin(bin) + list(args)
env = kwargs.pop("env", {k: v for k, v in os.environ.items()}) env = kwargs.pop("env", dict(os.environ))
if not self._is_windows: if not self._is_windows:
args = [bin] + list(args) return os.execvpe(command[0], command, env=env)
return os.execvpe(bin, args, env=env)
else:
exe = subprocess.Popen([bin] + list(args), env=env, **kwargs)
exe.communicate()
return exe.returncode
def is_venv(self): # type: () -> bool kwargs["shell"] = True
exe = subprocess.Popen([command[0]] + command[1:], env=env, **kwargs)
exe.communicate()
return exe.returncode
def is_venv(self) -> bool:
raise NotImplementedError() raise NotImplementedError()
@property @property
def script_dirs(self): # type: () -> List[Path] def script_dirs(self) -> list[Path]:
if self._script_dirs is None: if self._script_dirs is None:
self._script_dirs = ( scripts = self.paths.get("scripts")
[Path(self.paths["scripts"])] self._script_dirs = [
if "scripts" in self.paths Path(scripts) if scripts is not None else self._bin_dir
else self._bin_dir ]
)
if self.userbase: if self.userbase:
self._script_dirs.append(self.userbase / self._script_dirs[0].na me) self._script_dirs.append(self.userbase / self._script_dirs[0].na me)
return self._script_dirs return self._script_dirs
def _bin(self, bin): # type: (str) -> str def _bin(self, bin: str) -> str:
""" """
Return path to the given executable. Return path to the given executable.
""" """
if self._is_windows and not bin.endswith(".exe"): if self._is_windows and not bin.endswith(".exe"):
bin_path = self._bin_dir / (bin + ".exe") bin_path = self._bin_dir / (bin + ".exe")
else: else:
bin_path = self._bin_dir / bin bin_path = self._bin_dir / bin
if not bin_path.exists(): if not bin_path.exists():
# On Windows, some executables can be in the base path # On Windows, some executables can be in the base path
# This is especially true when installing Python with # This is especially true when installing Python with
# the official installer, where python.exe will be at # the official installer, where python.exe will be at
# the root of the env path. # the root of the env path.
# This is an edge case and should not be encountered
# in normal uses but this happens in the sonnet script
# that creates a fake virtual environment pointing to
# a base Python install.
if self._is_windows: if self._is_windows:
if not bin.endswith(".exe"): if not bin.endswith(".exe"):
bin_path = self._path / (bin + ".exe") bin_path = self._path / (bin + ".exe")
else: else:
bin_path = self._path / bin bin_path = self._path / bin
if bin_path.exists(): if bin_path.exists():
return str(bin_path) return str(bin_path)
return bin return bin
return str(bin_path) return str(bin_path)
def __eq__(self, other): # type: (Env) -> bool def __eq__(self, other: object) -> bool:
if not isinstance(other, Env):
return False
return other.__class__ == self.__class__ and other.path == self.path return other.__class__ == self.__class__ and other.path == self.path
def __repr__(self): def __repr__(self) -> str:
return '{}("{}")'.format(self.__class__.__name__, self._path) return f'{self.__class__.__name__}("{self._path}")'
class SystemEnv(Env): class SystemEnv(Env):
""" """
A system (i.e. not a virtualenv) Python environment. A system (i.e. not a virtualenv) Python environment.
""" """
@property @property
def python(self): # type: () -> str def python(self) -> str:
return sys.executable return sys.executable
@property @property
def sys_path(self): # type: () -> List[str] def sys_path(self) -> list[str]:
return sys.path return sys.path
def get_version_info(self): # type: () -> Tuple[int] def get_version_info(self) -> tuple[Any, ...]:
return sys.version_info return tuple(sys.version_info)
def get_python_implementation(self): # type: () -> str def get_python_implementation(self) -> str:
return platform.python_implementation() return platform.python_implementation()
def get_pip_command(self): # type: () -> List[str] def get_pip_command(self, embedded: bool = False) -> list[str]:
# If we're not in a venv, assume the interpreter we're running on # If we're not in a venv, assume the interpreter we're running on
# has a pip and use that # has a pip and use that
return [sys.executable, "-m", "pip"] return [sys.executable, self.pip_embedded if embedded else self.pip]
def get_paths(self): # type: () -> Dict[str, str] def get_paths(self) -> dict[str, str]:
# We can't use sysconfig.get_paths() because # We can't use sysconfig.get_paths() because
# on some distributions it does not return the proper paths # on some distributions it does not return the proper paths
# (those used by pip for instance). We go through distutils # (those used by pip for instance). We go through distutils
# to get the proper ones. # to get the proper ones.
import site import site
from distutils.command.install import SCHEME_KEYS # noqa from distutils.command.install import SCHEME_KEYS
from distutils.core import Distribution from distutils.core import Distribution
d = Distribution() d = Distribution()
d.parse_config_files() d.parse_config_files()
obj = d.get_command_obj("install", create=True) with warnings.catch_warnings():
warnings.filterwarnings("ignore", "setup.py install is deprecated")
obj = d.get_command_obj("install", create=True)
assert obj is not None
obj.finalize_options() obj.finalize_options()
paths = sysconfig.get_paths().copy() paths = sysconfig.get_paths().copy()
for key in SCHEME_KEYS: for key in SCHEME_KEYS:
if key == "headers": if key == "headers":
# headers is not a path returned by sysconfig.get_paths() # headers is not a path returned by sysconfig.get_paths()
continue continue
paths[key] = getattr(obj, "install_{}".format(key)) paths[key] = getattr(obj, f"install_{key}")
if site.check_enableusersite() and hasattr(obj, "install_usersite"): if site.check_enableusersite():
paths["usersite"] = getattr(obj, "install_usersite") usersite = getattr(obj, "install_usersite", None)
paths["userbase"] = getattr(obj, "install_userbase") userbase = getattr(obj, "install_userbase", None)
if usersite is not None and userbase is not None:
paths["usersite"] = usersite
paths["userbase"] = userbase
return paths return paths
def get_supported_tags(self): # type: () -> List[Tag] def get_supported_tags(self) -> list[Tag]:
return list(sys_tags()) return list(sys_tags())
def get_marker_env(self): # type: () -> Dict[str, Any] def get_marker_env(self) -> dict[str, Any]:
if hasattr(sys, "implementation"): if hasattr(sys, "implementation"):
info = sys.implementation.version info = sys.implementation.version
iver = "{0.major}.{0.minor}.{0.micro}".format(info) iver = f"{info.major}.{info.minor}.{info.micro}"
kind = info.releaselevel kind = info.releaselevel
if kind != "final": if kind != "final":
iver += kind[0] + str(info.serial) iver += kind[0] + str(info.serial)
implementation_name = sys.implementation.name implementation_name = sys.implementation.name
else: else:
iver = "0" iver = "0"
implementation_name = "" implementation_name = ""
return { return {
"implementation_name": implementation_name, "implementation_name": implementation_name,
"implementation_version": iver, "implementation_version": iver,
"os_name": os.name, "os_name": os.name,
"platform_machine": platform.machine(), "platform_machine": platform.machine(),
"platform_release": platform.release(), "platform_release": platform.release(),
"platform_system": platform.system(), "platform_system": platform.system(),
"platform_version": platform.version(), "platform_version": platform.version(),
"python_full_version": platform.python_version(), "python_full_version": platform.python_version(),
"platform_python_implementation": platform.python_implementation(), "platform_python_implementation": platform.python_implementation(),
"python_version": ".".join( "python_version": ".".join(platform.python_version().split(".")[:2])
v for v in platform.python_version().split(".")[:2] ,
),
"sys_platform": sys.platform, "sys_platform": sys.platform,
"version_info": sys.version_info, "version_info": sys.version_info,
# Extra information
"interpreter_name": interpreter_name(), "interpreter_name": interpreter_name(),
"interpreter_version": interpreter_version(), "interpreter_version": interpreter_version(),
} }
def get_pip_version(self): # type: () -> Version def get_pip_version(self) -> Version:
from pip import __version__ from pip import __version__
return Version.parse(__version__) return Version.parse(__version__)
def is_venv(self): # type: () -> bool def is_venv(self) -> bool:
return self._path != self._base return self._path != self._base
class VirtualEnv(Env): class VirtualEnv(Env):
""" """
A virtual Python environment. A virtual Python environment.
""" """
def __init__(self, path, base=None): # type: (Path, Optional[Path]) -> None def __init__(self, path: Path, base: Path | None = None) -> None:
super(VirtualEnv, self).__init__(path, base) super().__init__(path, base)
# If base is None, it probably means this is # If base is None, it probably means this is
# a virtualenv created from VIRTUAL_ENV. # a virtualenv created from VIRTUAL_ENV.
# In this case we need to get sys.base_prefix # In this case we need to get sys.base_prefix
# from inside the virtualenv. # from inside the virtualenv.
if base is None: if base is None:
self._base = Path(self.run_python_script(GET_BASE_PREFIX).strip()) output = self.run_python_script(GET_BASE_PREFIX)
assert isinstance(output, str)
self._base = Path(output.strip())
@property @property
def sys_path(self): # type: () -> List[str] def sys_path(self) -> list[str]:
output = self.run_python_script(GET_SYS_PATH) output = self.run_python_script(GET_SYS_PATH)
assert isinstance(output, str)
paths: list[str] = json.loads(output)
return paths
return json.loads(output) def get_version_info(self) -> tuple[Any, ...]:
def get_version_info(self): # type: () -> Tuple[int]
output = self.run_python_script(GET_PYTHON_VERSION) output = self.run_python_script(GET_PYTHON_VERSION)
assert isinstance(output, str)
return tuple([int(s) for s in output.strip().split(".")]) return tuple(int(s) for s in output.strip().split("."))
def get_python_implementation(self): # type: () -> str def get_python_implementation(self) -> str:
return self.marker_env["platform_python_implementation"] implementation: str = self.marker_env["platform_python_implementation"]
return implementation
def get_pip_command(self): # type: () -> List[str] def get_pip_command(self, embedded: bool = False) -> list[str]:
# We're in a virtualenv that is known to be sane, # We're in a virtualenv that is known to be sane,
# so assume that we have a functional pip # so assume that we have a functional pip
return [self._bin(self._pip_executable)] return [
self._bin(self._executable),
def get_supported_tags(self): # type: () -> List[Tag] self.pip_embedded if embedded else self.pip,
file_path = Path(packaging.tags.__file__) ]
if file_path.suffix == ".pyc":
# Python 2
file_path = file_path.with_suffix(".py")
with file_path.open(encoding="utf-8") as f:
script = decode(f.read())
script = script.replace(
"from ._typing import TYPE_CHECKING, cast",
"TYPE_CHECKING = False\ncast = lambda type_, value: value",
)
script = script.replace(
"from ._typing import MYPY_CHECK_RUNNING, cast",
"MYPY_CHECK_RUNNING = False\ncast = lambda type_, value: value",
)
script += textwrap.dedent(
"""
import json
print(json.dumps([(t.interpreter, t.abi, t.platform) for t in sys_ta
gs()]))
"""
)
output = self.run_python_script(script) def get_supported_tags(self) -> list[Tag]:
output = self.run_python_script(GET_SYS_TAGS)
assert isinstance(output, str)
return [Tag(*t) for t in json.loads(output)] return [Tag(*t) for t in json.loads(output)]
def get_marker_env(self): # type: () -> Dict[str, Any] def get_marker_env(self) -> dict[str, Any]:
output = self.run(self._executable, "-", input_=GET_ENVIRONMENT_INFO) output = self.run_python_script(GET_ENVIRONMENT_INFO)
assert isinstance(output, str)
env: dict[str, Any] = json.loads(output)
return env
return json.loads(output) def get_pip_version(self) -> Version:
output = self.run_pip("--version")
assert isinstance(output, str)
output = output.strip()
def get_pip_version(self): # type: () -> Version
output = self.run_pip("--version").strip()
m = re.match("pip (.+?)(?: from .+)?$", output) m = re.match("pip (.+?)(?: from .+)?$", output)
if not m: if not m:
return Version.parse("0.0") return Version.parse("0.0")
return Version.parse(m.group(1)) return Version.parse(m.group(1))
def get_paths(self): # type: () -> Dict[str, str] def get_paths(self) -> dict[str, str]:
output = self.run_python_script(GET_PATHS) output = self.run_python_script(GET_PATHS)
assert isinstance(output, str)
paths: dict[str, str] = json.loads(output)
return paths
return json.loads(output) def is_venv(self) -> bool:
def is_venv(self): # type: () -> bool
return True return True
def is_sane(self): def is_sane(self) -> bool:
# A virtualenv is considered sane if both "python" and "pip" exist. # A virtualenv is considered sane if "python" exists.
return os.path.exists(self.python) and os.path.exists(self._bin("pip")) return os.path.exists(self.python)
def _run(self, cmd, **kwargs): def _run(self, cmd: list[str], **kwargs: Any) -> int | str:
kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env"))
return super(VirtualEnv, self)._run(cmd, **kwargs) return super()._run(cmd, **kwargs)
def get_temp_environ( def get_temp_environ(
self, environ=None, exclude=None, **kwargs self,
): # type: (Optional[Dict[str, str]], Optional[List[str]], **str) -> Dict[s environ: dict[str, str] | None = None,
tr, str] exclude: list[str] | None = None,
**kwargs: str,
) -> dict[str, str]:
exclude = exclude or [] exclude = exclude or []
exclude.extend(["PYTHONHOME", "__PYVENV_LAUNCHER__"]) exclude.extend(["PYTHONHOME", "__PYVENV_LAUNCHER__"])
if environ: if environ:
environ = deepcopy(environ) environ = deepcopy(environ)
for key in exclude: for key in exclude:
environ.pop(key, None) environ.pop(key, None)
else: else:
environ = {k: v for k, v in os.environ.items() if k not in exclude} environ = {k: v for k, v in os.environ.items() if k not in exclude}
environ.update(kwargs) environ.update(kwargs)
environ["PATH"] = self._updated_path() environ["PATH"] = self._updated_path()
environ["VIRTUAL_ENV"] = str(self._path) environ["VIRTUAL_ENV"] = str(self._path)
return environ return environ
def execute(self, bin, *args, **kwargs): def execute(self, bin: str, *args: str, **kwargs: Any) -> int:
kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env")) kwargs["env"] = self.get_temp_environ(environ=kwargs.get("env"))
return super(VirtualEnv, self).execute(bin, *args, **kwargs) return super().execute(bin, *args, **kwargs)
@contextmanager @contextmanager
def temp_environ(self): def temp_environ(self) -> Iterator[None]:
environ = dict(os.environ) environ = dict(os.environ)
try: try:
yield yield
finally: finally:
os.environ.clear() os.environ.clear()
os.environ.update(environ) os.environ.update(environ)
def _updated_path(self): def _updated_path(self) -> str:
return os.pathsep.join([str(self._bin_dir), os.environ.get("PATH", "")]) return os.pathsep.join([str(self._bin_dir), os.environ.get("PATH", "")])
class GenericEnv(VirtualEnv): class GenericEnv(VirtualEnv):
def __init__( def __init__(
self, path, base=None, child_env=None self, path: Path, base: Path | None = None, child_env: Env | None = None
): # type: (Path, Optional[Path], Optional[Env]) -> None ) -> None:
self._child_env = child_env self._child_env = child_env
super(GenericEnv, self).__init__(path, base=base) super().__init__(path, base=base)
def find_executables(self): # type: () -> None def find_executables(self) -> None:
patterns = [("python*", "pip*")] patterns = [("python*", "pip*")]
if self._child_env: if self._child_env:
minor_version = "{}.{}".format( minor_version = (
self._child_env.version_info[0], self._child_env.version_info[1] f"{self._child_env.version_info[0]}.{self._child_env.version_inf
o[1]}"
) )
major_version = "{}".format(self._child_env.version_info[0]) major_version = f"{self._child_env.version_info[0]}"
patterns = [ patterns = [
("python{}".format(minor_version), "pip{}".format(minor_version) (f"python{minor_version}", f"pip{minor_version}"),
), (f"python{major_version}", f"pip{major_version}"),
("python{}".format(major_version), "pip{}".format(major_version)
),
] ]
python_executable = None python_executable = None
pip_executable = None pip_executable = None
for python_pattern, pip_pattern in patterns: for python_pattern, pip_pattern in patterns:
if python_executable and pip_executable: if python_executable and pip_executable:
break break
if not python_executable: if not python_executable:
python_executables = sorted( python_executables = sorted(
[ p.name
p.name for p in self._bin_dir.glob(python_pattern)
for p in self._bin_dir.glob(python_pattern) if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
if re.match(r"python(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.na
me)
]
) )
if python_executables: if python_executables:
executable = python_executables[0] executable = python_executables[0]
if executable.endswith(".exe"): if executable.endswith(".exe"):
executable = executable[:-4] executable = executable[:-4]
python_executable = executable python_executable = executable
if not pip_executable: if not pip_executable:
pip_executables = sorted( pip_executables = sorted(
[ p.name
p.name for p in self._bin_dir.glob(pip_pattern)
for p in self._bin_dir.glob(pip_pattern) if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
if re.match(r"pip(?:\d+(?:\.\d+)?)?(?:\.exe)?$", p.name)
]
) )
if pip_executables: if pip_executables:
pip_executable = pip_executables[0] pip_executable = pip_executables[0]
if pip_executable.endswith(".exe"): if pip_executable.endswith(".exe"):
pip_executable = pip_executable[:-4] pip_executable = pip_executable[:-4]
pip_executable = pip_executable
if python_executable: if python_executable:
self._executable = python_executable self._executable = python_executable
if pip_executable: if pip_executable:
self._pip_executable = pip_executable self._pip_executable = pip_executable
def get_paths(self): # type: () -> Dict[str, str] def get_paths(self) -> dict[str, str]:
output = self.run_python_script(GET_PATHS_FOR_GENERIC_ENVS) output = self.run_python_script(GET_PATHS_FOR_GENERIC_ENVS)
assert isinstance(output, str)
paths: dict[str, str] = json.loads(output)
return paths
def execute(self, bin: str, *args: str, **kwargs: Any) -> int:
command = self.get_command_from_bin(bin) + list(args)
env = kwargs.pop("env", dict(os.environ))
if not self._is_windows:
return os.execvpe(command[0], command, env=env)
return json.loads(output) exe = subprocess.Popen([command[0]] + command[1:], env=env, **kwargs)
exe.communicate()
def execute(self, bin, *args, **kwargs): # type: (str, str, Any) -> Optiona return exe.returncode
l[int]
return super(VirtualEnv, self).execute(bin, *args, **kwargs)
def _run(self, cmd, **kwargs): # type: (List[str], Any) -> Optional[int] def _run(self, cmd: list[str], **kwargs: Any) -> int | str:
return super(VirtualEnv, self)._run(cmd, **kwargs) return super(VirtualEnv, self)._run(cmd, **kwargs)
def is_venv(self): # type: () -> bool def is_venv(self) -> bool:
return self._path != self._base return self._path != self._base
class NullEnv(SystemEnv): class NullEnv(SystemEnv):
def __init__(self, path=None, base=None, execute=False): def __init__(
self, path: Path | None = None, base: Path | None = None, execute: bool
= False
) -> None:
if path is None: if path is None:
path = Path(sys.prefix) path = Path(sys.prefix)
super(NullEnv, self).__init__(path, base=base) super().__init__(path, base=base)
self._execute = execute self._execute = execute
self.executed = [] self.executed: list[list[str]] = []
def get_pip_command(self): # type: () -> List[str] def get_pip_command(self, embedded: bool = False) -> list[str]:
return [self._bin("python"), "-m", "pip"] return [
self._bin(self._executable),
self.pip_embedded if embedded else self.pip,
]
def _run(self, cmd, **kwargs): def _run(self, cmd: list[str], **kwargs: Any) -> int | str:
self.executed.append(cmd) self.executed.append(cmd)
if self._execute: if self._execute:
return super(NullEnv, self)._run(cmd, **kwargs) return super()._run(cmd, **kwargs)
return 0
def execute(self, bin, *args, **kwargs): def execute(self, bin: str, *args: str, **kwargs: Any) -> int:
self.executed.append([bin] + list(args)) self.executed.append([bin] + list(args))
if self._execute: if self._execute:
return super(NullEnv, self).execute(bin, *args, **kwargs) return super().execute(bin, *args, **kwargs)
return 0
def _bin(self, bin): def _bin(self, bin: str) -> str:
return bin return bin
@contextmanager
def ephemeral_environment(
executable: str | Path | None = None,
flags: dict[str, bool] | None = None,
) -> Iterator[VirtualEnv]:
with temporary_directory() as tmp_dir:
# TODO: cache PEP 517 build environment corresponding to each project ve
nv
venv_dir = Path(tmp_dir) / ".venv"
EnvManager.build_venv(
path=venv_dir.as_posix(),
executable=executable,
flags=flags,
)
yield VirtualEnv(venv_dir, venv_dir)
@contextmanager
def build_environment(
poetry: CorePoetry, env: Env | None = None, io: IO | None = None
) -> Iterator[Env]:
"""
If a build script is specified for the project, there could be additional bu
ild
time dependencies, eg: cython, setuptools etc. In these cases, we create an
ephemeral build environment with all requirements specified under
`build-system.requires` and return this. Otherwise, the given default projec
t
environment is returned.
"""
if not env or poetry.package.build_script:
with ephemeral_environment(executable=env.python if env else None) as ve
nv:
overwrite = (
io is not None and io.output.is_decorated() and not io.is_debug(
)
)
if io:
if not overwrite:
io.write_line("")
requires = [
f"<c1>{requirement}</c1>"
for requirement in poetry.pyproject.build_system.requires
]
io.overwrite(
"<b>Preparing</b> build environment with build-system requir
ements"
f" {', '.join(requires)}"
)
venv.run_pip(
"install",
"--disable-pip-version-check",
"--ignore-installed",
*poetry.pyproject.build_system.requires,
)
if overwrite:
assert io is not None
io.write_line("")
yield venv
else:
yield env
class MockEnv(NullEnv): class MockEnv(NullEnv):
def __init__( def __init__(
self, self,
version_info=(3, 7, 0), version_info: tuple[int, int, int] = (3, 7, 0),
python_implementation="CPython", python_implementation: str = "CPython",
platform="darwin", platform: str = "darwin",
os_name="posix", os_name: str = "posix",
is_venv=False, is_venv: bool = False,
pip_version="19.1", pip_version: str = "19.1",
sys_path=None, sys_path: list[str] | None = None,
marker_env=None, marker_env: dict[str, Any] | None = None,
supported_tags=None, supported_tags: list[Tag] | None = None,
**kwargs **kwargs: Any,
): ) -> None:
super(MockEnv, self).__init__(**kwargs) super().__init__(**kwargs)
self._version_info = version_info self._version_info = version_info
self._python_implementation = python_implementation self._python_implementation = python_implementation
self._platform = platform self._platform = platform
self._os_name = os_name self._os_name = os_name
self._is_venv = is_venv self._is_venv = is_venv
self._pip_version = Version.parse(pip_version) self._pip_version: Version = Version.parse(pip_version)
self._sys_path = sys_path self._sys_path = sys_path
self._mock_marker_env = marker_env self._mock_marker_env = marker_env
self._supported_tags = supported_tags self._supported_tags = supported_tags
@property @property
def platform(self): # type: () -> str def platform(self) -> str:
return self._platform return self._platform
@property @property
def os(self): # type: () -> str def os(self) -> str:
return self._os_name return self._os_name
@property @property
def pip_version(self): def pip_version(self) -> Version:
return self._pip_version return self._pip_version
@property @property
def sys_path(self): def sys_path(self) -> list[str]:
if self._sys_path is None: if self._sys_path is None:
return super(MockEnv, self).sys_path return super().sys_path
return self._sys_path return self._sys_path
def get_marker_env(self): # type: () -> Dict[str, Any] def get_marker_env(self) -> dict[str, Any]:
if self._mock_marker_env is not None: if self._mock_marker_env is not None:
return self._mock_marker_env return self._mock_marker_env
marker_env = super(MockEnv, self).get_marker_env() marker_env = super().get_marker_env()
marker_env["python_implementation"] = self._python_implementation marker_env["python_implementation"] = self._python_implementation
marker_env["version_info"] = self._version_info marker_env["version_info"] = self._version_info
marker_env["python_version"] = ".".join(str(v) for v in self._version_in fo[:2]) marker_env["python_version"] = ".".join(str(v) for v in self._version_in fo[:2])
marker_env["python_full_version"] = ".".join(str(v) for v in self._versi on_info) marker_env["python_full_version"] = ".".join(str(v) for v in self._versi on_info)
marker_env["sys_platform"] = self._platform marker_env["sys_platform"] = self._platform
marker_env["interpreter_name"] = self._python_implementation.lower() marker_env["interpreter_name"] = self._python_implementation.lower()
marker_env["interpreter_version"] = "cp" + "".join( marker_env["interpreter_version"] = "cp" + "".join(
str(v) for v in self._version_info[:2] str(v) for v in self._version_info[:2]
) )
return marker_env return marker_env
def is_venv(self): # type: () -> bool def is_venv(self) -> bool:
return self._is_venv return self._is_venv
 End of changes. 263 change blocks. 
533 lines changed or deleted 887 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)