panko  8.1.0
About: OpenStack Panko is an event storage service and REST API for Ceilometer.
The "Ussuri" series (maintained release).
  Fossies Dox: panko-8.1.0.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

utils.py
Go to the documentation of this file.
2# Copyright Ericsson AB 2013. All rights reserved
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""Common functions for MongoDB backend
16"""
17
18import weakref
19
20from oslo_log import log
21from oslo_utils import netutils
22import pymongo
23import pymongo.errors
24import six
25import tenacity
26
27from panko.i18n import _
28
29ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS = 86
30
31LOG = log.getLogger(__name__)
32
33EVENT_TRAIT_TYPES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
34 'datetime': 4}
35OP_SIGN = {'lt': '$lt', 'le': '$lte', 'ne': '$ne', 'gt': '$gt', 'ge': '$gte'}
36
37MINIMUM_COMPATIBLE_MONGODB_VERSION = [2, 4]
38COMPLETE_AGGREGATE_COMPATIBLE_VERSION = [2, 6]
39
40
41def make_timestamp_range(start, end,
42 start_timestamp_op=None, end_timestamp_op=None):
43
44 """Create the query document to find timestamps within that range.
45
46 This is done by given two possible datetimes and their operations.
47 By default, using $gte for the lower bound and $lt for the upper bound.
48 """
49 ts_range = {}
50
51 if start:
52 if start_timestamp_op == 'gt':
53 start_timestamp_op = '$gt'
54 else:
55 start_timestamp_op = '$gte'
56 ts_range[start_timestamp_op] = start
57
58 if end:
59 if end_timestamp_op == 'le':
60 end_timestamp_op = '$lte'
61 else:
62 end_timestamp_op = '$lt'
63 ts_range[end_timestamp_op] = end
64 return ts_range
65
66
68 """Return start and stop row for filtering and a query.
69
70 Query is based on the selected parameter.
71
72 :param event_filter: storage.EventFilter object.
73 """
74 query = {}
75 q_list = []
76 ts_range = make_timestamp_range(event_filter.start_timestamp,
77 event_filter.end_timestamp)
78 if ts_range:
79 q_list.append({'timestamp': ts_range})
80 if event_filter.event_type:
81 q_list.append({'event_type': event_filter.event_type})
82 if event_filter.message_id:
83 q_list.append({'_id': event_filter.message_id})
84
85 if event_filter.traits_filter:
86 for trait_filter in event_filter.traits_filter:
87 op = trait_filter.pop('op', 'eq')
88 dict_query = {}
89 for k, v in six.iteritems(trait_filter):
90 if v is not None:
91 # All parameters in EventFilter['traits'] are optional, so
92 # we need to check if they are in the query or no.
93 if k == 'key':
94 dict_query.setdefault('trait_name', v)
95 elif k in ['string', 'integer', 'datetime', 'float']:
96 dict_query.setdefault('trait_type',
97 EVENT_TRAIT_TYPES[k])
98 dict_query.setdefault('trait_value',
99 v if op == 'eq'
100 else {OP_SIGN[op]: v})
101 dict_query = {'$elemMatch': dict_query}
102 q_list.append({'traits': dict_query})
103 if event_filter.admin_proj:
104 q_list.append({'$or': [
105 {'traits': {'$not': {'$elemMatch': {'trait_name': 'project_id'}}}},
106 {'traits': {
107 '$elemMatch': {'trait_name': 'project_id',
108 'trait_value': event_filter.admin_proj}}}]})
109 if q_list:
110 query = {'$and': q_list}
111
112 return query
113
114
115class ConnectionPool(object):
116
117 def __init__(self):
118 self._pool_pool = {}
119
120 def connect(self, url, max_retries, retry_interval):
121 connection_options = pymongo.uri_parser.parse_uri(url)
122 del connection_options['database']
123 del connection_options['username']
124 del connection_options['password']
125 del connection_options['collection']
126 pool_key = tuple(connection_options)
127
128 if pool_key in self._pool_pool:
129 client = self._pool_pool.get(pool_key)()
130 if client:
131 return client
132 splitted_url = netutils.urlsplit(url)
133 log_data = {'db': splitted_url.scheme,
134 'nodelist': connection_options['nodelist']}
135 LOG.info('Connecting to %(db)s on %(nodelist)s' % log_data)
136 try:
137 client = MongoProxy(pymongo.MongoClient(url),
138 max_retries, retry_interval)
139 except pymongo.errors.ConnectionFailure as e:
140 LOG.warning(_('Unable to connect to the database server: '
141 '%(errmsg)s.') % {'errmsg': e})
142 raise
143 self._pool_pool[pool_key] = weakref.ref(client)
144 return client
145
146
147def _safe_mongo_call(max_retries, retry_interval):
148 return tenacity.retry(
149 retry=tenacity.retry_if_exception_type(
150 pymongo.errors.AutoReconnect),
151 wait=tenacity.wait_fixed(retry_interval),
152 stop=(tenacity.stop_after_attempt(max_retries) if max_retries >= 0
153 else tenacity.stop_never)
154 )
155
156
157MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
158 if not typ.startswith('_')])
159MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
160 if not typ.startswith('_')]))
161MONGO_METHODS.update(set([typ for typ in dir(pymongo)
162 if not typ.startswith('_')]))
163
164
165class MongoProxy(object):
166 def __init__(self, conn, max_retries, retry_interval):
167 self.connconn = conn
168 self.max_retriesmax_retries = max_retries
169 self.retry_intervalretry_interval = retry_interval
171 self.max_retriesmax_retries, self.retry_intervalretry_interval)(self._recreate_index_recreate_index_recreate_index)
172
173 def __getitem__(self, item):
174 """Create and return proxy around the method in the connection.
175
176 :param item: name of the connection
177 """
178 return MongoProxy(self.connconn[item])
179
180 def find(self, *args, **kwargs):
181 # We need this modifying method to return a CursorProxy object so that
182 # we can handle the Cursor next function to catch the AutoReconnect
183 # exception.
184 return CursorProxy(self.connconn.find(*args, **kwargs),
185 self.max_retriesmax_retries,
186 self.retry_intervalretry_interval)
187
188 def create_index(self, keys, name=None, *args, **kwargs):
189 try:
190 self.connconn.create_index(keys, name=name, *args, **kwargs)
191 except pymongo.errors.OperationFailure as e:
192 if e.code is ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS:
193 LOG.info("Index %s will be recreate." % name)
194 self._recreate_index_recreate_index_recreate_index(keys, name, *args, **kwargs)
195
196 def _recreate_index(self, keys, name, *args, **kwargs):
197 self.connconn.drop_index(name)
198 self.connconn.create_index(keys, name=name, *args, **kwargs)
199
200 def __getattr__(self, item):
201 """Wrap MongoDB connection.
202
203 If item is the name of an executable method, for example find or
204 insert, wrap this method in the MongoConn.
205 Else wrap getting attribute with MongoProxy.
206 """
207 if item in ('name', 'database'):
208 return getattr(self.connconn, item)
209 if item in MONGO_METHODS:
210 return _safe_mongo_call(
211 self.max_retriesmax_retries, self.retry_intervalretry_interval
212 )(getattr(self.connconn, item))
213 return MongoProxy(getattr(self.connconn, item),
214 self.max_retriesmax_retries, self.retry_intervalretry_interval)
215
216 def __call__(self, *args, **kwargs):
217 return self.connconn(*args, **kwargs)
218
219
220class CursorProxy(pymongo.cursor.Cursor):
221 def __init__(self, cursor, max_retry, retry_interval):
222 self.cursorcursor = cursor
223 self.nextnext = _safe_mongo_call(max_retry, retry_interval)(self._next_next)
224
225 def __getitem__(self, item):
226 return self.cursorcursor[item]
227
228 def _next(self):
229 """Wrap Cursor next method.
230
231 This method will be executed before each Cursor next method call.
232 """
233 try:
234 save_cursor = self.cursorcursor.clone()
235 return self.cursorcursor.next()
236 except pymongo.errors.AutoReconnect:
237 self.cursorcursor = save_cursor
238 raise
239
240 def __getattr__(self, item):
241 return getattr(self.cursorcursor, item)
def connect(self, url, max_retries, retry_interval)
Definition: utils.py:120
def __init__(self, cursor, max_retry, retry_interval)
Definition: utils.py:221
def __init__(self, conn, max_retries, retry_interval)
Definition: utils.py:166
def __call__(self, *args, **kwargs)
Definition: utils.py:216
def find(self, *args, **kwargs)
Definition: utils.py:180
def _recreate_index(self, keys, name, *args, **kwargs)
Definition: utils.py:196
def create_index(self, keys, name=None, *args, **kwargs)
Definition: utils.py:188
def make_timestamp_range(start, end, start_timestamp_op=None, end_timestamp_op=None)
Definition: utils.py:42
def _safe_mongo_call(max_retries, retry_interval)
Definition: utils.py:147
def make_events_query_from_filter(event_filter)
Definition: utils.py:67