"Fossies" - the Fresh Open Source Software Archive

Member "selenium-selenium-4.8.1/py/selenium/webdriver/common/bidi/cdp.py" (17 Feb 2023, 17766 Bytes) of package /linux/www/selenium-selenium-4.8.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "cdp.py" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 4.8.0_vs_4.8.1.

    1 # The MIT License(MIT)
    2 #
    3 # Copyright(c) 2018 Hyperion Gray
    4 #
    5 # Permission is hereby granted, free of charge, to any person obtaining a copy
    6 # of this software and associated documentation files(the "Software"), to deal
    7 # in the Software without restriction, including without limitation the rights
    8 # to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
    9 # copies of the Software, and to permit persons to whom the Software is
   10 # furnished to do so, subject to the following conditions:
   11 #
   12 # The above copyright notice and this permission notice shall be included in
   13 # all copies or substantial portions of the Software.
   14 #
   15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
   17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
   18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
   19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
   20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
   21 # THE SOFTWARE.
   22 #
   23 # This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp
   24 
   25 # flake8: noqa
   26 
   27 import contextvars
   28 import importlib
   29 import itertools
   30 import json
   31 import logging
   32 import pathlib
   33 import typing
   34 from collections import defaultdict
   35 from contextlib import asynccontextmanager
   36 from contextlib import contextmanager
   37 from dataclasses import dataclass
   38 
   39 import trio
   40 from trio_websocket import ConnectionClosed as WsConnectionClosed
   41 from trio_websocket import connect_websocket_url
   42 
# Module-level logger. Note that it uses the fixed name "trio_cdp" rather
# than ``__name__``.
logger = logging.getLogger("trio_cdp")
# Generic type variable used in the annotations of ``CdpBase.execute()`` and
# ``CdpBase.wait_for()``.
T = typing.TypeVar("T")
# Passed as ``max_message_size`` when ``connect_cdp()`` opens the WebSocket.
MAX_WS_MESSAGE_SIZE = 2**24

# Both are assigned by ``import_devtools()``: the loaded devtools module and
# the version that was requested.
devtools = None
version = None
   49 
   50 
   51 def import_devtools(ver):
   52     """Attempt to load the current latest available devtools into the module
   53     cache for use later."""
   54     global devtools
   55     global version
   56     version = ver
   57     base = "selenium.webdriver.common.devtools.v"
   58     try:
   59         devtools = importlib.import_module(f"{base}{ver}")
   60         return devtools
   61     except ModuleNotFoundError:
   62         # Attempt to parse and load the 'most recent' devtools module. This is likely
   63         # because cdp has been updated but selenium python has not been released yet.
   64         devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
   65         versions = tuple(f.name for f in devtools_path.iterdir() if f.is_dir())
   66         latest = max(int(x[1:]) for x in versions)
   67         selenium_logger = logging.getLogger(__name__)
   68         selenium_logger.debug(f"Falling back to loading `devtools`: v{latest}")
   69         devtools = importlib.import_module(f"{base}{latest}")
   70         return devtools
   71 
   72 
# Context variables holding the "current" CDP connection and session. They are
# read by ``get_connection_context()`` / ``get_session_context()`` and set by
# the ``connection_context()`` / ``session_context()`` context managers.
_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")
   75 
   76 
   77 def get_connection_context(fn_name):
   78     """Look up the current connection.
   79 
   80     If there is no current connection, raise a ``RuntimeError`` with a
   81     helpful message.
   82     """
   83     try:
   84         return _connection_context.get()
   85     except LookupError:
   86         raise RuntimeError(f"{fn_name}() must be called in a connection context.")
   87 
   88 
   89 def get_session_context(fn_name):
   90     """Look up the current session.
   91 
   92     If there is no current session, raise a ``RuntimeError`` with a
   93     helpful message.
   94     """
   95     try:
   96         return _session_context.get()
   97     except LookupError:
   98         raise RuntimeError(f"{fn_name}() must be called in a session context.")
   99 
  100 
  101 @contextmanager
  102 def connection_context(connection):
  103     """This context manager installs ``connection`` as the session context for
  104     the current Trio task."""
  105     token = _connection_context.set(connection)
  106     try:
  107         yield
  108     finally:
  109         _connection_context.reset(token)
  110 
  111 
  112 @contextmanager
  113 def session_context(session):
  114     """This context manager installs ``session`` as the session context for the
  115     current Trio task."""
  116     token = _session_context.set(session)
  117     try:
  118         yield
  119     finally:
  120         _session_context.reset(token)
  121 
  122 
  123 def set_global_connection(connection):
  124     """Install ``connection`` in the root context so that it will become the
  125     default connection for all tasks.
  126 
  127     This is generally not recommended, except it may be necessary in
  128     certain use cases such as running inside Jupyter notebook.
  129     """
  130     global _connection_context
  131     _connection_context = contextvars.ContextVar("_connection_context", default=connection)
  132 
  133 
  134 def set_global_session(session):
  135     """Install ``session`` in the root context so that it will become the
  136     default session for all tasks.
  137 
  138     This is generally not recommended, except it may be necessary in
  139     certain use cases such as running inside Jupyter notebook.
  140     """
  141     global _session_context
  142     _session_context = contextvars.ContextVar("_session_context", default=session)
  143 
  144 
  145 class BrowserError(Exception):
  146     """This exception is raised when the browser's response to a command
  147     indicates that an error occurred."""
  148 
  149     def __init__(self, obj):
  150         self.code = obj["code"]
  151         self.message = obj["message"]
  152         self.detail = obj.get("data")
  153 
  154     def __str__(self):
  155         return f"BrowserError<code={self.code} message={self.message}> {self.detail}"
  156 
  157 
  158 class CdpConnectionClosed(WsConnectionClosed):
  159     """Raised when a public method is called on a closed CDP connection."""
  160 
  161     def __init__(self, reason):
  162         """Constructor.
  163 
  164         :param reason:
  165         :type reason: wsproto.frame_protocol.CloseReason
  166         """
  167         self.reason = reason
  168 
  169     def __repr__(self):
  170         """Return representation."""
  171         return f"{self.__class__.__name__}<{self.reason}>"
  172 
  173 
  174 class InternalError(Exception):
  175     """This exception is only raised when there is faulty logic in TrioCDP or
  176     the integration with PyCDP."""
  177 
  178 
  179 @dataclass
  180 class CmEventProxy:
  181     """A proxy object returned by :meth:`CdpBase.wait_for()``.
  182 
  183     After the context manager executes, this proxy object will have a
  184     value set that contains the returned event.
  185     """
  186 
  187     value: typing.Any = None
  188 
  189 
  190 class CdpBase:
  191     def __init__(self, ws, session_id, target_id):
  192         self.ws = ws
  193         self.session_id = session_id
  194         self.target_id = target_id
  195         self.channels = defaultdict(set)
  196         self.id_iter = itertools.count()
  197         self.inflight_cmd = {}
  198         self.inflight_result = {}
  199 
  200     async def execute(self, cmd: typing.Generator[dict, T, typing.Any]) -> T:
  201         """Execute a command on the server and wait for the result.
  202 
  203         :param cmd: any CDP command
  204         :returns: a CDP result
  205         """
  206         cmd_id = next(self.id_iter)
  207         cmd_event = trio.Event()
  208         self.inflight_cmd[cmd_id] = cmd, cmd_event
  209         request = next(cmd)
  210         request["id"] = cmd_id
  211         if self.session_id:
  212             request["sessionId"] = self.session_id
  213         request_str = json.dumps(request)
  214         try:
  215             await self.ws.send_message(request_str)
  216         except WsConnectionClosed as wcc:
  217             raise CdpConnectionClosed(wcc.reason) from None
  218         await cmd_event.wait()
  219         response = self.inflight_result.pop(cmd_id)
  220         if isinstance(response, Exception):
  221             raise response
  222         return response
  223 
  224     def listen(self, *event_types, buffer_size=10):
  225         """Return an async iterator that iterates over events matching the
  226         indicated types."""
  227         sender, receiver = trio.open_memory_channel(buffer_size)
  228         for event_type in event_types:
  229             self.channels[event_type].add(sender)
  230         return receiver
  231 
  232     @asynccontextmanager
  233     async def wait_for(self, event_type: typing.Type[T], buffer_size=10) -> typing.AsyncGenerator[CmEventProxy, None]:
  234         """Wait for an event of the given type and return it.
  235 
  236         This is an async context manager, so you should open it inside
  237         an async with block. The block will not exit until the indicated
  238         event is received.
  239         """
  240         sender, receiver = trio.open_memory_channel(buffer_size)
  241         self.channels[event_type].add(sender)
  242         proxy = CmEventProxy()
  243         yield proxy
  244         async with receiver:
  245             event = await receiver.receive()
  246         proxy.value = event
  247 
  248     def _handle_data(self, data):
  249         """Handle incoming WebSocket data.
  250 
  251         :param dict data: a JSON dictionary
  252         """
  253         if "id" in data:
  254             self._handle_cmd_response(data)
  255         else:
  256             self._handle_event(data)
  257 
  258     def _handle_cmd_response(self, data):
  259         """Handle a response to a command. This will set an event flag that
  260         will return control to the task that called the command.
  261 
  262         :param dict data: response as a JSON dictionary
  263         """
  264         cmd_id = data["id"]
  265         try:
  266             cmd, event = self.inflight_cmd.pop(cmd_id)
  267         except KeyError:
  268             logger.warning(f"Got a message with a command ID that does not exist: {data}")
  269             return
  270         if "error" in data:
  271             # If the server reported an error, convert it to an exception and do
  272             # not process the response any further.
  273             self.inflight_result[cmd_id] = BrowserError(data["error"])
  274         else:
  275             # Otherwise, continue the generator to parse the JSON result
  276             # into a CDP object.
  277             try:
  278                 response = cmd.send(data["result"])
  279                 raise InternalError("The command's generator function " "did not exit when expected!")
  280             except StopIteration as exit:
  281                 return_ = exit.value
  282             self.inflight_result[cmd_id] = return_
  283         event.set()
  284 
  285     def _handle_event(self, data):
  286         """Handle an event.
  287 
  288         :param dict data: event as a JSON dictionary
  289         """
  290         global devtools
  291         event = devtools.util.parse_json_event(data)
  292         logger.debug("Received event: %s", event)
  293         to_remove = set()
  294         for sender in self.channels[type(event)]:
  295             try:
  296                 sender.send_nowait(event)
  297             except trio.WouldBlock:
  298                 logger.error('Unable to send event "%r" due to full channel %s', event, sender)
  299             except trio.BrokenResourceError:
  300                 to_remove.add(sender)
  301         if to_remove:
  302             self.channels[type(event)] -= to_remove
  303 
  304 
  305 class CdpSession(CdpBase):
  306     """Contains the state for a CDP session.
  307 
  308     Generally you should not instantiate this object yourself; you should call
  309     :meth:`CdpConnection.open_session`.
  310     """
  311 
  312     def __init__(self, ws, session_id, target_id):
  313         """Constructor.
  314 
  315         :param trio_websocket.WebSocketConnection ws:
  316         :param devtools.target.SessionID session_id:
  317         :param devtools.target.TargetID target_id:
  318         """
  319         super().__init__(ws, session_id, target_id)
  320 
  321         self._dom_enable_count = 0
  322         self._dom_enable_lock = trio.Lock()
  323         self._page_enable_count = 0
  324         self._page_enable_lock = trio.Lock()
  325 
  326     @asynccontextmanager
  327     async def dom_enable(self):
  328         """A context manager that executes ``dom.enable()`` when it enters and
  329         then calls ``dom.disable()``.
  330 
  331         This keeps track of concurrent callers and only disables DOM
  332         events when all callers have exited.
  333         """
  334         global devtools
  335         async with self._dom_enable_lock:
  336             self._dom_enable_count += 1
  337             if self._dom_enable_count == 1:
  338                 await self.execute(devtools.dom.enable())
  339 
  340         yield
  341 
  342         async with self._dom_enable_lock:
  343             self._dom_enable_count -= 1
  344             if self._dom_enable_count == 0:
  345                 await self.execute(devtools.dom.disable())
  346 
  347     @asynccontextmanager
  348     async def page_enable(self):
  349         """A context manager that executes ``page.enable()`` when it enters and
  350         then calls ``page.disable()`` when it exits.
  351 
  352         This keeps track of concurrent callers and only disables page
  353         events when all callers have exited.
  354         """
  355         global devtools
  356         async with self._page_enable_lock:
  357             self._page_enable_count += 1
  358             if self._page_enable_count == 1:
  359                 await self.execute(devtools.page.enable())
  360 
  361         yield
  362 
  363         async with self._page_enable_lock:
  364             self._page_enable_count -= 1
  365             if self._page_enable_count == 0:
  366                 await self.execute(devtools.page.disable())
  367 
  368 
  369 class CdpConnection(CdpBase, trio.abc.AsyncResource):
  370     """Contains the connection state for a Chrome DevTools Protocol server.
  371 
  372     CDP can multiplex multiple "sessions" over a single connection. This
  373     class corresponds to the "root" session, i.e. the implicitly created
  374     session that has no session ID. This class is responsible for
  375     reading incoming WebSocket messages and forwarding them to the
  376     corresponding session, as well as handling messages targeted at the
  377     root session itself. You should generally call the
  378     :func:`open_cdp()` instead of instantiating this class directly.
  379     """
  380 
  381     def __init__(self, ws):
  382         """Constructor.
  383 
  384         :param trio_websocket.WebSocketConnection ws:
  385         """
  386         super().__init__(ws, session_id=None, target_id=None)
  387         self.sessions = {}
  388 
  389     async def aclose(self):
  390         """Close the underlying WebSocket connection.
  391 
  392         This will cause the reader task to gracefully exit when it tries
  393         to read the next message from the WebSocket. All of the public
  394         APIs (``execute()``, ``listen()``, etc.) will raise
  395         ``CdpConnectionClosed`` after the CDP connection is closed. It
  396         is safe to call this multiple times.
  397         """
  398         await self.ws.aclose()
  399 
  400     @asynccontextmanager
  401     async def open_session(self, target_id) -> typing.AsyncIterator[CdpSession]:
  402         """This context manager opens a session and enables the "simple" style
  403         of calling CDP APIs.
  404 
  405         For example, inside a session context, you can call ``await
  406         dom.get_document()`` and it will execute on the current session
  407         automatically.
  408         """
  409         session = await self.connect_session(target_id)
  410         with session_context(session):
  411             yield session
  412 
  413     async def connect_session(self, target_id) -> "CdpSession":
  414         """Returns a new :class:`CdpSession` connected to the specified
  415         target."""
  416         global devtools
  417         session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
  418         session = CdpSession(self.ws, session_id, target_id)
  419         self.sessions[session_id] = session
  420         return session
  421 
  422     async def _reader_task(self):
  423         """Runs in the background and handles incoming messages: dispatching
  424         responses to commands and events to listeners."""
  425         global devtools
  426         while True:
  427             try:
  428                 message = await self.ws.get_message()
  429             except WsConnectionClosed:
  430                 # If the WebSocket is closed, we don't want to throw an
  431                 # exception from the reader task. Instead we will throw
  432                 # exceptions from the public API methods, and we can quietly
  433                 # exit the reader task here.
  434                 break
  435             try:
  436                 data = json.loads(message)
  437             except json.JSONDecodeError:
  438                 raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
  439             logger.debug("Received message %r", data)
  440             if "sessionId" in data:
  441                 session_id = devtools.target.SessionID(data["sessionId"])
  442                 try:
  443                     session = self.sessions[session_id]
  444                 except KeyError:
  445                     raise BrowserError(f"Browser sent a message for an invalid session: {session_id!r}")
  446                 session._handle_data(data)
  447             else:
  448                 self._handle_data(data)
  449 
  450 
  451 @asynccontextmanager
  452 async def open_cdp(url) -> typing.AsyncIterator[CdpConnection]:
  453     """This async context manager opens a connection to the browser specified
  454     by ``url`` before entering the block, then closes the connection when the
  455     block exits.
  456 
  457     The context manager also sets the connection as the default
  458     connection for the current task, so that commands like ``await
  459     target.get_targets()`` will run on this connection automatically. If
  460     you want to use multiple connections concurrently, it is recommended
  461     to open each on in a separate task.
  462     """
  463 
  464     async with trio.open_nursery() as nursery:
  465         conn = await connect_cdp(nursery, url)
  466         try:
  467             with connection_context(conn):
  468                 yield conn
  469         finally:
  470             await conn.aclose()
  471 
  472 
  473 async def connect_cdp(nursery, url) -> CdpConnection:
  474     """Connect to the browser specified by ``url`` and spawn a background task
  475     in the specified nursery.
  476 
  477     The ``open_cdp()`` context manager is preferred in most situations.
  478     You should only use this function if you need to specify a custom
  479     nursery. This connection is not automatically closed! You can either
  480     use the connection object as a context manager (``async with
  481     conn:``) or else call ``await conn.aclose()`` on it when you are
  482     done with it. If ``set_context`` is True, then the returned
  483     connection will be installed as the default connection for the
  484     current task. This argument is for unusual use cases, such as
  485     running inside of a notebook.
  486     """
  487     ws = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
  488     cdp_conn = CdpConnection(ws)
  489     nursery.start_soon(cdp_conn._reader_task)
  490     return cdp_conn