"Fossies" - the Fresh Open Source Software Archive 
Member "selenium-selenium-4.8.1/py/selenium/webdriver/common/bidi/cdp.py" (17 Feb 2023, 17766 Bytes) of package /linux/www/selenium-selenium-4.8.1.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "cdp.py" see the
Fossies "Dox" file reference documentation and the last
Fossies "Diffs" side-by-side code changes report:
4.8.0_vs_4.8.1.
1 # The MIT License(MIT)
2 #
3 # Copyright(c) 2018 Hyperion Gray
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files(the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22 #
23 # This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp
24
25 # flake8: noqa
26
27 import contextvars
28 import importlib
29 import itertools
30 import json
31 import logging
32 import pathlib
33 import typing
34 from collections import defaultdict
35 from contextlib import asynccontextmanager
36 from contextlib import contextmanager
37 from dataclasses import dataclass
38
39 import trio
40 from trio_websocket import ConnectionClosed as WsConnectionClosed
41 from trio_websocket import connect_websocket_url
42
# Module logger. The "trio_cdp" name is kept from the upstream project this code was taken from.
logger = logging.getLogger("trio_cdp")
# Generic type variable used in the annotations of CdpBase.execute() and CdpBase.wait_for().
T = typing.TypeVar("T")
# Passed as ``max_message_size`` when the WebSocket is opened in connect_cdp() (2**24 == 16777216).
MAX_WS_MESSAGE_SIZE = 2**24

# Both are populated by import_devtools(); they stay None until it has been called.
devtools = None
version = None
49
50
def import_devtools(ver):
    """Import the devtools bindings for CDP version ``ver`` and cache them.

    The module ``selenium.webdriver.common.devtools.v<ver>`` is tried first.
    If it does not exist, the highest-numbered ``v<N>`` package found in the
    sibling ``devtools`` directory is imported instead.

    Side effects: the module globals ``devtools`` and ``version`` are set.
    ``version`` always records the *requested* version, even when a fallback
    module ends up being loaded.

    :param ver: the requested devtools version number
    :returns: the imported devtools module
    :raises ModuleNotFoundError: if the requested version is missing and no
        ``v<N>`` fallback directory exists
    """
    global devtools
    global version
    version = ver
    base = "selenium.webdriver.common.devtools.v"
    try:
        devtools = importlib.import_module(f"{base}{ver}")
    except ModuleNotFoundError:
        # Attempt to parse and load the 'most recent' devtools module. This is likely
        # because cdp has been updated but selenium python has not been released yet.
        devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools")
        # Only directories named ``v<digits>`` are version packages; anything else
        # (for example ``__pycache__``) must not be fed to int().
        candidates = [
            int(entry.name[1:])
            for entry in devtools_path.iterdir()
            if entry.is_dir() and entry.name.startswith("v") and entry.name[1:].isdecimal()
        ]
        if not candidates:
            # Nothing to fall back to: surface the original import failure.
            raise
        latest = max(candidates)
        selenium_logger = logging.getLogger(__name__)
        selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
        devtools = importlib.import_module(f"{base}{latest}")
    return devtools
71
72
# Context variables holding the "current" connection and session. They are created without a
# default, so ``.get()`` raises LookupError until a value has been installed.
_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context")
_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context")
75
76
def get_connection_context(fn_name):
    """Return the connection installed in the current context.

    :param fn_name: name of the calling function; only used to build the
        error message
    :raises RuntimeError: if no connection has been installed
    """
    try:
        current = _connection_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a connection context.")
    return current
87
88
def get_session_context(fn_name):
    """Return the session installed in the current context.

    :param fn_name: name of the calling function; only used to build the
        error message
    :raises RuntimeError: if no session has been installed
    """
    try:
        current = _session_context.get()
    except LookupError:
        raise RuntimeError(f"{fn_name}() must be called in a session context.")
    return current
99
100
101 @contextmanager
102 def connection_context(connection):
103 """This context manager installs ``connection`` as the session context for
104 the current Trio task."""
105 token = _connection_context.set(connection)
106 try:
107 yield
108 finally:
109 _connection_context.reset(token)
110
111
112 @contextmanager
113 def session_context(session):
114 """This context manager installs ``session`` as the session context for the
115 current Trio task."""
116 token = _session_context.set(session)
117 try:
118 yield
119 finally:
120 _session_context.reset(token)
121
122
def set_global_connection(connection):
    """Make ``connection`` the default connection.

    This rebinds the module-level ``_connection_context`` to a brand-new
    context variable whose default value is ``connection``. Upstream
    describes this as generally not recommended, but sometimes necessary,
    for example when running inside a Jupyter notebook.
    """
    global _connection_context
    replacement = contextvars.ContextVar("_connection_context", default=connection)
    _connection_context = replacement
132
133
def set_global_session(session):
    """Make ``session`` the default session.

    This rebinds the module-level ``_session_context`` to a brand-new
    context variable whose default value is ``session``. Upstream describes
    this as generally not recommended, but sometimes necessary, for example
    when running inside a Jupyter notebook.
    """
    global _session_context
    replacement = contextvars.ContextVar("_session_context", default=session)
    _session_context = replacement
143
144
class BrowserError(Exception):
    """Raised when the browser answers a command with an error object."""

    def __init__(self, obj):
        """Copy the fields of the error object ``obj`` onto the exception.

        :param obj: mapping with required ``"code"`` and ``"message"`` keys
            and an optional ``"data"`` key
        """
        self.code = obj["code"]
        self.message = obj["message"]
        # "data" is optional; ``detail`` is None when it is absent.
        self.detail = obj.get("data")

    def __str__(self):
        """Return ``BrowserError<code=... message=...>`` followed by the detail."""
        summary = f"BrowserError<code={self.code} message={self.message}>"
        return f"{summary} {self.detail}"
156
157
class CdpConnectionClosed(WsConnectionClosed):
    """Signals that the CDP connection's WebSocket has been closed."""

    def __init__(self, reason):
        """Store the close reason.

        :param reason: why the connection was closed
        :type reason: wsproto.frame_protocol.CloseReason
        """
        self.reason = reason

    def __repr__(self):
        """Return ``<ClassName><reason>``."""
        cls_name = type(self).__name__
        return f"{cls_name}<{self.reason}>"
172
173
class InternalError(Exception):
    """Signals a logic error inside this module or in its integration with
    the generated CDP bindings, rather than a problem reported by the browser."""
177
178
@dataclass
class CmEventProxy:
    """Placeholder handed out by :meth:`CdpBase.wait_for`.

    ``value`` starts as ``None`` and is filled in with the received event
    once the ``async with`` block has finished.
    """

    value: typing.Any = None
188
189
class CdpBase:
    """Command and event plumbing shared by the root connection and by
    individual sessions.

    Commands are sent with :meth:`execute`; incoming messages are fed in
    through :meth:`_handle_data`, which either completes a pending command or
    fans an event out to the listeners registered via :meth:`listen` and
    :meth:`wait_for`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        :param ws: the WebSocket connection used to send commands
        :param session_id: session ID added to outgoing requests; a falsy
            value means no ``sessionId`` field is sent
        :param target_id: ID of the target this object is attached to
        """
        self.ws = ws
        self.session_id = session_id
        self.target_id = target_id
        # Event type -> set of memory-channel senders listening for it.
        self.channels = defaultdict(set)
        # Source of unique command IDs for this object.
        self.id_iter = itertools.count()
        # Command ID -> (command generator, trio.Event set on completion).
        self.inflight_cmd = {}
        # Command ID -> parsed result, or the exception to raise in the caller.
        self.inflight_result = {}

    async def execute(self, cmd: typing.Generator[dict, T, typing.Any]) -> T:
        """Execute a command on the server and wait for the result.

        :param cmd: any CDP command (a generator that yields the request
            dict and, once sent the raw result, returns the parsed result)
        :returns: a CDP result
        :raises CdpConnectionClosed: if the WebSocket is closed while sending
        :raises BrowserError: if the browser answered with an error
        """
        cmd_id = next(self.id_iter)
        cmd_event = trio.Event()
        self.inflight_cmd[cmd_id] = cmd, cmd_event
        try:
            request = next(cmd)
            request["id"] = cmd_id
            if self.session_id:
                request["sessionId"] = self.session_id
            request_str = json.dumps(request)
            try:
                await self.ws.send_message(request_str)
            except WsConnectionClosed as wcc:
                raise CdpConnectionClosed(wcc.reason) from None
            await cmd_event.wait()
        except BaseException:
            # The command will never be collected (send failed, or this task
            # was cancelled while waiting): drop its bookkeeping so the two
            # dictionaries do not grow without bound.
            self.inflight_cmd.pop(cmd_id, None)
            self.inflight_result.pop(cmd_id, None)
            raise
        response = self.inflight_result.pop(cmd_id)
        if isinstance(response, Exception):
            raise response
        return response

    def listen(self, *event_types, buffer_size=10):
        """Return an async iterator that iterates over events matching the
        indicated types.

        :param event_types: event classes to subscribe to
        :param buffer_size: capacity of the memory channel; events arriving
            while it is full are logged and dropped by :meth:`_handle_event`
        :returns: the receive end of the memory channel
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        for event_type in event_types:
            self.channels[event_type].add(sender)
        return receiver

    @asynccontextmanager
    async def wait_for(self, event_type: typing.Type[T], buffer_size=10) -> typing.AsyncGenerator[CmEventProxy, None]:
        """Wait for an event of the given type and return it.

        This is an async context manager, so you should open it inside
        an async with block. The block will not exit until the indicated
        event is received. The event is stored in the ``value`` attribute
        of the yielded :class:`CmEventProxy`.

        If the block raises, the exception propagates without waiting for
        the event.
        """
        sender, receiver = trio.open_memory_channel(buffer_size)
        self.channels[event_type].add(sender)
        proxy = CmEventProxy()
        try:
            async with receiver:
                yield proxy
                proxy.value = await receiver.receive()
        finally:
            # Always unregister, including when the block raised; otherwise
            # the abandoned channel would keep collecting events.
            self.channels[event_type].discard(sender)

    def _handle_data(self, data):
        """Handle incoming WebSocket data.

        Messages carrying an ``"id"`` key are command responses; everything
        else is treated as an event.

        :param dict data: a JSON dictionary
        """
        if "id" in data:
            self._handle_cmd_response(data)
        else:
            self._handle_event(data)

    def _handle_cmd_response(self, data):
        """Handle a response to a command. This will set an event flag that
        will return control to the task that called the command.

        :param dict data: response as a JSON dictionary
        :raises InternalError: if the command generator yields again instead
            of returning after being sent the result
        """
        cmd_id = data["id"]
        try:
            cmd, event = self.inflight_cmd.pop(cmd_id)
        except KeyError:
            logger.warning("Got a message with a command ID that does not exist: %s", data)
            return
        if "error" in data:
            # If the server reported an error, convert it to an exception and do
            # not process the response any further.
            self.inflight_result[cmd_id] = BrowserError(data["error"])
        else:
            # Otherwise, continue the generator to parse the JSON result
            # into a CDP object. The parsed object is the generator's return
            # value, which arrives as StopIteration.value.
            try:
                cmd.send(data["result"])
            except StopIteration as stop:
                self.inflight_result[cmd_id] = stop.value
            else:
                raise InternalError("The command's generator function did not exit when expected!")
        event.set()

    def _handle_event(self, data):
        """Handle an event.

        The event is parsed and offered to every channel registered for its
        type. A full channel causes the event to be dropped for that listener
        (and logged); a channel whose receiver has been closed is
        unregistered.

        :param dict data: event as a JSON dictionary
        """
        event = devtools.util.parse_json_event(data)
        logger.debug("Received event: %s", event)
        # Use .get() so that event types nobody listens to do not create
        # empty entries in the defaultdict.
        listeners = self.channels.get(type(event))
        if not listeners:
            return
        to_remove = set()
        for sender in listeners:
            try:
                sender.send_nowait(event)
            except trio.WouldBlock:
                logger.error('Unable to send event "%r" due to full channel %s', event, sender)
            except trio.BrokenResourceError:
                to_remove.add(sender)
        if to_remove:
            listeners -= to_remove
303
304
class CdpSession(CdpBase):
    """Contains the state for a CDP session.

    Generally you should not instantiate this object yourself; you should call
    :meth:`CdpConnection.open_session`.
    """

    def __init__(self, ws, session_id, target_id):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        :param devtools.target.SessionID session_id:
        :param devtools.target.TargetID target_id:
        """
        super().__init__(ws, session_id, target_id)

        # Number of active dom_enable() / page_enable() blocks. Each counter
        # is only modified while holding its lock.
        self._dom_enable_count = 0
        self._dom_enable_lock = trio.Lock()
        self._page_enable_count = 0
        self._page_enable_lock = trio.Lock()

    @asynccontextmanager
    async def dom_enable(self):
        """A context manager that executes ``dom.enable()`` when it enters and
        then calls ``dom.disable()`` when it exits.

        This keeps track of concurrent callers and only disables DOM
        events when all callers have exited. The exit bookkeeping also runs
        when the block raises.
        """
        async with self._dom_enable_lock:
            if self._dom_enable_count == 0:
                await self.execute(devtools.dom.enable())
            # Count the caller only once enabling has succeeded, so a failed
            # enable() does not leave the counter stuck above zero.
            self._dom_enable_count += 1

        try:
            yield
        finally:
            async with self._dom_enable_lock:
                self._dom_enable_count -= 1
                if self._dom_enable_count == 0:
                    await self.execute(devtools.dom.disable())

    @asynccontextmanager
    async def page_enable(self):
        """A context manager that executes ``page.enable()`` when it enters and
        then calls ``page.disable()`` when it exits.

        This keeps track of concurrent callers and only disables page
        events when all callers have exited. The exit bookkeeping also runs
        when the block raises.
        """
        async with self._page_enable_lock:
            if self._page_enable_count == 0:
                await self.execute(devtools.page.enable())
            # Count the caller only once enabling has succeeded, so a failed
            # enable() does not leave the counter stuck above zero.
            self._page_enable_count += 1

        try:
            yield
        finally:
            async with self._page_enable_lock:
                self._page_enable_count -= 1
                if self._page_enable_count == 0:
                    await self.execute(devtools.page.disable())
367
368
class CdpConnection(CdpBase, trio.abc.AsyncResource):
    """Contains the connection state for a Chrome DevTools Protocol server.

    CDP can multiplex multiple "sessions" over a single connection. This
    class corresponds to the "root" session, i.e. the implicitly created
    session that has no session ID. This class is responsible for
    reading incoming WebSocket messages and forwarding them to the
    corresponding session, as well as handling messages targeted at the
    root session itself. You should generally call
    :func:`open_cdp()` instead of instantiating this class directly.
    """

    def __init__(self, ws):
        """Constructor.

        :param trio_websocket.WebSocketConnection ws:
        """
        super().__init__(ws, session_id=None, target_id=None)
        # Session ID -> CdpSession; used by _reader_task() to route messages.
        self.sessions = {}

    async def aclose(self):
        """Close the underlying WebSocket connection.

        The reader task exits quietly the next time it tries to read a
        message from the closed WebSocket, and ``execute()`` raises
        ``CdpConnectionClosed`` if sending fails because the WebSocket is
        closed. NOTE(review): upstream documents this as safe to call
        multiple times; that depends on the WebSocket implementation.
        """
        await self.ws.aclose()

    @asynccontextmanager
    async def open_session(self, target_id) -> typing.AsyncIterator[CdpSession]:
        """This context manager opens a session and enables the "simple" style
        of calling CDP APIs.

        For example, inside a session context, you can call ``await
        dom.get_document()`` and it will execute on the current session
        automatically.
        """
        session = await self.connect_session(target_id)
        with session_context(session):
            yield session

    async def connect_session(self, target_id) -> "CdpSession":
        """Returns a new :class:`CdpSession` connected to the specified
        target.

        The session is registered in ``self.sessions`` so that the reader
        task can route incoming messages to it.
        """
        session_id = await self.execute(devtools.target.attach_to_target(target_id, True))
        session = CdpSession(self.ws, session_id, target_id)
        self.sessions[session_id] = session
        return session

    async def _reader_task(self):
        """Runs in the background and handles incoming messages: dispatching
        responses to commands and events to listeners.

        :raises BrowserError: if a message is not valid JSON or names a
            session that this connection does not know about
        """
        while True:
            try:
                message = await self.ws.get_message()
            except WsConnectionClosed:
                # If the WebSocket is closed, we don't want to throw an
                # exception from the reader task. Instead we will throw
                # exceptions from the public API methods, and we can quietly
                # exit the reader task here.
                break
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message})
            logger.debug("Received message %r", data)
            if "sessionId" in data:
                session_id = devtools.target.SessionID(data["sessionId"])
                try:
                    session = self.sessions[session_id]
                except KeyError:
                    # BrowserError expects an error *object*; passing a bare
                    # string would fail with TypeError inside its constructor.
                    # -32600 is JSON-RPC 2.0's "Invalid Request" code.
                    raise BrowserError(
                        {
                            "code": -32600,
                            "message": f"Browser sent a message for an invalid session: {session_id!r}",
                            "data": data,
                        }
                    )
                session._handle_data(data)
            else:
                self._handle_data(data)
449
450
@asynccontextmanager
async def open_cdp(url) -> typing.AsyncIterator[CdpConnection]:
    """Open a CDP connection to ``url`` for the duration of an ``async with``
    block.

    The connection is established before the block is entered and closed
    when the block exits. While the block runs, the connection is also
    installed as the current connection, so that commands like ``await
    target.get_targets()`` will run on it automatically. If you want to use
    multiple connections concurrently, it is recommended to open each one in
    a separate task.
    """

    async with trio.open_nursery() as nursery:
        conn = await connect_cdp(nursery, url)
        # Leaving ``async with conn`` awaits ``conn.aclose()``, whether the
        # block finished normally or raised.
        async with conn:
            with connection_context(conn):
                yield conn
471
472
async def connect_cdp(nursery, url) -> CdpConnection:
    """Connect to the browser at ``url`` and start the reader task in
    ``nursery``.

    The ``open_cdp()`` context manager is preferred in most situations; use
    this function only if you need to supply your own nursery. The returned
    connection is not closed automatically: either use it as an async
    context manager (``async with conn:``) or call ``await conn.aclose()``
    when you are done with it.

    :param nursery: the Trio nursery that will own the WebSocket and the
        background reader task
    :param url: WebSocket URL of the browser's CDP endpoint
    :returns: the new connection
    """
    websocket = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE)
    connection = CdpConnection(websocket)
    # Messages are read and dispatched in the background from here on.
    nursery.start_soon(connection._reader_task)
    return connection
490 return cdp_conn