"Fossies" - the Fresh Open Source Software Archive

Member "qinling-3.0.0/qinling/orchestrator/kubernetes/manager.py" (16 Oct 2019, 20460 Bytes) of package /linux/misc/openstack/qinling-3.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "manager.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.0.0_vs_3.0.0.

    1 # Copyright 2017 Catalyst IT Limited
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License");
    4 #    you may not use this file except in compliance with the License.
    5 #    You may obtain a copy of the License at
    6 #
    7 #        http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS,
   11 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 #    See the License for the specific language governing permissions and
   13 #    limitations under the License.
   14 
   15 import copy
   16 import json
   17 import os
   18 import time
   19 
   20 import jinja2
   21 from oslo_log import log as logging
   22 import requests
   23 import tenacity
   24 import yaml
   25 
   26 from qinling.engine import utils
   27 from qinling import exceptions as exc
   28 from qinling.orchestrator import base
   29 from qinling.orchestrator.kubernetes import utils as k8s_util
   30 from qinling.utils import common
   31 
   32 
   33 LOG = logging.getLogger(__name__)
   34 
   35 TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/')
   36 
   37 
   38 class KubernetesManager(base.OrchestratorBase):
   39     def __init__(self, conf, qinling_endpoint):
   40         self.conf = conf
   41         self.qinling_endpoint = qinling_endpoint
   42 
   43         clients = k8s_util.get_k8s_clients(self.conf)
   44         self.v1 = clients['v1']
   45         self.v1extension = clients['v1extension']
   46         # self.apps_v1 = clients['apps_v1']
   47 
   48         # Create namespace if not exists
   49         self._ensure_namespace()
   50 
   51         # Get templates.
   52         template_loader = jinja2.FileSystemLoader(
   53             searchpath=os.path.dirname(TEMPLATES_DIR)
   54         )
   55         jinja_env = jinja2.Environment(
   56             loader=template_loader, autoescape=True, trim_blocks=True,
   57             lstrip_blocks=True
   58         )
   59         self.deployment_template = jinja_env.get_template('deployment.j2')
   60         self.service_template = jinja_env.get_template('service.j2')
   61         self.pod_template = jinja_env.get_template('pod.j2')
   62 
   63         # Refer to
   64         # http://docs.python-requests.org/en/master/user/advanced/#session-objects
   65         self.session = requests.Session()
   66 
   67     def _ensure_namespace(self):
   68         ret = self.v1.list_namespace()
   69         cur_names = [i.metadata.name for i in ret.items]
   70 
   71         if self.conf.kubernetes.namespace not in cur_names:
   72             LOG.info('Creating namespace: %s', self.conf.kubernetes.namespace)
   73 
   74             namespace_body = {
   75                 'apiVersion': 'v1',
   76                 'kind': 'Namespace',
   77                 'metadata': {
   78                     'name': self.conf.kubernetes.namespace,
   79                     'labels': {
   80                         'name': self.conf.kubernetes.namespace
   81                     }
   82                 },
   83             }
   84 
   85             self.v1.create_namespace(namespace_body)
   86 
   87             LOG.info('Namespace %s created.', self.conf.kubernetes.namespace)
   88 
   89     @tenacity.retry(
   90         wait=tenacity.wait_fixed(2),
   91         stop=tenacity.stop_after_delay(600),
   92         reraise=True,
   93         retry=tenacity.retry_if_exception_type(exc.OrchestratorException)
   94     )
   95     def _wait_deployment_available(self, name):
   96         ret = self.v1extension.read_namespaced_deployment(
   97             name,
   98             self.conf.kubernetes.namespace
   99         )
  100 
  101         if (not ret.status.replicas or
  102                 ret.status.replicas != ret.status.available_replicas):
  103             raise exc.OrchestratorException('Deployment %s not ready.' % name)
  104 
  105     def get_pool(self, name):
  106         total = 0
  107         available = 0
  108 
  109         try:
  110             ret = self.v1extension.read_namespaced_deployment(
  111                 name,
  112                 namespace=self.conf.kubernetes.namespace
  113             )
  114         except Exception:
  115             raise exc.RuntimeNotFoundException()
  116 
  117         if not ret.status.replicas:
  118             return {"total": total, "available": available}
  119 
  120         total = ret.status.replicas
  121 
  122         labels = {'runtime_id': name}
  123         selector = common.convert_dict_to_string(labels)
  124         ret = self.v1.list_namespaced_pod(
  125             self.conf.kubernetes.namespace,
  126             label_selector='!function_id,%s' % selector
  127         )
  128         available = len(ret.items)
  129 
  130         return {"total": total, "available": available}
  131 
  132     def create_pool(self, name, image, trusted=True):
  133         deployment_body = self.deployment_template.render(
  134             {
  135                 "name": name,
  136                 "labels": {'runtime_id': name},
  137                 "replicas": self.conf.kubernetes.replicas,
  138                 "container_name": 'worker',
  139                 "image": image,
  140                 "sidecar_image": self.conf.engine.sidecar_image,
  141                 "trusted": str(trusted).lower()
  142             }
  143         )
  144 
  145         LOG.info(
  146             "Creating deployment for runtime %s: \n%s", name, deployment_body
  147         )
  148 
  149         self.v1extension.create_namespaced_deployment(
  150             body=yaml.safe_load(deployment_body),
  151             namespace=self.conf.kubernetes.namespace,
  152             async_req=False
  153         )
  154 
  155         self._wait_deployment_available(name)
  156 
  157         LOG.info("Deployment for runtime %s created.", name)
  158 
  159     def delete_pool(self, name):
  160         """Delete all resources belong to the deployment."""
  161         LOG.info("Deleting deployment %s", name)
  162 
  163         labels = {'runtime_id': name}
  164         selector = common.convert_dict_to_string(labels)
  165 
  166         self.v1extension.delete_collection_namespaced_replica_set(
  167             self.conf.kubernetes.namespace,
  168             label_selector=selector
  169         )
  170         LOG.info("ReplicaSets in deployment %s deleted.", name)
  171 
  172         ret = self.v1.list_namespaced_service(
  173             self.conf.kubernetes.namespace, label_selector=selector
  174         )
  175         names = [i.metadata.name for i in ret.items]
  176         for svc_name in names:
  177             self.v1.delete_namespaced_service(
  178                 svc_name,
  179                 self.conf.kubernetes.namespace
  180             )
  181         LOG.info("Services in deployment %s deleted.", name)
  182 
  183         self.v1extension.delete_collection_namespaced_deployment(
  184             self.conf.kubernetes.namespace,
  185             label_selector=selector,
  186             field_selector='metadata.name=%s' % name
  187         )
  188         # Should delete pods after deleting deployment to avoid pods are
  189         # recreated by k8s.
  190         self.v1.delete_collection_namespaced_pod(
  191             self.conf.kubernetes.namespace,
  192             label_selector=selector
  193         )
  194         LOG.info("Pods in deployment %s deleted.", name)
  195         LOG.info("Deployment %s deleted.", name)
  196 
  197     @tenacity.retry(
  198         wait=tenacity.wait_fixed(5),
  199         stop=tenacity.stop_after_delay(600),
  200         reraise=True,
  201         retry=tenacity.retry_if_exception_type(exc.OrchestratorException)
  202     )
  203     def _wait_for_upgrade(self, deploy_name):
  204         ret = self.v1extension.read_namespaced_deployment(
  205             deploy_name,
  206             self.conf.kubernetes.namespace
  207         )
  208         if ret.status.unavailable_replicas is not None:
  209             raise exc.OrchestratorException("Deployment %s upgrade not "
  210                                             "ready." % deploy_name)
  211 
  212     def update_pool(self, name, image=None):
  213         """Deployment rolling-update.
  214 
  215         Return True if successful, otherwise return False after rolling back.
  216         """
  217         LOG.info('Start to do rolling-update deployment %s', name)
  218 
  219         body = {
  220             'spec': {
  221                 'template': {
  222                     'spec': {
  223                         'containers': [
  224                             {
  225                                 'name': 'worker',
  226                                 'image': image
  227                             }
  228                         ]
  229                     }
  230                 }
  231             }
  232         }
  233         self.v1extension.patch_namespaced_deployment(
  234             name, self.conf.kubernetes.namespace, body
  235         )
  236 
  237         try:
  238             time.sleep(10)
  239             self._wait_for_upgrade(name)
  240         except exc.OrchestratorException:
  241             LOG.warn("Timeout when waiting for the deployment %s upgrade, "
  242                      "Start to roll back.", name)
  243 
  244             body = {"rollbackTo": {"revision": 0}}
  245             try:
  246                 self.v1extension.create_namespaced_deployment_rollback(
  247                     name, self.conf.kubernetes.namespace, body
  248                 )
  249             except Exception:
  250                 # TODO(lxkong): remove the exception catch until kubernetes
  251                 # python lib has a new release. Refer to
  252                 # https://github.com/kubernetes-client/python/issues/491
  253                 pass
  254 
  255             return False
  256 
  257         return True
  258 
  259     def _choose_available_pods(self, labels, count=1, function_id=None,
  260                                function_version=0):
  261         # If there is already a pod for function, reuse it.
  262         if function_id:
  263             ret = self.v1.list_namespaced_pod(
  264                 self.conf.kubernetes.namespace,
  265                 label_selector='function_id=%s,function_version=%s' %
  266                                (function_id, function_version)
  267             )
  268             if len(ret.items) >= count:
  269                 LOG.debug(
  270                     "Function %s(version %s) already associates to a pod with "
  271                     "at least %d worker(s). ",
  272                     function_id, function_version, count
  273                 )
  274                 return ret.items[:count]
  275 
  276         selector = common.convert_dict_to_string(labels)
  277         ret = self.v1.list_namespaced_pod(
  278             self.conf.kubernetes.namespace,
  279             label_selector='!function_id,%s' % selector
  280         )
  281 
  282         if len(ret.items) < count:
  283             return []
  284 
  285         return ret.items[-count:]
  286 
  287     def _prepare_pod(self, pod, deployment_name, function_id, version,
  288                      labels=None):
  289         """Pod preparation.
  290 
  291         1. Update pod labels.
  292         2. Expose service.
  293         """
  294         pod_name = pod.metadata.name
  295         labels = labels or {}
  296 
  297         LOG.info(
  298             'Prepare pod %s in deployment %s for function %s(version %s)',
  299             pod_name, deployment_name, function_id, version
  300         )
  301 
  302         # Update pod label.
  303         pod_labels = self._update_pod_label(
  304             pod,
  305             # pod label value should be string
  306             {'function_id': function_id, 'function_version': str(version)}
  307         )
  308 
  309         # Create service for the chosen pod.
  310         service_name = "service-%s-%s" % (function_id, version)
  311         labels.update(
  312             {'function_id': function_id, 'function_version': str(version)}
  313         )
  314 
  315         # TODO(kong): Make the service type configurable.
  316         service_body = self.service_template.render(
  317             {
  318                 "service_name": service_name,
  319                 "labels": labels,
  320                 "selector": pod_labels
  321             }
  322         )
  323         try:
  324             ret = self.v1.create_namespaced_service(
  325                 self.conf.kubernetes.namespace, yaml.safe_load(service_body)
  326             )
  327             LOG.debug(
  328                 'Service created for pod %s, service name: %s',
  329                 pod_name, service_name
  330             )
  331         except Exception as e:
  332             # Service already exists
  333             if e.status == 409:
  334                 LOG.debug(
  335                     'Service already exists for pod %s, service name: %s',
  336                     pod_name, service_name
  337                 )
  338                 time.sleep(1)
  339                 ret = self.v1.read_namespaced_service(
  340                     service_name, self.conf.kubernetes.namespace
  341                 )
  342             else:
  343                 raise
  344 
  345         # Get external ip address for an arbitrary node.
  346         node_port = ret.spec.ports[0].node_port
  347         nodes = self.v1.list_node()
  348         addresses = nodes.items[0].status.addresses
  349         node_ip = None
  350         for addr in addresses:
  351             if addr.type == 'ExternalIP':
  352                 node_ip = addr.address
  353 
  354         # FIXME: test purpose using minikube
  355         if not node_ip:
  356             for addr in addresses:
  357                 if addr.type == 'InternalIP':
  358                     node_ip = addr.address
  359 
  360         pod_service_url = 'http://%s:%s' % (node_ip, node_port)
  361 
  362         return pod_name, pod_service_url
  363 
  364     def _create_pod(self, image, rlimit, pod_name, labels, input):
  365         """Create pod for image type function."""
  366         if not input:
  367             input_list = []
  368         elif isinstance(input, dict) and input.get('__function_input'):
  369             input_list = input.get('__function_input').split()
  370         else:
  371             input_list = list(json.loads(input))
  372 
  373         pod_body = self.pod_template.render(
  374             {
  375                 "pod_name": pod_name,
  376                 "labels": labels,
  377                 "pod_image": image,
  378                 "input": input_list,
  379                 "req_cpu": str(rlimit['cpu']),
  380                 "limit_cpu": str(rlimit['cpu']),
  381                 "req_memory": str(rlimit['memory_size']),
  382                 "limit_memory": str(rlimit['memory_size'])
  383             }
  384         )
  385 
  386         LOG.info(
  387             "Creating pod %s for image function:\n%s", pod_name, pod_body
  388         )
  389 
  390         try:
  391             self.v1.create_namespaced_pod(
  392                 self.conf.kubernetes.namespace,
  393                 body=yaml.safe_load(pod_body),
  394             )
  395         except Exception:
  396             LOG.exception("Failed to create pod.")
  397             raise exc.OrchestratorException('Execution preparation failed.')
  398 
  399     def _update_pod_label(self, pod, new_label):
  400         name = pod.metadata.name
  401 
  402         pod_labels = copy.deepcopy(pod.metadata.labels) or {}
  403         pod_labels.update(new_label)
  404         body = {
  405             'metadata': {
  406                 'labels': pod_labels
  407             }
  408         }
  409         self.v1.patch_namespaced_pod(
  410             name, self.conf.kubernetes.namespace, body
  411         )
  412 
  413         LOG.debug('Labels updated for pod %s', name)
  414 
  415         return pod_labels
  416 
  417     def prepare_execution(self, function_id, version, rlimit=None, image=None,
  418                           identifier=None, labels=None, input=None):
  419         """Prepare service URL for function version.
  420 
  421         :param rlimit: optional argument passed to limit cpu/mem resources.
  422 
  423         For image function, create a single pod with rlimit and input, so the
  424         function will be executed in the resource limited pod.
  425 
  426         For normal function, choose a pod from the pool and expose a service,
  427         return the service URL.
  428 
  429         return a tuple includes pod name and the servise url.
  430         """
  431         pods = None
  432 
  433         labels = labels or {'function_id': function_id}
  434 
  435         if image:
  436             if not rlimit:
  437                 LOG.critical('Param rlimit is required for image function.')
  438                 raise exc.OrchestratorException(
  439                     'Execution preparation failed.'
  440                 )
  441 
  442             self._create_pod(image, rlimit, identifier, labels, input)
  443 
  444             return identifier, None
  445         else:
  446             pods = self._choose_available_pods(labels, function_id=function_id,
  447                                                function_version=version)
  448 
  449         if not pods:
  450             LOG.critical('No worker available.')
  451             raise exc.OrchestratorException('Execution preparation failed.')
  452 
  453         try:
  454             pod_name, url = self._prepare_pod(
  455                 pods[0], identifier, function_id, version, labels
  456             )
  457             return pod_name, url
  458         except Exception:
  459             LOG.exception('Pod preparation failed.')
  460             self.delete_function(function_id, version, labels)
  461             raise exc.OrchestratorException('Execution preparation failed.')
  462 
  463     def run_execution(self, execution_id, function_id, version, rlimit=None,
  464                       input=None, identifier=None, service_url=None,
  465                       entry='main.main', trust_id=None, timeout=None):
  466         """Run execution.
  467 
  468         Return a tuple including the result and the output.
  469         """
  470         if service_url:
  471             func_url = '%s/execute' % service_url
  472             data = utils.get_request_data(
  473                 self.conf, function_id, version, execution_id, rlimit, input,
  474                 entry, trust_id, self.qinling_endpoint, timeout
  475             )
  476             LOG.debug(
  477                 'Invoke function %s(version %s), url: %s, data: %s',
  478                 function_id, version, func_url, data
  479             )
  480 
  481             return utils.url_request(self.session, func_url, body=data)
  482         else:
  483             # Wait for image type function execution to be finished
  484             def _wait_complete():
  485                 pod = self.v1.read_namespaced_pod(
  486                     identifier,
  487                     self.conf.kubernetes.namespace
  488                 )
  489                 status = pod.status.phase
  490 
  491                 if status == 'Succeeded':
  492                     return pod
  493 
  494                 raise exc.TimeoutException()
  495 
  496             duration = 0
  497             try:
  498                 r = tenacity.Retrying(
  499                     wait=tenacity.wait_fixed(1),
  500                     stop=tenacity.stop_after_delay(timeout),
  501                     retry=tenacity.retry_if_exception_type(
  502                         exc.TimeoutException),
  503                     reraise=True
  504                 )
  505                 pod = r.call(_wait_complete)
  506 
  507                 statuses = pod.status.container_statuses
  508                 for s in statuses:
  509                     if hasattr(s.state, "terminated"):
  510                         end_time = s.state.terminated.finished_at
  511                         start_time = s.state.terminated.started_at
  512                         delta = end_time - start_time
  513                         duration = delta.seconds
  514                         break
  515             except exc.TimeoutException:
  516                 LOG.exception(
  517                     "Timeout for function execution %s, pod %s",
  518                     execution_id, identifier
  519                 )
  520 
  521                 self.v1.delete_namespaced_pod(
  522                     identifier,
  523                     self.conf.kubernetes.namespace
  524                 )
  525                 LOG.debug('Pod %s deleted.', identifier)
  526 
  527                 return False, {'output': 'Function execution timeout.',
  528                                'duration': timeout}
  529             except Exception:
  530                 LOG.exception("Failed to wait for pod %s", identifier)
  531                 return False, {'output': 'Function execution failed.',
  532                                'duration': duration}
  533 
  534             log = self.v1.read_namespaced_pod_log(
  535                 identifier,
  536                 self.conf.kubernetes.namespace,
  537             )
  538 
  539             return True, {'duration': duration, 'logs': log}
  540 
  541     def delete_function(self, function_id, version, labels=None):
  542         """Delete related resources for function.
  543 
  544         - Delete service
  545         - Delete pods
  546         """
  547         pre_label = {
  548             'function_id': function_id,
  549             'function_version': str(version)
  550         }
  551         labels = labels or pre_label
  552         selector = common.convert_dict_to_string(labels)
  553 
  554         ret = self.v1.list_namespaced_service(
  555             self.conf.kubernetes.namespace, label_selector=selector
  556         )
  557         names = [i.metadata.name for i in ret.items]
  558         for svc_name in names:
  559             self.v1.delete_namespaced_service(
  560                 svc_name,
  561                 self.conf.kubernetes.namespace
  562             )
  563 
  564         self.v1.delete_collection_namespaced_pod(
  565             self.conf.kubernetes.namespace,
  566             label_selector=selector
  567         )
  568 
  569     def scaleup_function(self, function_id, version, identifier=None, count=1):
  570         pod_names = []
  571         labels = {'runtime_id': identifier}
  572         pods = self._choose_available_pods(labels, count=count)
  573 
  574         if not pods:
  575             raise exc.OrchestratorException('Not enough workers available.')
  576 
  577         for pod in pods:
  578             pod_name, service_url = self._prepare_pod(
  579                 pod, identifier, function_id, version, labels
  580             )
  581             pod_names.append(pod_name)
  582 
  583         LOG.info('Pods scaled up for function %s(version %s): %s', function_id,
  584                  version, pod_names)
  585 
  586         return pod_names, service_url
  587 
  588     def delete_worker(self, pod_name, **kwargs):
  589         self.v1.delete_namespaced_pod(
  590             pod_name,
  591             self.conf.kubernetes.namespace,
  592         )