"Fossies" - the Fresh Open Source Software Archive

Member "zun-7.0.0/zun/container/cri/driver.py" (14 Apr 2021, 11979 Bytes) of package /linux/misc/openstack/zun-7.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "driver.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.0_vs_7.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License");
    2 # you may not use this file except in compliance with the License.
    3 # You may obtain a copy of the License at
    4 #
    5 #    http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS,
    9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   10 # implied.
   11 # See the License for the specific language governing permissions and
   12 # limitations under the License.
   13 
   14 import grpc
   15 from oslo_log import log as logging
   16 import tenacity
   17 
   18 from zun.common import consts
   19 from zun.common import context as zun_context
   20 from zun.common import exception
   21 from zun.common.i18n import _
   22 from zun.common import utils
   23 import zun.conf
   24 from zun.container import driver
   25 from zun.criapi import api_pb2
   26 from zun.criapi import api_pb2_grpc
   27 from zun.network import neutron
   28 from zun.network import os_vif_util
   29 from zun import objects
   30 
   31 
   32 CONF = zun.conf.CONF
   33 LOG = logging.getLogger(__name__)
   34 
   35 
   36 class CriDriver(driver.BaseDriver, driver.CapsuleDriver):
   37     """Implementation of container drivers for CRI runtime."""
   38 
   39     # TODO(hongbin): define a list of capabilities of this driver.
   40     capabilities = {}
   41 
   42     def __init__(self):
   43         super(CriDriver, self).__init__()
   44         channel = grpc.insecure_channel(
   45             'unix:///run/containerd/containerd.sock')
   46         self.runtime_stub = api_pb2_grpc.RuntimeServiceStub(channel)
   47         self.image_stub = api_pb2_grpc.ImageServiceStub(channel)
   48 
   49     def create_capsule(self, context, capsule, image, requested_networks,
   50                        requested_volumes):
   51 
   52         self._create_pod_sandbox(context, capsule, requested_networks)
   53 
   54         # TODO(hongbin): handle init containers
   55         for container in capsule.init_containers:
   56             self._create_container(context, capsule, container,
   57                                    requested_networks,
   58                                    requested_volumes)
   59             self._wait_for_init_container(context, container)
   60             container.save(context)
   61 
   62         for container in capsule.containers:
   63             self._create_container(context, capsule, container,
   64                                    requested_networks,
   65                                    requested_volumes)
   66             container.status = consts.RUNNING
   67             container.save(context)
   68 
   69         capsule.status = consts.RUNNING
   70         return capsule
   71 
   72     def _create_pod_sandbox(self, context, capsule, requested_networks):
   73         sandbox_config = self._get_sandbox_config(capsule)
   74         runtime = capsule.runtime or CONF.container_runtime
   75         if runtime == "runc":
   76             # pass "" to specify the default runtime which is runc
   77             runtime = ""
   78 
   79         self._write_cni_metadata(context, capsule, requested_networks)
   80         sandbox_resp = self.runtime_stub.RunPodSandbox(
   81             api_pb2.RunPodSandboxRequest(
   82                 config=sandbox_config,
   83                 runtime_handler=runtime,
   84             )
   85         )
   86         LOG.debug("podsandbox is created: %s", sandbox_resp)
   87         capsule.container_id = sandbox_resp.pod_sandbox_id
   88 
   89     def _get_sandbox_config(self, capsule):
   90         return api_pb2.PodSandboxConfig(
   91             metadata=api_pb2.PodSandboxMetadata(
   92                 name=capsule.uuid, namespace="default", uid=capsule.uuid
   93             )
   94         )
   95 
   96     def _write_cni_metadata(self, context, capsule, requested_networks):
   97         neutron_api = neutron.NeutronAPI(context)
   98         security_group_ids = utils.get_security_group_ids(
   99             context, capsule.security_groups)
  100         # TODO(hongbin): handle multiple nics
  101         requested_network = requested_networks[0]
  102         network_id = requested_network['network']
  103         addresses, port = neutron_api.create_or_update_port(
  104             capsule, network_id, requested_network, consts.DEVICE_OWNER_ZUN,
  105             security_group_ids, set_binding_host=True)
  106         capsule.addresses = {network_id: addresses}
  107 
  108         neutron_api = neutron.NeutronAPI(zun_context.get_admin_context())
  109         network = neutron_api.show_network(port['network_id'])['network']
  110         subnets = {}
  111         for fixed_ip in port['fixed_ips']:
  112             subnet_id = fixed_ip['subnet_id']
  113             subnets[subnet_id] = neutron_api.show_subnet(subnet_id)['subnet']
  114         vif_plugin = port.get('binding:vif_type')
  115         vif_obj = os_vif_util.neutron_to_osvif_vif(vif_plugin, port, network,
  116                                                    subnets)
  117         state = objects.vif.VIFState(default_vif=vif_obj)
  118         state_dict = state.obj_to_primitive()
  119         capsule.cni_metadata = {consts.CNI_METADATA_VIF: state_dict}
  120         capsule.save(context)
  121 
  122     def _create_container(self, context, capsule, container,
  123                           requested_networks, requested_volumes):
  124         # pull image
  125         self._pull_image(context, container)
  126 
  127         sandbox_config = self._get_sandbox_config(capsule)
  128         container_config = self._get_container_config(context, container,
  129                                                       requested_volumes)
  130         response = self.runtime_stub.CreateContainer(
  131             api_pb2.CreateContainerRequest(
  132                 pod_sandbox_id=capsule.container_id,
  133                 config=container_config,
  134                 sandbox_config=sandbox_config,
  135             )
  136         )
  137 
  138         LOG.debug("container is created: %s", response)
  139         container.container_id = response.container_id
  140         container.save(context)
  141 
  142         response = self.runtime_stub.StartContainer(
  143             api_pb2.StartContainerRequest(
  144                 container_id=container.container_id
  145             )
  146         )
  147         LOG.debug("container is started: %s", response)
  148 
  149     def _get_container_config(self, context, container, requested_volumes):
  150         args = []
  151         if container.command:
  152             args = [str(c) for c in container.command]
  153         envs = []
  154         if container.environment:
  155             envs = [api_pb2.KeyValue(key=str(k), value=str(v))
  156                     for k, v in container.environment.items()]
  157         mounts = []
  158         if container.uuid in requested_volumes:
  159             req_volume = requested_volumes[container.uuid]
  160             mounts = self._get_mounts(context, req_volume)
  161         working_dir = container.workdir or ""
  162         labels = container.labels or []
  163 
  164         cpu = 0
  165         if container.cpu is not None:
  166             cpu = int(1024 * container.cpu)
  167         memory = 0
  168         if container.memory is not None:
  169             memory = int(container.memory) * 1024 * 1024
  170         linux_config = api_pb2.LinuxContainerConfig(
  171             security_context=api_pb2.LinuxContainerSecurityContext(
  172                 privileged=container.privileged
  173             ),
  174             resources={
  175                 'cpu_shares': cpu,
  176                 'memory_limit_in_bytes': memory,
  177             }
  178         )
  179 
  180         # TODO(hongbin): add support for entrypoint
  181         return api_pb2.ContainerConfig(
  182             metadata=api_pb2.ContainerMetadata(name=container.name),
  183             image=api_pb2.ImageSpec(image=container.image),
  184             tty=container.tty,
  185             stdin=container.interactive,
  186             args=args,
  187             envs=envs,
  188             working_dir=working_dir,
  189             labels=labels,
  190             mounts=mounts,
  191             linux=linux_config,
  192         )
  193 
  194     def _pull_image(self, context, container):
  195         # TODO(hongbin): add support for private registry
  196         response = self.image_stub.PullImage(
  197             api_pb2.PullImageRequest(
  198                 image=api_pb2.ImageSpec(image=container.image)
  199             )
  200         )
  201         LOG.debug("image is pulled: %s", response)
  202 
  203     def _get_mounts(self, context, volmaps):
  204         mounts = []
  205         for volume in volmaps:
  206             volume_driver = self._get_volume_driver(volume)
  207             source, destination = volume_driver.bind_mount(context, volume)
  208             mounts.append(api_pb2.Mount(container_path=destination,
  209                                         host_path=source))
  210         return mounts
  211 
  212     def _wait_for_init_container(self, context, container, timeout=3600):
  213         def retry_if_result_is_false(result):
  214             return result is False
  215 
  216         def check_init_container_stopped():
  217             status = self._show_container(context, container).status
  218             if status == consts.STOPPED:
  219                 return True
  220             elif status == consts.RUNNING:
  221                 return False
  222             else:
  223                 raise exception.ZunException(
  224                     _("Container has unexpected status: %s") % status)
  225 
  226         r = tenacity.Retrying(
  227             stop=tenacity.stop_after_delay(timeout),
  228             wait=tenacity.wait_exponential(),
  229             retry=tenacity.retry_if_result(retry_if_result_is_false))
  230         r.call(check_init_container_stopped)
  231 
  232     def _show_container(self, context, container):
  233         container_id = container.container_id
  234         if not container_id:
  235             return
  236 
  237         response = self.runtime_stub.ListContainers(
  238             api_pb2.ListContainersRequest(
  239                 filter={'id': container_id}
  240             )
  241         )
  242         if not response.containers:
  243             raise exception.ZunException(
  244                 "Container %s is not found in runtime" % container_id)
  245 
  246         container_response = response.containers[0]
  247         self._populate_container(container, container_response)
  248         return container
  249 
  250     def _populate_container(self, container, response):
  251         self._populate_container_state(container, response)
  252 
  253     def _populate_container_state(self, container, response):
  254         state = response.state
  255         if state == api_pb2.ContainerState.CONTAINER_CREATED:
  256             container.status = consts.CREATED
  257         elif state == api_pb2.ContainerState.CONTAINER_RUNNING:
  258             container.status = consts.RUNNING
  259         elif state == api_pb2.ContainerState.CONTAINER_EXITED:
  260             container.status = consts.STOPPED
  261         elif state == api_pb2.ContainerState.CONTAINER_UNKNOWN:
  262             LOG.debug('State is unknown, status: %s', state)
  263             container.status = consts.UNKNOWN
  264         else:
  265             LOG.warning('Receive unexpected state from CRI runtime: %s', state)
  266             container.status = consts.UNKNOWN
  267             container.status_reason = "container state unknown"
  268 
  269     def delete_capsule(self, context, capsule, force):
  270         pod_id = capsule.container_id
  271         if not pod_id:
  272             return
  273 
  274         try:
  275             response = self.runtime_stub.StopPodSandbox(
  276                 api_pb2.StopPodSandboxRequest(
  277                     pod_sandbox_id=capsule.container_id,
  278                 )
  279             )
  280             LOG.debug("podsandbox is stopped: %s", response)
  281             response = self.runtime_stub.RemovePodSandbox(
  282                 api_pb2.RemovePodSandboxRequest(
  283                     pod_sandbox_id=capsule.container_id,
  284                 )
  285             )
  286             LOG.debug("podsandbox is removed: %s", response)
  287         except exception.CommandError as e:
  288             if 'error occurred when try to find sandbox' in str(e):
  289                 LOG.error("cannot find pod sandbox in runtime")
  290                 pass
  291             else:
  292                 raise
  293 
  294         self._delete_neutron_ports(context, capsule)
  295 
  296     def _delete_neutron_ports(self, context, capsule):
  297         if not capsule.addresses:
  298             return
  299 
  300         neutron_ports = set()
  301         all_ports = set()
  302         for net_uuid, addrs_list in capsule.addresses.items():
  303             for addr in addrs_list:
  304                 all_ports.add(addr['port'])
  305                 if not addr['preserve_on_delete']:
  306                     port_id = addr['port']
  307                     neutron_ports.add(port_id)
  308 
  309         neutron_api = neutron.NeutronAPI(context)
  310         neutron_api.delete_or_unbind_ports(all_ports, neutron_ports)