service.py (solum-9.0.0) | : | service.py (solum-10.0.0) | ||
---|---|---|---|---|
skipping to change at line 18 | skipping to change at line 18 | |||
# | # | |||
# Unless required by applicable law or agreed to in writing, software | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | # distributed under the License is distributed on an "AS IS" BASIS, | |||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
# See the License for the specific language governing permissions and | # See the License for the specific language governing permissions and | |||
# limitations under the License. | # limitations under the License. | |||
"""Common RPC service and API tools for Solum.""" | """Common RPC service and API tools for Solum.""" | |||
import eventlet | import eventlet | |||
from oslo_config import cfg | ||||
import oslo_messaging as messaging | import oslo_messaging as messaging | |||
from oslo_messaging.rpc import dispatcher | ||||
from oslo_serialization import jsonutils | from oslo_serialization import jsonutils | |||
import solum.common.context | import solum.common.context | |||
from solum import objects | from solum import objects | |||
from solum import rpc | ||||
# NOTE(paulczar): | # NOTE(paulczar): | |||
# Ubuntu 14.04 forces librabbitmq when kombu is used | # Ubuntu 14.04 forces librabbitmq when kombu is used | |||
# Unfortunately it forces a version that has a crash | # Unfortunately it forces a version that has a crash | |||
# bug. Calling eventlet.monkey_patch() tells kombu | # bug. Calling eventlet.monkey_patch() tells kombu | |||
# to use libamqp instead. | # to use libamqp instead. | |||
eventlet.monkey_patch() | eventlet.monkey_patch() | |||
class JsonPayloadSerializer(messaging.NoOpSerializer): | class JsonPayloadSerializer(messaging.NoOpSerializer): | |||
@staticmethod | @staticmethod | |||
skipping to change at line 64 | skipping to change at line 63 | |||
return context.to_dict() | return context.to_dict() | |||
def deserialize_context(self, context): | def deserialize_context(self, context): | |||
return solum.common.context.RequestContext.from_dict(context) | return solum.common.context.RequestContext.from_dict(context) | |||
class Service(object): | class Service(object): | |||
_server = None | _server = None | |||
def __init__(self, topic, server, handlers): | def __init__(self, topic, server, handlers): | |||
serializer = RequestContextSerializer(JsonPayloadSerializer()) | serializer = RequestContextSerializer(JsonPayloadSerializer()) | |||
transport = messaging.get_rpc_transport(cfg.CONF) | ||||
# TODO(asalkeld) add support for version='x.y' | # TODO(asalkeld) add support for version='x.y' | |||
target = messaging.Target(topic=topic, server=server) | target = messaging.Target(topic=topic, server=server) | |||
access_policy = dispatcher.DefaultRPCAccessPolicy | self._server = rpc.get_server(target, handlers, serializer) | |||
self._server = messaging.get_rpc_server(transport, target, handlers, | ||||
executor='threading', | ||||
serializer=serializer, | ||||
access_policy=access_policy) | ||||
def serve(self): | def serve(self): | |||
objects.load() | objects.load() | |||
self._server.start() | self._server.start() | |||
self._server.wait() | self._server.wait() | |||
class API(object): | class API(object): | |||
def __init__(self, transport=None, context=None, topic=None): | def __init__(self, context=None, topic=None): | |||
serializer = RequestContextSerializer(JsonPayloadSerializer()) | serializer = RequestContextSerializer(JsonPayloadSerializer()) | |||
if transport is None: | ||||
transport = messaging.get_rpc_transport(cfg.CONF) | ||||
self._context = context | self._context = context | |||
if topic is None: | if topic is None: | |||
topic = '' | topic = '' | |||
target = messaging.Target(topic=topic) | target = messaging.Target(topic=topic) | |||
self._client = messaging.RPCClient(transport, target, | self._client = rpc.get_client(target, | |||
serializer=serializer) | serializer=serializer) | |||
def _call(self, method, *args, **kwargs): | def _call(self, method, *args, **kwargs): | |||
return self._client.call(self._context, method, *args, **kwargs) | return self._client.call(self._context, method, *args, **kwargs) | |||
def _cast(self, method, *args, **kwargs): | def _cast(self, method, *args, **kwargs): | |||
self._client.cast(self._context, method, *args, **kwargs) | self._client.cast(self._context, method, *args, **kwargs) | |||
def echo(self, message): | def echo(self, message): | |||
self._cast('echo', message=message) | self._cast('echo', message=message) | |||
End of changes. 8 change blocks. | ||||
13 lines changed or deleted | 5 lines changed or added |