15"""Common functions for MongoDB backend
20from oslo_log
import log
21from oslo_utils
import netutils
29ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS = 86
31LOG = log.getLogger(__name__)
33EVENT_TRAIT_TYPES = {
'none': 0,
'string': 1,
'integer': 2,
'float': 3,
35OP_SIGN = {
'lt':
'$lt',
'le':
'$lte',
'ne':
'$ne',
'gt':
'$gt',
'ge':
'$gte'}
37MINIMUM_COMPATIBLE_MONGODB_VERSION = [2, 4]
38COMPLETE_AGGREGATE_COMPATIBLE_VERSION = [2, 6]
42 start_timestamp_op=None, end_timestamp_op=None):
44 """Create the query document to find timestamps within that range.
46 This is done by given two possible datetimes
and their operations.
47 By default, using $gte
for the lower bound
and $lt
for the upper bound.
52 if start_timestamp_op ==
'gt':
53 start_timestamp_op =
'$gt'
55 start_timestamp_op =
'$gte'
56 ts_range[start_timestamp_op] = start
59 if end_timestamp_op ==
'le':
60 end_timestamp_op =
'$lte'
62 end_timestamp_op =
'$lt'
63 ts_range[end_timestamp_op] = end
68 """Return start and stop row for filtering and a query.
70 Query is based on the selected parameter.
77 event_filter.end_timestamp)
79 q_list.append({
'timestamp': ts_range})
80 if event_filter.event_type:
81 q_list.append({
'event_type': event_filter.event_type})
82 if event_filter.message_id:
83 q_list.append({
'_id': event_filter.message_id})
85 if event_filter.traits_filter:
86 for trait_filter
in event_filter.traits_filter:
87 op = trait_filter.pop(
'op',
'eq')
89 for k, v
in six.iteritems(trait_filter):
94 dict_query.setdefault(
'trait_name', v)
95 elif k
in [
'string',
'integer',
'datetime',
'float']:
96 dict_query.setdefault(
'trait_type',
98 dict_query.setdefault(
'trait_value',
100 else {OP_SIGN[op]: v})
101 dict_query = {
'$elemMatch': dict_query}
102 q_list.append({
'traits': dict_query})
103 if event_filter.admin_proj:
104 q_list.append({
'$or': [
105 {
'traits': {
'$not': {
'$elemMatch': {
'trait_name':
'project_id'}}}},
107 '$elemMatch': {
'trait_name':
'project_id',
108 'trait_value': event_filter.admin_proj}}}]})
110 query = {
'$and': q_list}
120 def connect(self, url, max_retries, retry_interval):
121 connection_options = pymongo.uri_parser.parse_uri(url)
122 del connection_options[
'database']
123 del connection_options[
'username']
124 del connection_options[
'password']
125 del connection_options[
'collection']
126 pool_key = tuple(connection_options)
128 if pool_key
in self.
_pool_pool:
129 client = self.
_pool_pool.get(pool_key)()
132 splitted_url = netutils.urlsplit(url)
133 log_data = {
'db': splitted_url.scheme,
134 'nodelist': connection_options[
'nodelist']}
135 LOG.info(
'Connecting to %(db)s on %(nodelist)s' % log_data)
138 max_retries, retry_interval)
139 except pymongo.errors.ConnectionFailure
as e:
140 LOG.warning(
_(
'Unable to connect to the database server: '
141 '%(errmsg)s.') % {
'errmsg': e})
143 self.
_pool_pool[pool_key] = weakref.ref(client)
148 return tenacity.retry(
149 retry=tenacity.retry_if_exception_type(
150 pymongo.errors.AutoReconnect),
151 wait=tenacity.wait_fixed(retry_interval),
152 stop=(tenacity.stop_after_attempt(max_retries)
if max_retries >= 0
153 else tenacity.stop_never)
157MONGO_METHODS = set([typ
for typ
in dir(pymongo.collection.Collection)
158 if not typ.startswith(
'_')])
159MONGO_METHODS.update(set([typ
for typ
in dir(pymongo.MongoClient)
160 if not typ.startswith(
'_')]))
161MONGO_METHODS.update(set([typ
for typ
in dir(pymongo)
162 if not typ.startswith(
'_')]))
166 def __init__(self, conn, max_retries, retry_interval):
174 """Create and return proxy around the method in the connection.
176 :param item: name of the connection
180 def find(self, *args, **kwargs):
191 except pymongo.errors.OperationFailure
as e:
192 if e.code
is ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS:
193 LOG.info(
"Index %s will be recreate." % name)
197 self.
connconn.drop_index(name)
201 """Wrap MongoDB connection.
203 If item is the name of an executable method,
for example find
or
204 insert, wrap this method
in the MongoConn.
205 Else wrap getting attribute
with MongoProxy.
207 if item
in (
'name',
'database'):
208 return getattr(self.
connconn, item)
209 if item
in MONGO_METHODS:
212 )(getattr(self.
connconn, item))
217 return self.
connconn(*args, **kwargs)
221 def __init__(self, cursor, max_retry, retry_interval):
226 return self.
cursorcursor[item]
229 """Wrap Cursor next method.
231 This method will be executed before each Cursor next method call.
234 save_cursor = self.
cursorcursor.clone()
236 except pymongo.errors.AutoReconnect:
237 self.
cursorcursor = save_cursor
241 return getattr(self.
cursorcursor, item)
def connect(self, url, max_retries, retry_interval)
def __getitem__(self, item)
def __getattr__(self, item)
def __init__(self, cursor, max_retry, retry_interval)
def __init__(self, conn, max_retries, retry_interval)
def __call__(self, *args, **kwargs)
def find(self, *args, **kwargs)
def __getattr__(self, item)
def _recreate_index(self, keys, name, *args, **kwargs)
def create_index(self, keys, name=None, *args, **kwargs)
def __getitem__(self, item)
def make_timestamp_range(start, end, start_timestamp_op=None, end_timestamp_op=None)
def _safe_mongo_call(max_retries, retry_interval)
def make_events_query_from_filter(event_filter)