panko  8.1.0
About: OpenStack Panko is an event storage service and REST API for Ceilometer.
The "Ussuri" series (maintained release).
  Fossies Dox: panko-8.1.0.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

impl_mongodb.py
Go to the documentation of this file.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""MongoDB storage backend"""

from oslo_log import log
import pymongo

from panko import storage
from panko.storage.mongo import utils as pymongo_utils
from panko.storage import pymongo_base

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
23
24
# NOTE(review): the ``class`` statement that owns this docstring and the
# attribute below (listing line 25) is missing from this extract; restore it
# from the upstream file and indent this block under it.
"""Put the event data into a MongoDB database."""

# Pool object used by __init__ (via ``self.CONNECTION_POOL.connect``) to
# obtain the MongoDB connection instead of building a client directly.
CONNECTION_POOL = pymongo_utils.ConnectionPool()
29
def __init__(self, url, conf):
    """Connect to MongoDB and make sure the event indexes exist.

    :param url: MongoDB connection URL; it is also parsed for the database
                name and the optional username/password.
    :param conf: configuration object providing ``database.max_retries``
                 and ``database.retry_interval``.
    :raises storage.StorageBadVersion: if the server reports a version
                                       older than MongoDB 2.4.
    """
    # NOTE(jd) Use our own connection pooling on top of the Pymongo one.
    # We need that otherwise we overflow the MongoDB instance with new
    # connection since we instantiate a Pymongo client each time someone
    # requires a new storage connection.
    self.conn = self.CONNECTION_POOL.connect(
        url,
        conf.database.max_retries,
        conf.database.retry_interval)

    # Require MongoDB 2.4 to use $setOnInsert
    if self.conn.server_info()['versionArray'] < [2, 4]:
        raise storage.StorageBadVersion("Need at least MongoDB 2.4")

    connection_options = pymongo.uri_parser.parse_uri(url)
    self.db = getattr(self.conn, connection_options['database'])
    if connection_options.get('username'):
        # NOTE(review): Database.authenticate() no longer exists in
        # PyMongo >= 4.0 -- confirm the PyMongo version this is pinned to.
        self.db.authenticate(connection_options['username'],
                             connection_options['password'])

    # NOTE(jd) Upgrading is just about creating index, so let's do this
    # on connection to be sure at least the TTL is correctly updated if
    # needed.
    self.upgrade()
55
@staticmethod
def update_ttl(ttl, ttl_index_name, index_field, coll):
    """Update or create time_to_live indexes.

    A non-positive ``ttl`` drops the named index if it exists. Otherwise an
    existing index has its expiry changed through ``collMod`` and a missing
    one is created.

    :param ttl: time to live in seconds.
    :param ttl_index_name: name of the index we want to update or create.
    :param index_field: field with the index that we need to update.
    :param coll: collection which indexes need to be updated.
    :returns: the ``collMod`` command result when an existing index is
              modified, otherwise ``None``.
    """
    indexes = coll.index_information()
    if ttl <= 0:
        # Expiry disabled: remove the TTL index if one was created earlier.
        if ttl_index_name in indexes:
            coll.drop_index(ttl_index_name)
        return

    if ttl_index_name in indexes:
        # The index already exists, so only its expiry value is changed.
        return coll.database.command(
            'collMod', coll.name,
            index={'keyPattern': {index_field: pymongo.ASCENDING},
                   'expireAfterSeconds': ttl})

    coll.create_index([(index_field, pymongo.ASCENDING)],
                      expireAfterSeconds=ttl,
                      name=ttl_index_name)
80
def upgrade(self):
    """Create the ``event`` collection if absent and set up its index."""
    # create collection if not present
    # NOTE(review): presumably ``self.db`` is a proxy whose ``conn``
    # attribute is the wrapped pymongo Database -- confirm in
    # panko.storage.mongo.utils. Also note collection_names() is gone in
    # PyMongo >= 4.0 (list_collection_names() replaces it).
    if 'event' not in self.db.conn.collection_names():
        self.db.conn.create_collection('event')
    # Establish indexes
    # NOTE(idegtiarov): This indexes cover get_events, get_event_types, and
    # get_trait_types requests based on event_type and timestamp fields.
    self.db.event.create_index(
        [('event_type', pymongo.ASCENDING),
         ('timestamp', pymongo.ASCENDING)],
        name='event_type_idx'
    )
93
def clear(self):
    """Drop this backend's database and close the connection."""
    self.conn.drop_database(self.db.name)
    # Connection will be reopened automatically if needed
    self.conn.close()
98
def clear_expired_data(self, ttl, max_count=None):
    """Clear expired data from the backend storage system.

    Clearing occurs according to the time-to-live. Nothing is deleted
    here directly: the ``event_ttl`` index on ``timestamp`` is updated,
    created or dropped through ``update_ttl``.

    :param ttl: Number of seconds to keep records for.
    :param max_count: Number of records to delete (not used for MongoDB).
    """
    self.update_ttl(ttl, 'event_ttl', 'timestamp', self.db.event)
    LOG.info("Clearing expired event data is based on native "
             "MongoDB time to live feature and going in background.")
def clear_expired_data(self, ttl, max_count=None)
Definition: impl_mongodb.py:99
def update_ttl(ttl, ttl_index_name, index_field, coll)
Definition: impl_mongodb.py:57