"Fossies" - the Fresh Open Source Software Archive

Member "octavia-8.0.0/octavia/amphorae/backends/utils/keepalivedlvs_query.py" (14 Apr 2021, 19599 Bytes) of package /linux/misc/openstack/octavia-8.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "keepalivedlvs_query.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 7.1.1_vs_8.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 # http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import ipaddress
   14 import os
   15 import re
   16 import subprocess
   17 
   18 from octavia_lib.common import constants as lib_consts
   19 from oslo_log import log as logging
   20 
   21 from octavia.amphorae.backends.agent.api_server import util
   22 from octavia.common import constants
   23 
LOG = logging.getLogger(__name__)
# Kernel LVS (IPVS) files under /proc/net.
KERNEL_LVS_PATH = '/proc/net/ip_vs'
KERNEL_LVS_STATS_PATH = '/proc/net/ip_vs_stats'
# Captures the text that follows "RemoteAddress:Port" on a line, i.e. the
# remaining column names of that header line.
LVS_KEY_REGEX = re.compile(r"RemoteAddress:Port\s+(.*$)")
# Captures an "XXXXXXXX:XXXX" token (8 + 4 word characters) and the rest of
# the line. get_listener_realserver_mapping parses both halves of the token
# as hexadecimal numbers.
V4_RS_VALUE_REGEX = re.compile(r"(\w{8}:\w{4})\s+(.*$)")
# Splits 8 word characters into four 2-character groups.
V4_HEX_IP_REGEX = re.compile(r"(\w{2})(\w{2})(\w{2})(\w{2})")
# Captures a bracketed address followed by ":XXXX" and the rest of the line;
# get_listener_realserver_mapping tries it when V4_RS_VALUE_REGEX finds
# nothing.
V6_RS_VALUE_REGEX = re.compile(r"(\[[[\w{4}:]+\b\]:\w{4})\s+(.*$)")

# The patterns below are applied to the content of the keepalived LVS
# configuration file of a listener.
# Captures the "<word>-<word>" name that follows "net_namespace".
NS_REGEX = re.compile(r"net_namespace\s(\w+-\w+)")
# Capture the (address, port) pair of "virtual_server" / "real_server"
# entries; the V4 variants accept digits and dots, the V6 variants accept
# word characters and colons.
V4_VS_REGEX = re.compile(r"virtual_server\s([\d+\.]+\b)\s(\d{1,5})")
V4_RS_REGEX = re.compile(r"real_server\s([\d+\.]+\b)\s(\d{1,5})")
V6_VS_REGEX = re.compile(r"virtual_server\s([\w*:]+\b)\s(\d{1,5})")
V6_RS_REGEX = re.compile(r"real_server\s([\w*:]+\b)\s(\d{1,5})")
# Captures (resource type, UUID-shaped id) from
# "# Configuration for <type> <id>" comments.
CONFIG_COMMENT_REGEX = re.compile(
    r"#\sConfiguration\sfor\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12})")
# Captures (resource type, UUID-shaped id) from "# <type> <id> is disabled"
# comments.
DISABLED_CONFIG_COMMENT_REGEX = re.compile(
    r"#\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}) is disabled")

# Matches any of the checker keywords; get_lvs_listener_pool_status treats
# at least one match in the configuration as "health monitor enabled".
CHECKER_REGEX = re.compile(r"(MISC_CHECK|HTTP_GET|TCP_CHECK)")
   43 
   44 
   45 def read_kernel_file(ns_name, file_path):
   46     cmd = ("ip netns exec {ns} cat {lvs_stat_path}".format(
   47         ns=ns_name, lvs_stat_path=file_path))
   48     try:
   49         output = subprocess.check_output(cmd.split(),
   50                                          stderr=subprocess.STDOUT)
   51     except subprocess.CalledProcessError as e:
   52         LOG.error("Failed to get kernel lvs status in ns %(ns_name)s "
   53                   "%(kernel_lvs_path)s: %(err)s %(out)s",
   54                   {'ns_name': ns_name, 'kernel_lvs_path': file_path,
   55                    'err': e, 'out': e.output})
   56         raise e
   57     # py3 treat the output as bytes type.
   58     if isinstance(output, bytes):
   59         output = output.decode('utf-8')
   60     return output
   61 
   62 
   63 def get_listener_realserver_mapping(ns_name, listener_ip_port,
   64                                     health_monitor_enabled):
   65     # returned result:
   66     # actual_member_result = {'rs_ip:listened_port': {
   67     #   'status': 'UP',
   68     #   'Forward': forward_type,
   69     #   'Weight': 5,
   70     #   'ActiveConn': 0,
   71     #   'InActConn': 0
   72     # }}
   73     listener_ip, listener_port = listener_ip_port.rsplit(':', 1)
   74     ip_obj = ipaddress.ip_address(listener_ip.strip('[]'))
   75     output = read_kernel_file(ns_name, KERNEL_LVS_PATH).split('\n')
   76     if ip_obj.version == 4:
   77         ip_to_hex_format = "%.8X" % ip_obj._ip
   78     else:
   79         ip_to_hex_format = r'\[' + ip_obj.exploded + r'\]'
   80     port_hex_format = "%.4X" % int(listener_port)
   81     idex = ip_to_hex_format + ':' + port_hex_format
   82 
   83     if health_monitor_enabled:
   84         member_status = constants.UP
   85     else:
   86         member_status = constants.NO_CHECK
   87 
   88     actual_member_result = {}
   89     find_target_block = False
   90     result_keys = []
   91     for line in output:
   92         if 'RemoteAddress:Port' in line:
   93             result_keys = re.split(r'\s+',
   94                                    LVS_KEY_REGEX.findall(line)[0].strip())
   95         elif ((line.startswith(constants.PROTOCOL_UDP) or
   96                line.startswith(lib_consts.PROTOCOL_SCTP)) and
   97               find_target_block):
   98             break
   99         elif re.match(r'^(UDP|SCTP)\s+%s\s+\w+' % idex,
  100                       line):
  101             find_target_block = True
  102         elif find_target_block and line:
  103             rs_is_ipv4 = True
  104             all_values = V4_RS_VALUE_REGEX.findall(line)
  105             # If can not get all_values with ipv4 regex, then this line must be
  106             # a ipv6 real server record.
  107             if not all_values:
  108                 all_values = V6_RS_VALUE_REGEX.findall(line)
  109                 rs_is_ipv4 = False
  110 
  111             all_values = all_values[0]
  112             ip_port = all_values[0]
  113             result_values = re.split(r"\s+", all_values[1].strip())
  114             member_ip, member_port = ip_port.rsplit(':', 1)
  115             port_string = str(int(member_port, 16))
  116             if rs_is_ipv4:
  117                 ip_string = ipaddress.ip_address(int(member_ip, 16)).compressed
  118                 member_ip_port_string = ip_string + ':' + port_string
  119             else:
  120                 ip_string = ipaddress.ip_address(
  121                     member_ip.strip('[]')).compressed
  122                 member_ip_port_string = '[' + ip_string + ']:' + port_string
  123             result_key_count = len(result_keys)
  124             for index in range(result_key_count):
  125                 if member_ip_port_string not in actual_member_result:
  126                     actual_member_result[
  127                         member_ip_port_string] = {'status': member_status,
  128                                                   result_keys[index]:
  129                                                       result_values[index]}
  130                 else:
  131                     # The other values include the weight
  132                     actual_member_result[
  133                         member_ip_port_string][
  134                         result_keys[index]] = result_values[index]
  135             continue
  136 
  137     return find_target_block, actual_member_result
  138 
  139 
  140 def get_lvs_listener_resource_ipports_nsname(listener_id):
  141     # resource_ipport_mapping = {'Listener': {'id': listener-id,
  142     #                                         'ipport': ipport},
  143     #                            'Pool': {'id': pool-id},
  144     #                            'Members': [{'id': member-id-1,
  145     #                                        'ipport': ipport},
  146     #                                       {'id': member-id-2,
  147     #                                        'ipport': ipport}],
  148     #                            'HealthMonitor': {'id': healthmonitor-id}}
  149     resource_ipport_mapping = {}
  150     with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
  151         cfg = f.read()
  152         ns_name = NS_REGEX.findall(cfg)[0]
  153         listener_ip_port = V4_VS_REGEX.findall(cfg)
  154         if not listener_ip_port:
  155             listener_ip_port = V6_VS_REGEX.findall(cfg)
  156         listener_ip_port = listener_ip_port[0] if listener_ip_port else []
  157 
  158         disabled_resource_ids = DISABLED_CONFIG_COMMENT_REGEX.findall(cfg)
  159 
  160         listener_disabled = any(True
  161                                 for resource in disabled_resource_ids
  162                                 if resource[0] == 'Listener')
  163         if listener_disabled:
  164             return None, ns_name
  165 
  166         if not listener_ip_port:
  167             # If not get listener_ip_port from the lvs config file,
  168             # that means the listener's default pool have no enabled member
  169             # yet. But at this moment, we can get listener_id and ns_name, so
  170             # for this function, we will just return ns_name
  171             return resource_ipport_mapping, ns_name
  172 
  173         cfg_line = cfg.split('\n')
  174         rs_ip_port_list = []
  175         for line in cfg_line:
  176             if 'real_server' in line:
  177                 res = V4_RS_REGEX.findall(line)
  178                 if not res:
  179                     res = V6_RS_REGEX.findall(line)
  180                 rs_ip_port_list.append(res[0])
  181 
  182         resource_type_ids = CONFIG_COMMENT_REGEX.findall(cfg)
  183 
  184         for resource_type, resource_id in resource_type_ids:
  185             value = {'id': resource_id}
  186             if resource_type == 'Member':
  187                 resource_type = '%ss' % resource_type
  188                 if resource_type not in resource_ipport_mapping:
  189                     value = [value]
  190             if resource_type not in resource_ipport_mapping:
  191                 resource_ipport_mapping[resource_type] = value
  192             elif resource_type == 'Members':
  193                 resource_ipport_mapping[resource_type].append(value)
  194 
  195         disabled_member_ids = [
  196             resource[1]
  197             for resource in disabled_resource_ids
  198             if resource[0] == 'Member'
  199         ]
  200 
  201         resource_type = 'Members'
  202         for member_id in disabled_member_ids:
  203             value = {'id': member_id,
  204                      'ipport': None}
  205             if resource_type not in resource_ipport_mapping:
  206                 resource_ipport_mapping[resource_type] = []
  207             resource_ipport_mapping[resource_type].append(value)
  208 
  209         if rs_ip_port_list:
  210             rs_ip_port_count = len(rs_ip_port_list)
  211             for index in range(rs_ip_port_count):
  212                 member_ip = ipaddress.ip_address(
  213                     rs_ip_port_list[index][0])
  214                 if member_ip.version == 6:
  215                     rs_ip_port_list[index] = (
  216                         '[' + member_ip.compressed + ']',
  217                         rs_ip_port_list[index][1])
  218                 resource_ipport_mapping['Members'][index]['ipport'] = (
  219                     rs_ip_port_list[index][0] + ':' +
  220                     rs_ip_port_list[index][1])
  221 
  222         listener_ip = ipaddress.ip_address(listener_ip_port[0])
  223         if listener_ip.version == 6:
  224             listener_ip_port = (
  225                 '[' + listener_ip.compressed + ']', listener_ip_port[1])
  226         resource_ipport_mapping['Listener']['ipport'] = (
  227             listener_ip_port[0] + ':' + listener_ip_port[1])
  228 
  229     return resource_ipport_mapping, ns_name
  230 
  231 
  232 def get_lvs_listener_pool_status(listener_id):
  233     (resource_ipport_mapping,
  234      ns_name) = get_lvs_listener_resource_ipports_nsname(listener_id)
  235     if 'Pool' not in resource_ipport_mapping:
  236         return {}
  237     if 'Members' not in resource_ipport_mapping:
  238         return {'lvs': {
  239             'uuid': resource_ipport_mapping['Pool']['id'],
  240             'status': constants.UP,
  241             'members': {}
  242         }}
  243 
  244     config_path = util.keepalived_lvs_cfg_path(listener_id)
  245     pids_pathes = util.keepalived_lvs_pids_path(listener_id)
  246 
  247     config_stat = os.stat(config_path)
  248     check_pid_stat = os.stat(pids_pathes[2])
  249 
  250     # Indicates that keepalived configuration has been updated but the service
  251     # has yet to be restarted.
  252     # NOTE: It only works if we are doing a RESTART on configuration change,
  253     # Iaa34db6cb1dfed98e96a585c5d105e263c7efa65 forces a RESTART instead of a
  254     # RELOAD, we need to be careful if we want to switch back to RELOAD after
  255     # updating to a recent keepalived release.
  256     restarting = config_stat.st_mtime > check_pid_stat.st_mtime
  257 
  258     with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
  259         cfg = f.read()
  260         hm_enabled = len(CHECKER_REGEX.findall(cfg)) > 0
  261 
  262     _, realserver_result = get_listener_realserver_mapping(
  263         ns_name, resource_ipport_mapping['Listener']['ipport'],
  264         hm_enabled)
  265     pool_status = constants.UP
  266     member_results = {}
  267     if realserver_result:
  268         member_ip_port_list = [
  269             member['ipport'] for member in resource_ipport_mapping['Members']]
  270         down_member_ip_port_set = set(
  271             member_ip_port_list) - set(list(realserver_result.keys()))
  272 
  273         for member_ip_port in member_ip_port_list:
  274             member_id = None
  275             for member in resource_ipport_mapping['Members']:
  276                 if member['ipport'] == member_ip_port:
  277                     member_id = member['id']
  278             if member_ip_port is None:
  279                 status = constants.MAINT
  280             elif member_ip_port in down_member_ip_port_set:
  281                 status = (
  282                     constants.RESTARTING if restarting else constants.DOWN)
  283             elif int(realserver_result[member_ip_port]['Weight']) == 0:
  284                 status = constants.DRAIN
  285             else:
  286                 status = realserver_result[member_ip_port]['status']
  287 
  288             if member_id:
  289                 member_results[member_id] = status
  290     else:
  291         if hm_enabled:
  292             pool_status = constants.DOWN
  293 
  294         for member in resource_ipport_mapping['Members']:
  295             if member['ipport'] is None:
  296                 member_results[member['id']] = constants.MAINT
  297             elif hm_enabled:
  298                 member_results[member['id']] = (
  299                     constants.RESTARTING if restarting else constants.DOWN)
  300             else:
  301                 member_results[member['id']] = constants.NO_CHECK
  302 
  303     return {
  304         'lvs':
  305         {
  306             'uuid': resource_ipport_mapping['Pool']['id'],
  307             'status': pool_status,
  308             'members': member_results
  309         }
  310     }
  311 
  312 
  313 def get_ipvsadm_info(ns_name, is_stats_cmd=False):
  314     cmd_list = ['ip', 'netns', 'exec', ns_name, 'ipvsadm', '-Ln']
  315     # use --exact to ensure output is integer only
  316     if is_stats_cmd:
  317         cmd_list += ['--stats', '--exact']
  318     output = subprocess.check_output(cmd_list, stderr=subprocess.STDOUT)
  319     if isinstance(output, bytes):
  320         output = output.decode('utf-8')
  321     output = output.split('\n')
  322     fields = []
  323     # mapping = {'listeneripport': {'Linstener': vs_values,
  324     #                              'members': [rs_values1, rs_values2]}}
  325     last_key = None
  326     value_mapping = dict()
  327     output_line_num = len(output)
  328 
  329     def split_line(line):
  330         return re.sub(r'\s+', ' ', line.strip()).split(' ')
  331     for line_num in range(output_line_num):
  332         # ipvsadm -Ln
  333         if 'Flags' in output[line_num]:
  334             fields = split_line(output[line_num])
  335         elif fields and 'Flags' in fields and fields.index('Flags') == len(
  336                 fields) - 1:
  337             fields.extend(split_line(output[line_num]))
  338         # ipvsadm -Ln --stats
  339         elif 'Prot' in output[line_num]:
  340             fields = split_line(output[line_num])
  341         elif 'RemoteAddress' in output[line_num]:
  342             start = fields.index('LocalAddress:Port') + 1
  343             temp_fields = fields[start:]
  344             fields.extend(split_line(output[line_num]))
  345             fields.extend(temp_fields)
  346         # here we get the all fields
  347         elif (constants.PROTOCOL_UDP in output[line_num] or
  348               lib_consts.PROTOCOL_SCTP in output[line_num]):
  349             # if UDP/TCP in this line, we can know this line is
  350             # VS configuration.
  351             vs_values = split_line(output[line_num])
  352             for value in vs_values:
  353                 if ':' in value:
  354                     value_mapping[value] = {'Listener': vs_values,
  355                                             'Members': []}
  356                     last_key = value
  357                     break
  358         # here the line must be a RS which belongs to a VS
  359         elif '->' in output[line_num] and last_key:
  360             rs_values = split_line(output[line_num])
  361             rs_values.remove('->')
  362             value_mapping[last_key]['Members'].append(rs_values)
  363 
  364     index = fields.index('->')
  365     vs_fields = fields[:index]
  366     if 'Flags' in vs_fields:
  367         vs_fields.remove('Flags')
  368     rs_fields = fields[index + 1:]
  369     for key in list(value_mapping.keys()):
  370         value_mapping[key]['Listener'] = list(
  371             zip(vs_fields, value_mapping[key]['Listener']))
  372         member_res = []
  373         for member_value in value_mapping[key]['Members']:
  374             member_res.append(list(zip(rs_fields, member_value)))
  375         value_mapping[key]['Members'] = member_res
  376 
  377     return value_mapping
  378 
  379 
  380 def get_lvs_listeners_stats():
  381     lvs_listener_ids = util.get_lvs_listeners()
  382     need_check_listener_ids = [
  383         listener_id for listener_id in lvs_listener_ids
  384         if util.is_lvs_listener_running(listener_id)]
  385     ipport_mapping = dict()
  386     listener_stats_res = dict()
  387     for check_listener_id in need_check_listener_ids:
  388         # resource_ipport_mapping = {'Listener': {'id': listener-id,
  389         #                                         'ipport': ipport},
  390         #                            'Pool': {'id': pool-id},
  391         #                            'Members': [{'id': member-id-1,
  392         #                                        'ipport': ipport},
  393         #                                       {'id': member-id-2,
  394         #                                        'ipport': ipport}],
  395         #                            'HealthMonitor': {'id': healthmonitor-id}}
  396         resource_ipport_mapping, ns_name = (
  397             get_lvs_listener_resource_ipports_nsname(check_listener_id))
  398 
  399         # Listener is disabled, we don't need to send an update
  400         if resource_ipport_mapping is None:
  401             continue
  402 
  403         # Since we found the keepalived running, acknowledge the listener
  404         # in the heartbeat. If this listener has a pool and members,
  405         # the stats will be updated later in the code flow.
  406         listener_stats_res.update({
  407             check_listener_id: {
  408                 'stats': {
  409                     'bout': 0,
  410                     'bin': 0,
  411                     'scur': 0,
  412                     'stot': 0,
  413                     'ereq': 0},
  414                 'status': constants.OPEN}})
  415 
  416         # If we can not read the lvs configuration from file, that means
  417         # the pool of this listener may own zero enabled member, but the
  418         # keepalived process is running. So we need to skip it.
  419         if not resource_ipport_mapping:
  420             continue
  421         ipport_mapping.update({check_listener_id: resource_ipport_mapping})
  422 
  423     # So here, if we can not get any ipport_mapping,
  424     # we do nothing, just return
  425     if not ipport_mapping:
  426         return listener_stats_res
  427 
  428     # contains bout, bin, scur, stot, ereq, status
  429     # bout(OutBytes), bin(InBytes), stot(Conns) from cmd ipvsadm -Ln --stats
  430     # scur(ActiveConn) from cmd ipvsadm -Ln
  431     # status, can see configuration in any cmd, treat it as OPEN
  432     # ereq is still 0, as UDP case does not support it.
  433     scur_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE)
  434     stats_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE,
  435                                  is_stats_cmd=True)
  436     for listener_id, ipport in ipport_mapping.items():
  437         listener_ipport = ipport['Listener']['ipport']
  438         # This would be in Error, wait for the next loop to sync for the
  439         # listener at this moment. Also this is for skip the case no enabled
  440         # member in UDP listener, so we don't check it for failover.
  441         if listener_ipport not in scur_res or listener_ipport not in stats_res:
  442             continue
  443 
  444         scur, bout, bin, stot, ereq = 0, 0, 0, 0, 0
  445         # As all results contain this listener, so its status should be OPEN
  446         status = constants.OPEN
  447         # Get scur
  448         for m in scur_res[listener_ipport]['Members']:
  449             for item in m:
  450                 if item[0] == 'ActiveConn':
  451                     scur += int(item[1])
  452 
  453         # Get bout, bin, stot
  454         for item in stats_res[listener_ipport]['Listener']:
  455             if item[0] == 'Conns':
  456                 stot = int(item[1])
  457             elif item[0] == 'OutBytes':
  458                 bout = int(item[1])
  459             elif item[0] == 'InBytes':
  460                 bin = int(item[1])
  461 
  462         listener_stats_res.update({
  463             listener_id: {
  464                 'stats': {
  465                     'bout': bout,
  466                     'bin': bin,
  467                     'scur': scur,
  468                     'stot': stot,
  469                     'ereq': ereq},
  470                 'status': status}})
  471 
  472     return listener_stats_res