__init__.py (aodh-14.0.0) | : | __init__.py (aodh-15.0.0) | ||
---|---|---|---|---|
skipping to change at line 214 | skipping to change at line 214 | |||
self.storage_conn = storage.get_connection_from_config(self.conf) | self.storage_conn = storage.get_connection_from_config(self.conf) | |||
self.partition_coordinator = coordination.PartitionCoordinator( | self.partition_coordinator = coordination.PartitionCoordinator( | |||
self.conf) | self.conf) | |||
self.partition_coordinator.start() | self.partition_coordinator.start() | |||
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) | self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) | |||
# allow time for coordination if necessary | # allow time for coordination if necessary | |||
delay_start = self.partition_coordinator.is_active() | delay_start = self.partition_coordinator.is_active() | |||
evaluation_interval = self.conf.evaluator.evaluation_interval | ||||
if self.evaluators: | if self.evaluators: | |||
@periodics.periodic(spacing=self.conf.evaluation_interval, | @periodics.periodic(spacing=evaluation_interval, | |||
run_immediately=not delay_start) | run_immediately=not delay_start) | |||
def evaluate_alarms(): | def evaluate_alarms(): | |||
self._evaluate_assigned_alarms() | self._evaluate_assigned_alarms() | |||
self.periodic.add(evaluate_alarms) | self.periodic.add(evaluate_alarms) | |||
if self.partition_coordinator.is_active(): | if self.partition_coordinator.is_active(): | |||
heartbeat_interval = min(self.conf.coordination.heartbeat_interval, | heartbeat_interval = min(self.conf.coordination.heartbeat_interval, | |||
self.conf.evaluation_interval / 4) | evaluation_interval / 4) | |||
@periodics.periodic(spacing=heartbeat_interval, | @periodics.periodic(spacing=heartbeat_interval, | |||
run_immediately=True) | run_immediately=True) | |||
def heartbeat(): | def heartbeat(): | |||
self.partition_coordinator.heartbeat() | self.partition_coordinator.heartbeat() | |||
self.periodic.add(heartbeat) | self.periodic.add(heartbeat) | |||
t = threading.Thread(target=self.periodic.start) | t = threading.Thread(target=self.periodic.start) | |||
t.daemon = True | t.daemon = True | |||
skipping to change at line 287 | skipping to change at line 289 | |||
return | return | |||
LOG.debug('Evaluating alarm %s', alarm.alarm_id) | LOG.debug('Evaluating alarm %s', alarm.alarm_id) | |||
try: | try: | |||
self.evaluators[alarm.type].obj.evaluate(alarm) | self.evaluators[alarm.type].obj.evaluate(alarm) | |||
except Exception: | except Exception: | |||
LOG.exception('Failed to evaluate alarm %s', alarm.alarm_id) | LOG.exception('Failed to evaluate alarm %s', alarm.alarm_id) | |||
def _assigned_alarms(self): | def _assigned_alarms(self): | |||
before = (timeutils.utcnow() - datetime.timedelta( | before = (timeutils.utcnow() - datetime.timedelta( | |||
seconds=self.conf.evaluation_interval / 2)) | seconds=self.conf.evaluator.evaluation_interval / 2)) | |||
selected = self.storage_conn.get_alarms( | selected = self.storage_conn.get_alarms( | |||
enabled=True, | enabled=True, | |||
type={'ne': 'event'}, | type={'ne': 'event'}, | |||
evaluate_timestamp={'lt': before}, | evaluate_timestamp={'lt': before}, | |||
) | ) | |||
if self.partition_coordinator.is_active(): | if self.partition_coordinator.is_active(): | |||
all_alarm_ids = [a.alarm_id for a in selected] | all_alarm_ids = [a.alarm_id for a in selected] | |||
selected_ids = self.partition_coordinator.extract_my_subset( | selected_ids = self.partition_coordinator.extract_my_subset( | |||
self.PARTITIONING_GROUP_NAME, all_alarm_ids | self.PARTITIONING_GROUP_NAME, all_alarm_ids | |||
End of changes. 4 change blocks. | ||||
3 lines changed or deleted | 5 lines changed or added |