shell.py (poetry-1.1.15) | : | shell.py (poetry-1.2.0) | ||
---|---|---|---|---|
from __future__ import annotations | ||||
import os | import os | |||
import signal | import signal | |||
import subprocess | ||||
import sys | import sys | |||
from pathlib import Path | ||||
from typing import TYPE_CHECKING | ||||
from typing import Any | ||||
import pexpect | import pexpect | |||
from clikit.utils.terminal import Terminal | from cleo.terminal import Terminal | |||
from shellingham import ShellDetectionFailure | from shellingham import ShellDetectionFailure | |||
from shellingham import detect_shell | from shellingham import detect_shell | |||
from ._compat import WINDOWS | from poetry.utils._compat import WINDOWS | |||
from ._compat import Path | ||||
from .env import VirtualEnv | if TYPE_CHECKING: | |||
from poetry.utils.env import VirtualEnv | ||||
class Shell: | class Shell: | |||
""" | """ | |||
Represents the current shell. | Represents the current shell. | |||
""" | """ | |||
_shell = None | _shell = None | |||
def __init__(self, name, path): # type: (str, str) -> None | def __init__(self, name: str, path: str) -> None: | |||
self._name = name | self._name = name | |||
self._path = path | self._path = path | |||
@property | @property | |||
def name(self): # type: () -> str | def name(self) -> str: | |||
return self._name | return self._name | |||
@property | @property | |||
def path(self): # type: () -> str | def path(self) -> str: | |||
return self._path | return self._path | |||
@classmethod | @classmethod | |||
def get(cls): # type: () -> Shell | def get(cls) -> Shell: | |||
""" | """ | |||
Retrieve the current shell. | Retrieve the current shell. | |||
""" | """ | |||
if cls._shell is not None: | if cls._shell is not None: | |||
return cls._shell | return cls._shell | |||
try: | try: | |||
name, path = detect_shell(os.getpid()) | name, path = detect_shell(os.getpid()) | |||
except (RuntimeError, ShellDetectionFailure): | except (RuntimeError, ShellDetectionFailure): | |||
shell = None | shell = None | |||
skipping to change at line 61 | skipping to change at line 69 | |||
if not shell: | if not shell: | |||
raise RuntimeError("Unable to detect the current shell.") | raise RuntimeError("Unable to detect the current shell.") | |||
name, path = Path(shell).stem, shell | name, path = Path(shell).stem, shell | |||
cls._shell = cls(name, path) | cls._shell = cls(name, path) | |||
return cls._shell | return cls._shell | |||
def activate(self, env): # type: (VirtualEnv) -> None | def activate(self, env: VirtualEnv) -> int | None: | |||
if WINDOWS: | activate_script = self._get_activate_script() | |||
return env.execute(self.path) | bin_dir = "Scripts" if WINDOWS else "bin" | |||
activate_path = env.path / bin_dir / activate_script | ||||
# mypy requires using sys.platform instead of WINDOWS constant | ||||
# in if statements to properly type check on Windows | ||||
if sys.platform == "win32": | ||||
if self._name in ("powershell", "pwsh"): | ||||
args = ["-NoExit", "-File", str(activate_path)] | ||||
else: | ||||
# /K will execute the bat file and | ||||
# keep the cmd process from terminating | ||||
args = ["/K", str(activate_path)] | ||||
completed_proc = subprocess.run([self.path, *args]) | ||||
return completed_proc.returncode | ||||
import shlex | ||||
terminal = Terminal() | terminal = Terminal() | |||
with env.temp_environ(): | with env.temp_environ(): | |||
c = pexpect.spawn( | c = pexpect.spawn( | |||
self._path, ["-i"], dimensions=(terminal.height, terminal.width) | self._path, ["-i"], dimensions=(terminal.height, terminal.width) | |||
) | ) | |||
if self._name == "zsh": | if self._name in ["zsh", "nu"]: | |||
c.setecho(False) | c.setecho(False) | |||
activate_script = self._get_activate_script() | c.sendline(f"{self._get_source_command()} {shlex.quote(str(activate_path | |||
bin_dir = "Scripts" if WINDOWS else "bin" | ))}") | |||
activate_path = env.path / bin_dir / activate_script | ||||
c.sendline("{} {}".format(self._get_source_command(), activate_path)) | ||||
def resize(sig, data): | def resize(sig: Any, data: Any) -> None: | |||
terminal = Terminal() | terminal = Terminal() | |||
c.setwinsize(terminal.height, terminal.width) | c.setwinsize(terminal.height, terminal.width) | |||
signal.signal(signal.SIGWINCH, resize) | signal.signal(signal.SIGWINCH, resize) | |||
# Interact with the new shell. | # Interact with the new shell. | |||
c.interact(escape_character=None) | c.interact(escape_character=None) | |||
c.close() | c.close() | |||
sys.exit(c.exitstatus) | sys.exit(c.exitstatus) | |||
def _get_activate_script(self): | def _get_activate_script(self) -> str: | |||
if "fish" == self._name: | if self._name == "fish": | |||
suffix = ".fish" | suffix = ".fish" | |||
elif "csh" == self._name: | elif self._name in ("csh", "tcsh"): | |||
suffix = ".csh" | ||||
elif "tcsh" == self._name: | ||||
suffix = ".csh" | suffix = ".csh" | |||
elif self._name in ("powershell", "pwsh"): | ||||
suffix = ".ps1" | ||||
elif self._name == "cmd": | ||||
suffix = ".bat" | ||||
elif self._name == "nu": | ||||
suffix = ".nu" | ||||
else: | else: | |||
suffix = "" | suffix = "" | |||
return "activate" + suffix | return "activate" + suffix | |||
def _get_source_command(self): | def _get_source_command(self) -> str: | |||
if "fish" == self._name: | if self._name in ("fish", "csh", "tcsh", "nu"): | |||
return "source" | return "source" | |||
elif "csh" == self._name: | ||||
return "source" | ||||
elif "tcsh" == self._name: | ||||
return "source" | ||||
return "." | return "." | |||
def __repr__(self): # type: () -> str | def __repr__(self) -> str: | |||
return '{}("{}", "{}")'.format(self.__class__.__name__, self._name, self | return f'{self.__class__.__name__}("{self._name}", "{self._path}")' | |||
._path) | ||||
End of changes. 19 change blocks. | ||||
29 lines changed or deleted | 49 lines changed or added |