"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-9.0.0/zaqar/notification/tasks/webhook.py" (16 Oct 2019, 6916 Bytes) of package /linux/misc/openstack/zaqar-9.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "webhook.py" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2015 Catalyst IT Ltd
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import math
   17 import time
   18 
   19 import json
   20 from oslo_log import log as logging
   21 import requests
   22 
   23 from zaqar.common import consts
   24 
   25 LOG = logging.getLogger(__name__)
   26 
   27 
   28 def _Linear_function(minimum_delay, maximum_delay, times):
   29     return range(minimum_delay, maximum_delay, times)
   30 
   31 
   32 def _Geometric_function(minimum_delay, maximum_delay, times):
   33     x_max = int((maximum_delay - minimum_delay) / times)
   34     k = math.pow(10, math.log10(maximum_delay/minimum_delay)/(x_max-1))
   35     xarray = range(1, x_max+1)
   36     return [int(minimum_delay*math.pow(k, a-1)) for a in xarray]
   37 
   38 
   39 def _Exponential_function(minimum_delay, maximum_delay, times):
   40     x_max = int((maximum_delay - minimum_delay) / times)
   41     k = math.pow(10, math.log10(maximum_delay/minimum_delay)/(x_max-1))
   42     p = minimum_delay/k
   43     xarray = range(1, x_max+1)
   44     return [int(p*math.pow(k, a)) for a in xarray]
   45 
   46 
   47 def _Arithmetic_function(minimum_delay, maximum_delay, times):
   48     x_max = int((maximum_delay - minimum_delay) / times)
   49     d = 2.0 * (maximum_delay - minimum_delay) / (x_max * (x_max - 1))
   50     xarray = range(1, x_max+1)
   51     return [int(minimum_delay+(a-1)*a*d/2) for a in xarray]
   52 
   53 
   54 RETRY_BACKOFF_FUNCTION_MAP = {'linear': _Linear_function,
   55                               'arithmetic': _Arithmetic_function,
   56                               'geometric': _Geometric_function,
   57                               'exponential': _Exponential_function}
   58 
   59 
   60 class WebhookTask(object):
   61 
   62     def _post_request_success(self, subscriber, data, headers):
   63         try:
   64             response = requests.post(subscriber, data=data, headers=headers)
   65             if response and (response.status_code in range(200, 500)):
   66                 return True
   67         except Exception as e:
   68             LOG.exception('post request got exception in retry: %s.', str(e))
   69         return False
   70 
   71     def _retry_post(self, sub_retry_policy, queue_retry_policy, subscriber,
   72                     data, headers):
   73         retry_policy = None
   74         if sub_retry_policy.get('ignore_subscription_override') or \
   75            queue_retry_policy.get('ignore_subscription_override'):
   76             retry_policy = queue_retry_policy or {}
   77         else:
   78             retry_policy = sub_retry_policy or queue_retry_policy or {}
   79         # Immediate Retry Phase
   80         for retry_with_no_delay in range(
   81                 0, retry_policy.get('retries_with_no_delay',
   82                                     consts.RETRIES_WITH_NO_DELAY)):
   83             LOG.debug('Retry with no delay, count: %s', retry_with_no_delay)
   84             if self._post_request_success(subscriber, data, headers):
   85                 return
   86         # Pre-Backoff Phase
   87         for minimum_delay_retry in range(
   88                 0, retry_policy.get('minimum_delay_retries',
   89                                     consts.MINIMUM_DELAY_RETRIES)):
   90             LOG.debug('Retry with minimum delay, count: %s',
   91                       minimum_delay_retry)
   92             time.sleep(retry_policy.get('minimum_delay', consts.MINIMUM_DELAY))
   93             if self._post_request_success(subscriber, data, headers):
   94                 return
   95         # Now we support linear,arithmetic,
   96         # exponential and geometric retry backoff function.
   97         retry_function = retry_policy.get('retry_backoff_function', 'linear')
   98         backoff_function = RETRY_BACKOFF_FUNCTION_MAP[retry_function]
   99         for i in backoff_function(retry_policy.get('minimum_delay',
  100                                                    consts.MINIMUM_DELAY),
  101                                   retry_policy.get('maximum_delay',
  102                                                    consts.MAXIMUM_DELAY),
  103                                   consts.LINEAR_INTERVAL):
  104             LOG.debug('Retry with function:%s, sleep: %s seconds',
  105                       retry_function, i)
  106             time.sleep(i)
  107             if self._post_request_success(subscriber, data, headers):
  108                 return
  109         # Post-Backoff Phase
  110         for maximum_delay_retries in range(
  111                 0, retry_policy.get('maximum_delay_retries',
  112                                     consts.MAXIMUM_DELAY_RETRIES)):
  113             LOG.debug('Retry with maximum delay, count: %s',
  114                       maximum_delay_retries)
  115             time.sleep(retry_policy.get('maximum_delay', consts.MAXIMUM_DELAY))
  116             if self._post_request_success(subscriber, data, headers):
  117                 return
  118         LOG.debug('Send request retries are all failed.')
  119 
  120     def execute(self, subscription, messages, headers=None, **kwargs):
  121         if headers is None:
  122             headers = {'Content-Type': 'application/json'}
  123         headers.update(subscription['options'].get('post_headers', {}))
  124         try:
  125             for msg in messages:
  126                 # NOTE(Eva-i): Unfortunately this will add 'queue_name' key to
  127                 # our original messages(dicts) which will be later consumed in
  128                 # the storage controller. It seems safe though.
  129                 msg['queue_name'] = subscription['source']
  130                 if 'post_data' in subscription['options']:
  131                     data = subscription['options']['post_data']
  132                     data = data.replace('"$zaqar_message$"', json.dumps(msg))
  133                 else:
  134                     data = json.dumps(msg)
  135                 response = requests.post(subscription['subscriber'],
  136                                          data=data,
  137                                          headers=headers)
  138                 if response and (response.status_code not in range(200, 500)):
  139                     LOG.info("Response is %s, begin to retry",
  140                              response.status_code)
  141                     self._retry_post(
  142                         subscription['options'].get('_retry_policy', {}),
  143                         kwargs.get('queue_retry_policy'),
  144                         subscription['subscriber'],
  145                         data, headers)
  146         except Exception as e:
  147             LOG.exception('webhook task got exception: %s.', str(e))
  148             self._retry_post(subscription['options'].get('_retry_policy', {}),
  149                              kwargs.get('queue_retry_policy'),
  150                              subscription['subscriber'],
  151                              data, headers)
  152 
  153     def register(self, subscriber, options, ttl, project_id, request_data):
  154         pass