"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-7.0.0/zaqar/transport/websocket/driver.py" (30 Aug 2018, 4137 Bytes) of package /linux/misc/openstack/zaqar-7.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "driver.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2015 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import socket
   17 
   18 from oslo_log import log as logging
   19 from oslo_utils import netutils
   20 
   21 try:
   22     import asyncio
   23 except ImportError:
   24     import trollius as asyncio
   25 
   26 from zaqar.common import decorators
   27 from zaqar.conf import drivers_transport_websocket
   28 from zaqar.i18n import _
   29 from zaqar.transport import base
   30 from zaqar.transport.middleware import auth
   31 from zaqar.transport.websocket import factory
   32 
   33 
   34 LOG = logging.getLogger(__name__)
   35 
   36 
   37 # TODO(derekh): use escape_ipv6 from oslo.utils once available
   38 def _escape_ipv6(address):
   39     """Escape an IP address in square brackets if IPv6"""
   40     if netutils.is_valid_ipv6(address):
   41         return "[%s]" % address
   42     return address
   43 
   44 
   45 class Driver(base.DriverBase):
   46 
   47     def __init__(self, conf, api, cache):
   48         super(Driver, self).__init__(conf, None, None, None)
   49         self._api = api
   50         self._cache = cache
   51 
   52         self._conf.register_opts(drivers_transport_websocket.ALL_OPTS,
   53                                  group=drivers_transport_websocket.GROUP_NAME)
   54         self._ws_conf = self._conf[drivers_transport_websocket.GROUP_NAME]
   55 
   56         if self._conf.auth_strategy:
   57             auth_strategy = auth.strategy(self._conf.auth_strategy)
   58             self._auth_strategy = lambda app: auth_strategy.install(
   59                 app, self._conf)
   60         else:
   61             self._auth_strategy = None
   62 
   63     @decorators.lazy_property(write=False)
   64     def factory(self):
   65         uri = 'ws://' + _escape_ipv6(self._ws_conf.bind) + ':' + \
   66               str(self._ws_conf.port)
   67         return factory.ProtocolFactory(
   68             uri,
   69             handler=self._api,
   70             external_port=self._ws_conf.external_port,
   71             auth_strategy=self._auth_strategy,
   72             loop=asyncio.get_event_loop(),
   73             secret_key=self._conf.signed_url.secret_key)
   74 
   75     @decorators.lazy_property(write=False)
   76     def notification_factory(self):
   77         return factory.NotificationFactory(self.factory)
   78 
   79     def listen(self):
   80         """Self-host the WebSocket server.
   81 
   82         It runs the WebSocket server using 'bind' and 'port' options from the
   83         websocket config group, and the notifiton endpoint using the
   84         'notification_bind' and 'notification_port' options.
   85         """
   86         msgtmpl = _(u'Serving on host %(bind)s:%(port)s')
   87         LOG.info(msgtmpl,
   88                  {'bind': self._ws_conf.bind, 'port': self._ws_conf.port})
   89 
   90         loop = asyncio.get_event_loop()
   91         coro_notification = loop.create_server(
   92             self.notification_factory,
   93             self._ws_conf.notification_bind,
   94             self._ws_conf.notification_port)
   95         coro = loop.create_server(
   96             self.factory,
   97             self._ws_conf.bind,
   98             self._ws_conf.port)
   99 
  100         def got_server(task):
  101             # Retrieve the port number of the listening socket
  102             port = task.result().sockets[0].getsockname()[1]
  103             if self._ws_conf.notification_bind is not None:
  104                 host = self._ws_conf.notification_bind
  105             else:
  106                 host = socket.gethostname()
  107             self.notification_factory.set_subscription_url(
  108                 'http://%s:%s/' % (_escape_ipv6(host), port))
  109             self._api.set_subscription_factory(self.notification_factory)
  110 
  111         task = asyncio.Task(coro_notification)
  112         task.add_done_callback(got_server)
  113 
  114         loop.run_until_complete(asyncio.wait([coro, task]))
  115 
  116         try:
  117             loop.run_forever()
  118         except KeyboardInterrupt:
  119             pass
  120         finally:
  121             loop.close()