"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "cloudkitty/orchestrator.py" between
cloudkitty-8.0.0.tar.gz and cloudkitty-9.0.0.tar.gz

About: OpenStack CloudKitty provides Rating-as-a-Service for OpenStack.
The "Stein" series (latest release).

orchestrator.py  (cloudkitty-8.0.0):orchestrator.py  (cloudkitty-9.0.0)
skipping to change at line 200 skipping to change at line 200
storage_data = [] storage_data = []
for metric in metrics: for metric in metrics:
try: try:
try: try:
data = self._collect(metric, timestamp) data = self._collect(metric, timestamp)
except collector.NoDataCollected: except collector.NoDataCollected:
raise raise
except Exception as e: except Exception as e:
LOG.warning( LOG.warning(
'Error while collecting metric ' '[%(scope_id)s] Error while collecting metric '
'%(metric)s: %(error)s', '%(metric)s: %(error)s. Retrying on next '
{'metric': metric, 'error': e}) 'collection cycle.',
raise collector.NoDataCollected('', metric) {
'scope_id': self._tenant_id,
'metric': metric,
'error': e,
},
)
# FIXME(peschk_l): here we just exit, and the
# collection will be retried during the next collect
# cycle. In the future, we should implement a retrying
# system in workers
return
except collector.NoDataCollected: except collector.NoDataCollected:
LOG.info( LOG.info(
'No data collected for metric {} ' '[{}] No data collected for metric {} '
'at timestamp {}'.format( 'at timestamp {}'.format(
metric, ck_utils.ts2dt(timestamp)) self._tenant_id, metric, ck_utils.ts2dt(timestamp))
) )
else: else:
# Rating # Rating
for processor in self._processors: for processor in self._processors:
processor.obj.process(data) processor.obj.process(data)
# Writing # Writing
if isinstance(data, list): if isinstance(data, list):
storage_data += data storage_data += data
else: else:
storage_data.append(data) storage_data.append(data)
skipping to change at line 246 skipping to change at line 256
# RPC # RPC
self.server = None self.server = None
self._rating_endpoint = RatingEndpoint(self) self._rating_endpoint = RatingEndpoint(self)
self._init_messaging() self._init_messaging()
# DLM # DLM
self.coord = coordination.get_coordinator( self.coord = coordination.get_coordinator(
CONF.orchestrator.coordination_url, CONF.orchestrator.coordination_url,
uuidutils.generate_uuid().encode('ascii')) uuidutils.generate_uuid().encode('ascii'))
self.coord.start() self.coord.start(start_heart=True)
def _lock(self, tenant_id): def _lock(self, tenant_id):
lock_name = b"cloudkitty-" + str(tenant_id).encode('ascii') lock_name = b"cloudkitty-" + str(tenant_id).encode('ascii')
return self.coord.get_lock(lock_name) return self.coord.get_lock(lock_name)
def _init_messaging(self): def _init_messaging(self):
target = oslo_messaging.Target(topic='cloudkitty', target = oslo_messaging.Target(topic='cloudkitty',
server=CONF.host, server=CONF.host,
version='1.0') version='1.0')
endpoints = [ endpoints = [
skipping to change at line 296 skipping to change at line 306
if state: if state:
worker = Worker( worker = Worker(
self.collector, self.collector,
self.storage, self.storage,
tenant_id, tenant_id,
) )
worker.run() worker.run()
lock.release() lock.release()
self.coord.heartbeat()
# NOTE(sheeprine): Slow down looping if all tenants are # NOTE(sheeprine): Slow down looping if all tenants are
# being processed # being processed
eventlet.sleep(1) eventlet.sleep(1)
# FIXME(sheeprine): We may cause a drift here # FIXME(sheeprine): We may cause a drift here
eventlet.sleep(CONF.collect.period) eventlet.sleep(CONF.collect.period)
def terminate(self): def terminate(self):
self.coord.stop() self.coord.stop()
 End of changes. 5 change blocks. 
8 lines changed or deleted 17 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)