"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "poetry/installation/executor.py" between
poetry-1.1.15.tar.gz and poetry-1.2.0.tar.gz

About: Poetry is a tool for dependency management and packaging in Python.

executor.py  (poetry-1.1.15):executor.py  (poetry-1.2.0)
# -*- coding: utf-8 -*- from __future__ import annotations
from __future__ import division
import csv
import itertools import itertools
import json
import os import os
import threading import threading
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait from concurrent.futures import wait
from pathlib import Path
from subprocess import CalledProcessError from subprocess import CalledProcessError
from typing import TYPE_CHECKING
from typing import Any
from typing import cast
from cleo.io.null_io import NullIO
from poetry.core.packages.file_dependency import FileDependency from poetry.core.packages.file_dependency import FileDependency
from poetry.core.packages.utils.link import Link from poetry.core.packages.utils.link import Link
from poetry.core.packages.utils.utils import url_to_path
from poetry.core.pyproject.toml import PyProjectTOML from poetry.core.pyproject.toml import PyProjectTOML
from poetry.io.null_io import NullIO
from poetry.utils._compat import PY2 from poetry.installation.chef import Chef
from poetry.utils._compat import WINDOWS from poetry.installation.chooser import Chooser
from poetry.utils._compat import OrderedDict from poetry.installation.operations import Install
from poetry.utils._compat import Path from poetry.installation.operations import Uninstall
from poetry.utils._compat import cpu_count from poetry.installation.operations import Update
from poetry.utils._compat import decode from poetry.utils._compat import decode
from poetry.utils.authenticator import Authenticator
from poetry.utils.env import EnvCommandError from poetry.utils.env import EnvCommandError
from poetry.utils.helpers import safe_rmtree from poetry.utils.helpers import pluralize
from poetry.utils.helpers import remove_directory
from .authenticator import Authenticator from poetry.utils.pip import pip_install
from .chef import Chef
from .chooser import Chooser if TYPE_CHECKING:
from .operations.install import Install from cleo.io.io import IO
from .operations.operation import Operation from cleo.io.outputs.section_output import SectionOutput
from .operations.uninstall import Uninstall from poetry.core.masonry.builders.builder import Builder
from .operations.update import Update from poetry.core.packages.package import Package
class Executor(object): from poetry.config.config import Config
def __init__(self, env, pool, config, io, parallel=None): from poetry.installation.operations.operation import Operation
from poetry.repositories import Pool
from poetry.utils.env import Env
class Executor:
def __init__(
self,
env: Env,
pool: Pool,
config: Config,
io: IO,
parallel: bool | None = None,
) -> None:
self._env = env self._env = env
self._io = io self._io = io
self._dry_run = False self._dry_run = False
self._enabled = True self._enabled = True
self._verbose = False self._verbose = False
self._authenticator = Authenticator(config, self._io) self._authenticator = Authenticator(config, self._io)
self._chef = Chef(config, self._env) self._chef = Chef(config, self._env)
self._chooser = Chooser(pool, self._env) self._chooser = Chooser(pool, self._env, config)
if parallel is None: if parallel is None:
parallel = config.get("installer.parallel", True) parallel = config.get("installer.parallel", True)
if parallel and not (PY2 and WINDOWS): if parallel:
# This should be directly handled by ThreadPoolExecutor self._max_workers = self._get_max_workers(
# however, on some systems the number of CPUs cannot be determined desired_max_workers=config.get("installer.max-workers")
# (it raises a NotImplementedError), so, in this case, we assume )
# that the system only has one CPU.
try:
self._max_workers = cpu_count() + 4
except NotImplementedError:
self._max_workers = 5
else: else:
self._max_workers = 1 self._max_workers = 1
self._executor = ThreadPoolExecutor(max_workers=self._max_workers) self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
self._total_operations = 0 self._total_operations = 0
self._executed_operations = 0 self._executed_operations = 0
self._executed = {"install": 0, "update": 0, "uninstall": 0} self._executed = {"install": 0, "update": 0, "uninstall": 0}
self._skipped = {"install": 0, "update": 0, "uninstall": 0} self._skipped = {"install": 0, "update": 0, "uninstall": 0}
self._sections = OrderedDict() self._sections: dict[int, SectionOutput] = {}
self._yanked_warnings: list[str] = []
self._lock = threading.Lock() self._lock = threading.Lock()
self._shutdown = False self._shutdown = False
self._hashes: dict[str, str] = {}
@property @property
def installations_count(self): # type: () -> int def installations_count(self) -> int:
return self._executed["install"] return self._executed["install"]
@property @property
def updates_count(self): # type: () -> int def updates_count(self) -> int:
return self._executed["update"] return self._executed["update"]
@property @property
def removals_count(self): # type: () -> int def removals_count(self) -> int:
return self._executed["uninstall"] return self._executed["uninstall"]
def supports_fancy_output(self): # type: () -> bool def supports_fancy_output(self) -> bool:
return self._io.supports_ansi() and not self._dry_run return self._io.output.is_decorated() and not self._dry_run
def disable(self): def disable(self) -> Executor:
self._enabled = False self._enabled = False
return self return self
def dry_run(self, dry_run=True): def dry_run(self, dry_run: bool = True) -> Executor:
self._dry_run = dry_run self._dry_run = dry_run
return self return self
def verbose(self, verbose=True): def verbose(self, verbose: bool = True) -> Executor:
self._verbose = verbose self._verbose = verbose
return self return self
def execute(self, operations): # type: (Operation) -> int def pip_install(
self, req: Path, upgrade: bool = False, editable: bool = False
) -> int:
try:
pip_install(req, self._env, upgrade=upgrade, editable=editable)
except EnvCommandError as e:
output = decode(e.e.output)
if (
"KeyboardInterrupt" in output
or "ERROR: Operation cancelled by user" in output
):
return -2
raise
return 0
def execute(self, operations: list[Operation]) -> int:
self._total_operations = len(operations) self._total_operations = len(operations)
for job_type in self._executed: for job_type in self._executed:
self._executed[job_type] = 0 self._executed[job_type] = 0
self._skipped[job_type] = 0 self._skipped[job_type] = 0
if operations and (self._enabled or self._dry_run): if operations and (self._enabled or self._dry_run):
self._display_summary(operations) self._display_summary(operations)
# We group operations by priority # We group operations by priority
groups = itertools.groupby(operations, key=lambda o: -o.priority) groups = itertools.groupby(operations, key=lambda o: -o.priority)
self._sections = OrderedDict() self._sections = {}
self._yanked_warnings = []
for _, group in groups: for _, group in groups:
tasks = [] tasks = []
serial_operations = [] serial_operations = []
for operation in group: for operation in group:
if self._shutdown: if self._shutdown:
break break
# Some operations are unsafe, we mus execute them serially in a group # Some operations are unsafe, we must execute them serially in a group
# https://github.com/python-poetry/poetry/issues/3086 # https://github.com/python-poetry/poetry/issues/3086
# https://github.com/python-poetry/poetry/issues/2658 # https://github.com/python-poetry/poetry/issues/2658
# #
# We need to explicitly check source type here, see: # We need to explicitly check source type here, see:
# https://github.com/python-poetry/poetry-core/pull/98 # https://github.com/python-poetry/poetry-core/pull/98
is_parallel_unsafe = operation.job_type == "uninstall" or ( is_parallel_unsafe = operation.job_type == "uninstall" or (
operation.package.develop operation.package.develop
and operation.package.source_type in {"directory", "git"} and operation.package.source_type in {"directory", "git"}
) )
if not operation.skipped and is_parallel_unsafe: if not operation.skipped and is_parallel_unsafe:
skipping to change at line 150 skipping to change at line 182
except KeyboardInterrupt: except KeyboardInterrupt:
self._shutdown = True self._shutdown = True
if self._shutdown: if self._shutdown:
# Cancelling further tasks from being executed # Cancelling further tasks from being executed
[task.cancel() for task in tasks] [task.cancel() for task in tasks]
self._executor.shutdown(wait=True) self._executor.shutdown(wait=True)
break break
for warning in self._yanked_warnings:
self._io.write_error_line(f"<warning>Warning: {warning}</warning>")
return 1 if self._shutdown else 0 return 1 if self._shutdown else 0
def _write(self, operation, line): @staticmethod
def _get_max_workers(desired_max_workers: int | None = None) -> int:
# This should be directly handled by ThreadPoolExecutor
# however, on some systems the number of CPUs cannot be determined
# (it raises a NotImplementedError), so, in this case, we assume
# that the system only has one CPU.
try:
default_max_workers = (os.cpu_count() or 1) + 4
except NotImplementedError:
default_max_workers = 5
if desired_max_workers is None:
return default_max_workers
return min(default_max_workers, desired_max_workers)
def _write(self, operation: Operation, line: str) -> None:
if not self.supports_fancy_output() or not self._should_write_operation( if not self.supports_fancy_output() or not self._should_write_operation(
operation operation
): ):
return return
if self._io.is_debug(): if self._io.is_debug():
with self._lock: with self._lock:
section = self._sections[id(operation)] section = self._sections[id(operation)]
section.write_line(line) section.write_line(line)
return return
with self._lock: with self._lock:
section = self._sections[id(operation)] section = self._sections[id(operation)]
section.output.clear() section.clear()
section.write(line) section.write(line)
def _execute_operation(self, operation): def _execute_operation(self, operation: Operation) -> None:
try: try:
op_message = self.get_operation_message(operation)
if self.supports_fancy_output(): if self.supports_fancy_output():
if id(operation) not in self._sections: if id(operation) not in self._sections and self._should_write_op
if self._should_write_operation(operation): eration(
with self._lock: operation
self._sections[id(operation)] = self._io.section() ):
self._sections[id(operation)].write_line( with self._lock:
" <fg=blue;options=bold>•</> {message}: <fg=blu self._sections[id(operation)] = self._io.section()
e>Pending...</>".format( self._sections[id(operation)].write_line(
message=self.get_operation_message(operation f" <fg=blue;options=bold>•</> {op_message}:"
), " <fg=blue>Pending...</>"
), )
)
else: else:
if self._should_write_operation(operation): if self._should_write_operation(operation):
if not operation.skipped: if not operation.skipped:
self._io.write_line( self._io.write_line(
" <fg=blue;options=bold>•</> {message}".format( f" <fg=blue;options=bold>•</> {op_message}"
message=self.get_operation_message(operation),
),
) )
else: else:
self._io.write_line( self._io.write_line(
" <fg=default;options=bold,dark>•</> {message}: " f" <fg=default;options=bold,dark>•</> {op_message}: "
"<fg=default;options=bold,dark>Skipped</> " "<fg=default;options=bold,dark>Skipped</> "
"<fg=default;options=dark>for the following reason:< /> " "<fg=default;options=dark>for the following reason:< /> "
"<fg=default;options=bold,dark>{reason}</>".format( f"<fg=default;options=bold,dark>{operation.skip_reas
message=self.get_operation_message(operation), on}</>"
reason=operation.skip_reason,
)
) )
try: try:
result = self._do_execute_operation(operation) result = self._do_execute_operation(operation)
except EnvCommandError as e: except EnvCommandError as e:
if e.e.returncode == -2: if e.e.returncode == -2:
result = -2 result = -2
else: else:
raise raise
# If we have a result of -2 it means a KeyboardInterrupt # If we have a result of -2 it means a KeyboardInterrupt
# in the any python subprocess, so we raise a KeyboardInterrupt # in the any python subprocess, so we raise a KeyboardInterrupt
# error to be picked up by the error handler. # error to be picked up by the error handler.
if result == -2: if result == -2:
raise KeyboardInterrupt raise KeyboardInterrupt
except Exception as e: except Exception as e:
try: try:
from clikit.ui.components.exception_trace import ExceptionTrace from cleo.ui.exception_trace import ExceptionTrace
io: IO | SectionOutput
if not self.supports_fancy_output(): if not self.supports_fancy_output():
io = self._io io = self._io
else: else:
message = " <error>•</error> {message}: <error>Failed</erro message = (
r>".format( " <error>•</error>"
message=self.get_operation_message(operation, error=True f" {self.get_operation_message(operation, error=True)}:"
), " <error>Failed</error>"
) )
self._write(operation, message) self._write(operation, message)
io = self._sections.get(id(operation), self._io) io = self._sections.get(id(operation), self._io)
with self._lock: with self._lock:
trace = ExceptionTrace(e) trace = ExceptionTrace(e)
trace.render(io) trace.render(io)
io.write_line("") io.write_line("")
finally: finally:
with self._lock: with self._lock:
self._shutdown = True self._shutdown = True
except KeyboardInterrupt: except KeyboardInterrupt:
try: try:
message = " <warning>•</warning> {message}: <warning>Cancelled< message = (
/warning>".format( " <warning>•</warning>"
message=self.get_operation_message(operation, warning=True), f" {self.get_operation_message(operation, warning=True)}:"
" <warning>Cancelled</warning>"
) )
if not self.supports_fancy_output(): if not self.supports_fancy_output():
self._io.write_line(message) self._io.write_line(message)
else: else:
self._write(operation, message) self._write(operation, message)
finally: finally:
with self._lock: with self._lock:
self._shutdown = True self._shutdown = True
def _do_execute_operation(self, operation): def _do_execute_operation(self, operation: Operation) -> int:
method = operation.job_type method = operation.job_type
operation_message = self.get_operation_message(operation) operation_message = self.get_operation_message(operation)
if operation.skipped: if operation.skipped:
if self.supports_fancy_output(): if self.supports_fancy_output():
self._write( self._write(
operation, operation,
" <fg=default;options=bold,dark>•</> {message}: " f" <fg=default;options=bold,dark>•</> {operation_message}: "
"<fg=default;options=bold,dark>Skipped</> " "<fg=default;options=bold,dark>Skipped</> "
"<fg=default;options=dark>for the following reason:</> " "<fg=default;options=dark>for the following reason:</> "
"<fg=default;options=bold,dark>{reason}</>".format( f"<fg=default;options=bold,dark>{operation.skip_reason}</>",
message=operation_message, reason=operation.skip_reason,
),
) )
self._skipped[operation.job_type] += 1 self._skipped[operation.job_type] += 1
return 0 return 0
if not self._enabled or self._dry_run: if not self._enabled or self._dry_run:
self._io.write_line( self._io.write_line(f" <fg=blue;options=bold>•</> {operation_messag
" <fg=blue;options=bold>•</> {message}".format( e}")
message=operation_message,
)
)
return 0 return 0
result = getattr(self, "_execute_{}".format(method))(operation) result: int = getattr(self, f"_execute_{method}")(operation)
if result != 0: if result != 0:
return result return result
message = " <fg=green;options=bold>•</> {message}".format( operation_message = self.get_operation_message(operation, done=True)
message=self.get_operation_message(operation, done=True), message = f" <fg=green;options=bold>•</> {operation_message}"
)
self._write(operation, message) self._write(operation, message)
self._increment_operations_count(operation, True) self._increment_operations_count(operation, True)
return result return result
def _increment_operations_count(self, operation, executed): def _increment_operations_count(self, operation: Operation, executed: bool) -> None:
with self._lock: with self._lock:
if executed: if executed:
self._executed_operations += 1 self._executed_operations += 1
self._executed[operation.job_type] += 1 self._executed[operation.job_type] += 1
else: else:
self._skipped[operation.job_type] += 1 self._skipped[operation.job_type] += 1
def run_pip(self, *args, **kwargs): # type: (...) -> int def run_pip(self, *args: Any, **kwargs: Any) -> int:
try: try:
self._env.run_pip(*args, **kwargs) self._env.run_pip(*args, **kwargs)
except EnvCommandError as e: except EnvCommandError as e:
output = decode(e.e.output) output = decode(e.e.output)
if ( if (
"KeyboardInterrupt" in output "KeyboardInterrupt" in output
or "ERROR: Operation cancelled by user" in output or "ERROR: Operation cancelled by user" in output
): ):
return -2 return -2
raise raise
return 0 return 0
def get_operation_message(self, operation, done=False, error=False, warning= def get_operation_message(
False): self,
operation: Operation,
done: bool = False,
error: bool = False,
warning: bool = False,
) -> str:
base_tag = "fg=default" base_tag = "fg=default"
operation_color = "c2" operation_color = "c2"
source_operation_color = "c2" source_operation_color = "c2"
package_color = "c1" package_color = "c1"
if error: if error:
operation_color = "error" operation_color = "error"
elif warning: elif warning:
operation_color = "warning" operation_color = "warning"
elif done: elif done:
operation_color = "success" operation_color = "success"
if operation.skipped: if operation.skipped:
base_tag = "fg=default;options=dark" base_tag = "fg=default;options=dark"
operation_color += "_dark" operation_color += "_dark"
source_operation_color += "_dark" source_operation_color += "_dark"
package_color += "_dark" package_color += "_dark"
if operation.job_type == "install": if isinstance(operation, Install):
return "<{}>Installing <{}>{}</{}> (<{}>{}</>)</>".format( return (
base_tag, f"<{base_tag}>Installing"
package_color, f" <{package_color}>{operation.package.name}</{package_color}>"
operation.package.name, f" (<{operation_color}>{operation.package.full_pretty_version}</
package_color, >)</>"
operation_color,
operation.package.full_pretty_version,
) )
if operation.job_type == "uninstall": if isinstance(operation, Uninstall):
return "<{}>Removing <{}>{}</{}> (<{}>{}</>)</>".format( return (
base_tag, f"<{base_tag}>Removing"
package_color, f" <{package_color}>{operation.package.name}</{package_color}>"
operation.package.name, f" (<{operation_color}>{operation.package.full_pretty_version}</
package_color, >)</>"
operation_color,
operation.package.full_pretty_version,
) )
if operation.job_type == "update": if isinstance(operation, Update):
return "<{}>Updating <{}>{}</{}> (<{}>{}</{}> -> <{}>{}</>)</>".form return (
at( f"<{base_tag}>Updating"
base_tag, f" <{package_color}>{operation.initial_package.name}</{package_c
package_color, olor}> "
operation.initial_package.name, f"(<{source_operation_color}>"
package_color, f"{operation.initial_package.full_pretty_version}"
source_operation_color, f"</{source_operation_color}> -> <{operation_color}>"
operation.initial_package.full_pretty_version, f"{operation.target_package.full_pretty_version}</>)</>"
source_operation_color,
operation_color,
operation.target_package.full_pretty_version,
) )
return "" return ""
def _display_summary(self, operations): def _display_summary(self, operations: list[Operation]) -> None:
installs = 0 installs = 0
updates = 0 updates = 0
uninstalls = 0 uninstalls = 0
skipped = 0 skipped = 0
for op in operations: for op in operations:
if op.skipped: if op.skipped:
skipped += 1 skipped += 1
continue continue
if op.job_type == "install": if op.job_type == "install":
skipping to change at line 391 skipping to change at line 431
elif op.job_type == "uninstall": elif op.job_type == "uninstall":
uninstalls += 1 uninstalls += 1
if not installs and not updates and not uninstalls and not self._verbose : if not installs and not updates and not uninstalls and not self._verbose :
self._io.write_line("") self._io.write_line("")
self._io.write_line("No dependencies to install or update") self._io.write_line("No dependencies to install or update")
return return
self._io.write_line("") self._io.write_line("")
self._io.write_line( self._io.write("<b>Package operations</b>: ")
"<b>Package operations</b>: " self._io.write(f"<info>{installs}</> install{pluralize(installs)}, ")
"<info>{}</> install{}, " self._io.write(f"<info>{updates}</> update{pluralize(updates)}, ")
"<info>{}</> update{}, " self._io.write(f"<info>{uninstalls}</> removal{pluralize(uninstalls)}")
"<info>{}</> removal{}" if skipped and self._verbose:
"{}".format( self._io.write(f", <info>{skipped}</> skipped")
installs, self._io.write_line("")
"" if installs == 1 else "s",
updates,
"" if updates == 1 else "s",
uninstalls,
"" if uninstalls == 1 else "s",
", <info>{}</> skipped".format(skipped)
if skipped and self._verbose
else "",
)
)
self._io.write_line("") self._io.write_line("")
def _execute_install(self, operation): # type: (Install) -> None def _execute_install(self, operation: Install | Update) -> int:
return self._install(operation) status_code = self._install(operation)
def _execute_update(self, operation): # type: (Update) -> None self._save_url_reference(operation)
return self._update(operation)
def _execute_uninstall(self, operation): # type: (Uninstall) -> None return status_code
message = " <fg=blue;options=bold>•</> {message}: <info>Removing...</in
fo>".format( def _execute_update(self, operation: Install | Update) -> int:
message=self.get_operation_message(operation), status_code = self._update(operation)
)
self._save_url_reference(operation)
return status_code
def _execute_uninstall(self, operation: Uninstall) -> int:
op_msg = self.get_operation_message(operation)
message = f" <fg=blue;options=bold>•</> {op_msg}: <info>Removing...</in
fo>"
self._write(operation, message) self._write(operation, message)
return self._remove(operation) return self._remove(operation)
def _install(self, operation): def _install(self, operation: Install | Update) -> int:
package = operation.package package = operation.package
if package.source_type == "directory": if package.source_type == "directory":
return self._install_directory(operation) return self._install_directory(operation)
if package.source_type == "git": if package.source_type == "git":
return self._install_git(operation) return self._install_git(operation)
if package.source_type == "file": if package.source_type == "file":
archive = self._prepare_file(operation) archive = self._prepare_file(operation)
elif package.source_type == "url": elif package.source_type == "url":
assert package.source_url is not None
archive = self._download_link(operation, Link(package.source_url)) archive = self._download_link(operation, Link(package.source_url))
else: else:
archive = self._download(operation) archive = self._download(operation)
operation_message = self.get_operation_message(operation) operation_message = self.get_operation_message(operation)
message = " <fg=blue;options=bold>•</> {message}: <info>Installing...</ message = (
info>".format( f" <fg=blue;options=bold>•</> {operation_message}:"
message=operation_message, " <info>Installing...</info>"
) )
self._write(operation, message) self._write(operation, message)
return self.pip_install(archive, upgrade=operation.job_type == "update")
args = ["install", "--no-deps", str(archive)] def _update(self, operation: Install | Update) -> int:
if operation.job_type == "update":
args.insert(2, "-U")
return self.run_pip(*args)
def _update(self, operation):
return self._install(operation) return self._install(operation)
def _remove(self, operation): def _remove(self, operation: Uninstall) -> int:
package = operation.package package = operation.package
# If we have a VCS package, remove its source directory # If we have a VCS package, remove its source directory
if package.source_type == "git": if package.source_type == "git":
src_dir = self._env.path / "src" / package.name src_dir = self._env.path / "src" / package.name
if src_dir.exists(): if src_dir.exists():
safe_rmtree(str(src_dir)) remove_directory(src_dir, force=True)
try: try:
return self.run_pip("uninstall", package.name, "-y") return self.run_pip("uninstall", package.name, "-y")
except CalledProcessError as e: except CalledProcessError as e:
if "not installed" in str(e): if "not installed" in str(e):
return 0 return 0
raise raise
def _prepare_file(self, operation): def _prepare_file(self, operation: Install | Update) -> Path:
package = operation.package package = operation.package
operation_message = self.get_operation_message(operation)
message = " <fg=blue;options=bold>•</> {message}: <info>Preparing...</i message = (
nfo>".format( f" <fg=blue;options=bold>•</> {operation_message}:"
message=self.get_operation_message(operation), " <info>Preparing...</info>"
) )
self._write(operation, message) self._write(operation, message)
assert package.source_url is not None
archive = Path(package.source_url) archive = Path(package.source_url)
if not Path(package.source_url).is_absolute() and package.root_dir: if not Path(package.source_url).is_absolute() and package.root_dir:
archive = package.root_dir / archive archive = package.root_dir / archive
archive = self._chef.prepare(archive)
return archive return archive
def _install_directory(self, operation): def _install_directory(self, operation: Install | Update) -> int:
from poetry.factory import Factory from poetry.factory import Factory
package = operation.package package = operation.package
operation_message = self.get_operation_message(operation) operation_message = self.get_operation_message(operation)
message = " <fg=blue;options=bold>•</> {message}: <info>Building...</in message = (
fo>".format( f" <fg=blue;options=bold>•</> {operation_message}:"
message=operation_message, " <info>Building...</info>"
) )
self._write(operation, message) self._write(operation, message)
assert package.source_url is not None
if package.root_dir: if package.root_dir:
req = os.path.join(str(package.root_dir), package.source_url) req = package.root_dir / package.source_url
else: else:
req = os.path.realpath(package.source_url) req = Path(package.source_url).resolve(strict=False)
args = ["install", "--no-deps", "-U"] if package.source_subdirectory:
req /= package.source_subdirectory
pyproject = PyProjectTOML(os.path.join(req, "pyproject.toml")) pyproject = PyProjectTOML(os.path.join(req, "pyproject.toml"))
if pyproject.is_poetry_project(): if pyproject.is_poetry_project():
# Even if there is a build system specified # Even if there is a build system specified
# some versions of pip (< 19.0.0) don't understand it # some versions of pip (< 19.0.0) don't understand it
# so we need to check the version of pip to know # so we need to check the version of pip to know
# if we can rely on the build system # if we can rely on the build system
legacy_pip = self._env.pip_version < self._env.pip_version.__class__ legacy_pip = (
( self._env.pip_version
19, 0, 0 < self._env.pip_version.__class__.from_parts(19, 0, 0)
) )
try: try:
package_poetry = Factory().create_poetry(pyproject.file.path.par ent) package_poetry = Factory().create_poetry(pyproject.file.path.par ent)
except RuntimeError: except RuntimeError:
package_poetry = None package_poetry = None
if package_poetry is not None: if package_poetry is not None:
builder: Builder
if package.develop and not package_poetry.package.build_script: if package.develop and not package_poetry.package.build_script:
from poetry.masonry.builders.editable import EditableBuilder from poetry.masonry.builders.editable import EditableBuilder
# This is a Poetry package in editable mode # This is a Poetry package in editable mode
# we can use the EditableBuilder without going through pip # we can use the EditableBuilder without going through pip
# to install it, unless it has a build script. # to install it, unless it has a build script.
builder = EditableBuilder(package_poetry, self._env, NullIO( )) builder = EditableBuilder(package_poetry, self._env, NullIO( ))
builder.build() builder.build()
return 0 return 0
skipping to change at line 543 skipping to change at line 583
from poetry.core.masonry.builders.sdist import SdistBuilder from poetry.core.masonry.builders.sdist import SdistBuilder
# We need to rely on creating a temporary setup.py # We need to rely on creating a temporary setup.py
# file since the version of pip does not support # file since the version of pip does not support
# build-systems # build-systems
# We also need it for non-PEP-517 packages # We also need it for non-PEP-517 packages
builder = SdistBuilder(package_poetry) builder = SdistBuilder(package_poetry)
with builder.setup_py(): with builder.setup_py():
if package.develop: if package.develop:
args.append("-e") return self.pip_install(req, upgrade=True, editable=
True)
args.append(req) return self.pip_install(req, upgrade=True)
return self.run_pip(*args)
if package.develop: if package.develop:
args.append("-e") return self.pip_install(req, upgrade=True, editable=True)
args.append(req)
return self.run_pip(*args) return self.pip_install(req, upgrade=True)
def _install_git(self, operation): def _install_git(self, operation: Install | Update) -> int:
from poetry.core.vcs import Git from poetry.vcs.git import Git
package = operation.package package = operation.package
operation_message = self.get_operation_message(operation) operation_message = self.get_operation_message(operation)
message = " <fg=blue;options=bold>•</> {message}: <info>Cloning...</inf message = (
o>".format( f" <fg=blue;options=bold>•</> {operation_message}: <info>Cloning...
message=operation_message, </info>"
) )
self._write(operation, message) self._write(operation, message)
src_dir = self._env.path / "src" / package.name assert package.source_url is not None
if src_dir.exists(): source = Git.clone(
safe_rmtree(str(src_dir)) url=package.source_url,
source_root=self._env.path / "src",
src_dir.parent.mkdir(exist_ok=True) revision=package.source_resolved_reference or package.source_referen
ce,
git = Git() )
git.clone(package.source_url, src_dir)
reference = package.source_resolved_reference # Now we just need to install from the source directory
if not reference: original_url = package.source_url
reference = package.source_reference package._source_url = str(source.path)
git.checkout(reference, src_dir) status_code = self._install_directory(operation)
# Now we just need to install from the source directory package._source_url = original_url
package._source_url = str(src_dir)
return self._install_directory(operation) return status_code
def _download(self, operation): # type: (Operation) -> Path def _download(self, operation: Install | Update) -> Path:
link = self._chooser.choose_for(operation.package) link = self._chooser.choose_for(operation.package)
if link.yanked:
# Store yanked warnings in a list and print after installing, so the
y can't
# be overlooked. Further, printing them in the concerning section wo
uld have
# the risk of overwriting the warning, so it is only briefly visible
.
message = (
f"The file chosen for install of {operation.package.pretty_name}
"
f"{operation.package.pretty_version} ({link.show_url}) is yanked
."
)
if link.yanked_reason:
message += f" Reason for being yanked: {link.yanked_reason}"
self._yanked_warnings.append(message)
return self._download_link(operation, link) return self._download_link(operation, link)
def _download_link(self, operation, link): def _download_link(self, operation: Install | Update, link: Link) -> Path:
package = operation.package package = operation.package
archive = self._chef.get_cached_archive_for_link(link) archive = self._chef.get_cached_archive_for_link(link)
if archive is link: if archive is None:
# No cached distributions was found, so we download and prepare it # No cached distributions was found, so we download and prepare it
try: try:
archive = self._download_archive(operation, link) archive = self._download_archive(operation, link)
except BaseException: except BaseException:
cache_directory = self._chef.get_cache_directory_for_link(link) cache_directory = self._chef.get_cache_directory_for_link(link)
cached_file = cache_directory.joinpath(link.filename) cached_file = cache_directory.joinpath(link.filename)
# We can't use unlink(missing_ok=True) because it's not availabl e # We can't use unlink(missing_ok=True) because it's not availabl e
# in pathlib2 for Python 2.7 # prior to Python 3.8
if cached_file.exists(): if cached_file.exists():
cached_file.unlink() cached_file.unlink()
raise raise
# TODO: Check readability of the created archive
if not link.is_wheel:
archive = self._chef.prepare(archive)
if package.files: if package.files:
hashes = {f["hash"] for f in package.files} archive_hash = self._validate_archive_hash(archive, package)
hash_types = {h.split(":")[0] for h in hashes}
archive_hashes = set()
archive_path = (
url_to_path(archive.url) if isinstance(archive, Link) else archi
ve
)
for hash_type in hash_types:
archive_hashes.add(
"{}:{}".format(
hash_type,
FileDependency(package.name, archive_path).hash(hash_typ
e),
)
)
if archive_hashes.isdisjoint(hashes): self._hashes[package.name] = archive_hash
raise RuntimeError(
"Invalid hashes ({}) for {} using archive {}. Expected one o
f {}.".format(
", ".join(sorted(archive_hashes)),
package,
archive_path.name,
", ".join(sorted(hashes)),
)
)
return archive return archive
def _download_archive(self, operation, link): # type: (Operation, Link) -> @staticmethod
Path def _validate_archive_hash(archive: Path, package: Package) -> str:
file_dep = FileDependency(package.name, archive)
archive_hash: str = "sha256:" + file_dep.hash()
known_hashes = {f["hash"] for f in package.files}
if archive_hash not in known_hashes:
raise RuntimeError(
f"Hash for {package} from archive {archive.name} not found in"
f" known hashes (was: {archive_hash})"
)
return archive_hash
def _download_archive(self, operation: Install | Update, link: Link) -> Path
:
response = self._authenticator.request( response = self._authenticator.request(
"get", link.url, stream=True, io=self._sections.get(id(operation), s elf._io) "get", link.url, stream=True, io=self._sections.get(id(operation), s elf._io)
) )
wheel_size = response.headers.get("content-length") wheel_size = response.headers.get("content-length")
operation_message = self.get_operation_message(operation) operation_message = self.get_operation_message(operation)
message = " <fg=blue;options=bold>•</> {message}: <info>Downloading...< message = (
/>".format( f" <fg=blue;options=bold>•</> {operation_message}: <info>Downloadin
message=operation_message, g...</>"
) )
progress = None progress = None
if self.supports_fancy_output(): if self.supports_fancy_output():
if wheel_size is None: if wheel_size is None:
self._write(operation, message) self._write(operation, message)
else: else:
from clikit.ui.components.progress_bar import ProgressBar from cleo.ui.progress_bar import ProgressBar
progress = ProgressBar( progress = ProgressBar(
self._sections[id(operation)].output, max=int(wheel_size) self._sections[id(operation)], max=int(wheel_size)
) )
progress.set_format(message + " <b>%percent%%</b>") progress.set_format(message + " <b>%percent%%</b>")
if progress: if progress:
with self._lock: with self._lock:
self._sections[id(operation)].clear()
progress.start() progress.start()
done = 0 done = 0
archive = self._chef.get_cache_directory_for_link(link) / link.filename archive = self._chef.get_cache_directory_for_link(link) / link.filename
archive.parent.mkdir(parents=True, exist_ok=True) archive.parent.mkdir(parents=True, exist_ok=True)
with archive.open("wb") as f: with archive.open("wb") as f:
for chunk in response.iter_content(chunk_size=4096): for chunk in response.iter_content(chunk_size=4096):
if not chunk: if not chunk:
break break
skipping to change at line 689 skipping to change at line 723
progress.set_progress(done) progress.set_progress(done)
f.write(chunk) f.write(chunk)
if progress: if progress:
with self._lock: with self._lock:
progress.finish() progress.finish()
return archive return archive
def _should_write_operation(self, operation): # type: (Operation) -> bool def _should_write_operation(self, operation: Operation) -> bool:
if not operation.skipped: return not operation.skipped or self._dry_run or self._verbose
return True
def _save_url_reference(self, operation: Operation) -> None:
"""
Create and store a PEP-610 `direct_url.json` file, if needed.
"""
if operation.job_type not in {"install", "update"}:
return
package = operation.package
if not package.source_url or package.source_type == "legacy":
# Since we are installing from our own distribution cache
# pip will write a `direct_url.json` file pointing to the cache
# distribution.
# That's not what we want, so we remove the direct_url.json file,
# if it exists.
for (
direct_url_json
) in self._env.site_packages.find_distribution_direct_url_json_files
(
distribution_name=package.name, writable_only=True
):
# We can't use unlink(missing_ok=True) because it's not always a
vailable
if direct_url_json.exists():
direct_url_json.unlink()
return
url_reference: dict[str, Any] | None = None
if package.source_type == "git":
url_reference = self._create_git_url_reference(package)
elif package.source_type == "url":
url_reference = self._create_url_url_reference(package)
elif package.source_type == "directory":
url_reference = self._create_directory_url_reference(package)
elif package.source_type == "file":
url_reference = self._create_file_url_reference(package)
if url_reference:
for dist in self._env.site_packages.distributions(
name=package.name, writable_only=True
):
dist_path = cast(Path, dist._path) # type: ignore[attr-defined]
url = dist_path / "direct_url.json"
url.write_text(json.dumps(url_reference), encoding="utf-8")
record = dist_path / "RECORD"
if record.exists():
with record.open(mode="a", encoding="utf-8", newline="") as
f:
writer = csv.writer(f)
path = url.relative_to(record.parent.parent)
writer.writerow([str(path), "", ""])
def _create_git_url_reference(self, package: Package) -> dict[str, Any]:
reference = {
"url": package.source_url,
"vcs_info": {
"vcs": "git",
"requested_revision": package.source_reference,
"commit_id": package.source_resolved_reference,
},
}
if package.source_subdirectory:
reference["subdirectory"] = package.source_subdirectory
return reference
def _create_url_url_reference(self, package: Package) -> dict[str, Any]:
archive_info = {}
if package.name in self._hashes:
archive_info["hash"] = self._hashes[package.name]
reference = {"url": package.source_url, "archive_info": archive_info}
return reference
def _create_file_url_reference(self, package: Package) -> dict[str, Any]:
archive_info = {}
if package.name in self._hashes:
archive_info["hash"] = self._hashes[package.name]
assert package.source_url is not None
return {
"url": Path(package.source_url).as_uri(),
"archive_info": archive_info,
}
def _create_directory_url_reference(self, package: Package) -> dict[str, Any
]:
dir_info = {}
if package.develop:
dir_info["editable"] = True
return self._dry_run or self._verbose assert package.source_url is not None
return {
"url": Path(package.source_url).as_uri(),
"dir_info": dir_info,
}
 End of changes. 100 change blocks. 
249 lines changed or deleted 378 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)