"Fossies" - the Fresh Open Source Software Archive 
Member "zun-7.0.0/zun/container/cri/driver.py" (14 Apr 2021, 11979 Bytes) of package /linux/misc/openstack/zun-7.0.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "driver.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
6.0.0_vs_7.0.0.
1 # Licensed under the Apache License, Version 2.0 (the "License");
2 # you may not use this file except in compliance with the License.
3 # You may obtain a copy of the License at
4 #
5 # http://www.apache.org/licenses/LICENSE-2.0
6 #
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS,
9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
10 # implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 import grpc
15 from oslo_log import log as logging
16 import tenacity
17
18 from zun.common import consts
19 from zun.common import context as zun_context
20 from zun.common import exception
21 from zun.common.i18n import _
22 from zun.common import utils
23 import zun.conf
24 from zun.container import driver
25 from zun.criapi import api_pb2
26 from zun.criapi import api_pb2_grpc
27 from zun.network import neutron
28 from zun.network import os_vif_util
29 from zun import objects
30
31
# Configuration object taken from zun.conf; read below for the default
# container runtime (CONF.container_runtime).
32 CONF = zun.conf.CONF
# Logger named after this module.
33 LOG = logging.getLogger(__name__)
34
35
class CriDriver(driver.BaseDriver, driver.CapsuleDriver):
    """Implementation of container drivers for CRI runtime.

    A capsule is realized as a CRI pod sandbox and each of its containers as
    a CRI container. All runtime operations go through the gRPC stubs that
    are created in ``__init__``.
    """

    # TODO(hongbin): define a list of capabilities of this driver.
    capabilities = {}

    def __init__(self):
        """Open a gRPC channel to containerd and build the CRI stubs."""
        super(CriDriver, self).__init__()
        channel = grpc.insecure_channel(
            'unix:///run/containerd/containerd.sock')
        self.runtime_stub = api_pb2_grpc.RuntimeServiceStub(channel)
        self.image_stub = api_pb2_grpc.ImageServiceStub(channel)

    def create_capsule(self, context, capsule, image, requested_networks,
                       requested_volumes):
        """Create the pod sandbox and start every container of a capsule.

        Init containers are created one at a time and each one is waited on
        until it stops before the next is created. Regular containers are
        then created and marked as running.

        :param context: request context used for saving objects and for
                        Neutron calls.
        :param capsule: the capsule object to realize; it is updated in
                        place.
        :param image: not used by this driver.
        :param requested_networks: list of requested networks; only the
                                   first entry is used.
        :param requested_volumes: mapping of container uuid to the volume
                                  mappings requested for that container.
        :returns: the capsule with its status set to running.
        """
        self._create_pod_sandbox(context, capsule, requested_networks)

        # TODO(hongbin): handle init containers
        for container in capsule.init_containers:
            self._create_container(context, capsule, container,
                                   requested_networks,
                                   requested_volumes)
            self._wait_for_init_container(context, container)
            container.save(context)

        for container in capsule.containers:
            self._create_container(context, capsule, container,
                                   requested_networks,
                                   requested_volumes)
            container.status = consts.RUNNING
            container.save(context)

        capsule.status = consts.RUNNING
        return capsule

    def _create_pod_sandbox(self, context, capsule, requested_networks):
        """Run a pod sandbox for the capsule and record its id.

        The CNI metadata is written to the capsule before the sandbox is
        started. The sandbox id returned by the runtime is stored in
        ``capsule.container_id``.
        """
        sandbox_config = self._get_sandbox_config(capsule)
        runtime = capsule.runtime or CONF.container_runtime
        if runtime == "runc":
            # pass "" to specify the default runtime which is runc
            runtime = ""

        self._write_cni_metadata(context, capsule, requested_networks)
        sandbox_resp = self.runtime_stub.RunPodSandbox(
            api_pb2.RunPodSandboxRequest(
                config=sandbox_config,
                runtime_handler=runtime,
            )
        )
        LOG.debug("podsandbox is created: %s", sandbox_resp)
        capsule.container_id = sandbox_resp.pod_sandbox_id

    def _get_sandbox_config(self, capsule):
        """Build the PodSandboxConfig identifying the capsule by its uuid."""
        return api_pb2.PodSandboxConfig(
            metadata=api_pb2.PodSandboxMetadata(
                name=capsule.uuid, namespace="default", uid=capsule.uuid
            )
        )

    def _write_cni_metadata(self, context, capsule, requested_networks):
        """Create the Neutron port and store its VIF state on the capsule.

        The port is created (or updated) with the caller's context, while
        the network and subnet details are looked up with an admin context.
        The resulting addresses and the serialized VIF state are saved on
        the capsule.
        """
        neutron_api = neutron.NeutronAPI(context)
        security_group_ids = utils.get_security_group_ids(
            context, capsule.security_groups)
        # TODO(hongbin): handle multiple nics
        requested_network = requested_networks[0]
        network_id = requested_network['network']
        addresses, port = neutron_api.create_or_update_port(
            capsule, network_id, requested_network, consts.DEVICE_OWNER_ZUN,
            security_group_ids, set_binding_host=True)
        capsule.addresses = {network_id: addresses}

        # From here on the network/subnet lookups use an admin context.
        neutron_api = neutron.NeutronAPI(zun_context.get_admin_context())
        network = neutron_api.show_network(port['network_id'])['network']
        subnets = {}
        for fixed_ip in port['fixed_ips']:
            subnet_id = fixed_ip['subnet_id']
            subnets[subnet_id] = neutron_api.show_subnet(subnet_id)['subnet']
        vif_plugin = port.get('binding:vif_type')
        vif_obj = os_vif_util.neutron_to_osvif_vif(vif_plugin, port, network,
                                                   subnets)
        state = objects.vif.VIFState(default_vif=vif_obj)
        state_dict = state.obj_to_primitive()
        capsule.cni_metadata = {consts.CNI_METADATA_VIF: state_dict}
        capsule.save(context)

    def _create_container(self, context, capsule, container,
                          requested_networks, requested_volumes):
        """Pull the image, then create and start one container in the pod.

        The runtime's container id is stored in ``container.container_id``
        and saved before the container is started. ``requested_networks``
        is accepted but not used here.
        """
        # pull image
        self._pull_image(context, container)

        sandbox_config = self._get_sandbox_config(capsule)
        container_config = self._get_container_config(context, container,
                                                      requested_volumes)
        response = self.runtime_stub.CreateContainer(
            api_pb2.CreateContainerRequest(
                pod_sandbox_id=capsule.container_id,
                config=container_config,
                sandbox_config=sandbox_config,
            )
        )

        LOG.debug("container is created: %s", response)
        container.container_id = response.container_id
        container.save(context)

        response = self.runtime_stub.StartContainer(
            api_pb2.StartContainerRequest(
                container_id=container.container_id
            )
        )
        LOG.debug("container is started: %s", response)

    def _get_container_config(self, context, container, requested_volumes):
        """Translate a container object into a CRI ContainerConfig.

        :param context: request context, passed on to the volume drivers.
        :param container: the container whose settings are translated.
        :param requested_volumes: mapping of container uuid to the volume
                                  mappings requested for that container.
        :returns: an ``api_pb2.ContainerConfig`` message.
        """
        args = []
        if container.command:
            args = [str(c) for c in container.command]
        envs = []
        if container.environment:
            envs = [api_pb2.KeyValue(key=str(k), value=str(v))
                    for k, v in container.environment.items()]
        mounts = []
        if container.uuid in requested_volumes:
            req_volume = requested_volumes[container.uuid]
            mounts = self._get_mounts(context, req_volume)
        working_dir = container.workdir or ""
        # Labels populate a map field, so the empty default is a dict.
        labels = container.labels or {}

        # A value of 0 is sent when no cpu/memory is set on the container.
        cpu = 0
        if container.cpu is not None:
            cpu = int(1024 * container.cpu)
        memory = 0
        if container.memory is not None:
            # NOTE(review): the multiplier implies container.memory is in
            # MiB -- confirm against the object definition.
            memory = int(container.memory) * 1024 * 1024
        linux_config = api_pb2.LinuxContainerConfig(
            security_context=api_pb2.LinuxContainerSecurityContext(
                privileged=container.privileged
            ),
            resources={
                'cpu_shares': cpu,
                'memory_limit_in_bytes': memory,
            }
        )

        # TODO(hongbin): add support for entrypoint
        return api_pb2.ContainerConfig(
            metadata=api_pb2.ContainerMetadata(name=container.name),
            image=api_pb2.ImageSpec(image=container.image),
            tty=container.tty,
            stdin=container.interactive,
            args=args,
            envs=envs,
            working_dir=working_dir,
            labels=labels,
            mounts=mounts,
            linux=linux_config,
        )

    def _pull_image(self, context, container):
        """Ask the image service to pull the container's image."""
        # TODO(hongbin): add support for private registry
        response = self.image_stub.PullImage(
            api_pb2.PullImageRequest(
                image=api_pb2.ImageSpec(image=container.image)
            )
        )
        LOG.debug("image is pulled: %s", response)

    def _get_mounts(self, context, volmaps):
        """Bind-mount each volume mapping and return the CRI Mount list."""
        mounts = []
        for volume in volmaps:
            volume_driver = self._get_volume_driver(volume)
            source, destination = volume_driver.bind_mount(context, volume)
            mounts.append(api_pb2.Mount(container_path=destination,
                                        host_path=source))
        return mounts

    def _wait_for_init_container(self, context, container, timeout=3600):
        """Poll the runtime until an init container has stopped.

        The container state is re-read with exponential back-off for up to
        ``timeout`` seconds.

        :raises ZunException: if the container is neither running nor
                              stopped.
        """
        def retry_if_result_is_false(result):
            return result is False

        def check_init_container_stopped():
            status = self._show_container(context, container).status
            if status == consts.STOPPED:
                return True
            elif status == consts.RUNNING:
                return False
            else:
                raise exception.ZunException(
                    _("Container has unexpected status: %s") % status)

        r = tenacity.Retrying(
            stop=tenacity.stop_after_delay(timeout),
            wait=tenacity.wait_exponential(),
            retry=tenacity.retry_if_result(retry_if_result_is_false))
        r.call(check_init_container_stopped)

    def _show_container(self, context, container):
        """Refresh a container's status from the runtime.

        :returns: the same container object with its status updated, or
                  None when the container has no ``container_id``.
        :raises ZunException: if the runtime lists no container with that
                              id.
        """
        container_id = container.container_id
        if not container_id:
            return

        response = self.runtime_stub.ListContainers(
            api_pb2.ListContainersRequest(
                filter={'id': container_id}
            )
        )
        if not response.containers:
            raise exception.ZunException(
                "Container %s is not found in runtime" % container_id)

        container_response = response.containers[0]
        self._populate_container(container, container_response)
        return container

    def _populate_container(self, container, response):
        """Copy runtime-reported fields onto the container object."""
        self._populate_container_state(container, response)

    def _populate_container_state(self, container, response):
        """Map the CRI container state onto the container's status."""
        state = response.state
        if state == api_pb2.ContainerState.CONTAINER_CREATED:
            container.status = consts.CREATED
        elif state == api_pb2.ContainerState.CONTAINER_RUNNING:
            container.status = consts.RUNNING
        elif state == api_pb2.ContainerState.CONTAINER_EXITED:
            container.status = consts.STOPPED
        elif state == api_pb2.ContainerState.CONTAINER_UNKNOWN:
            LOG.debug('State is unknown, status: %s', state)
            container.status = consts.UNKNOWN
        else:
            LOG.warning('Receive unexpected state from CRI runtime: %s',
                        state)
            container.status = consts.UNKNOWN
            container.status_reason = "container state unknown"

    def delete_capsule(self, context, capsule, force):
        """Stop and remove the capsule's pod sandbox, then free its ports.

        Nothing is done when the capsule has no sandbox id. A sandbox that
        the runtime can no longer find is logged and tolerated so that the
        Neutron ports are still cleaned up; any other error is re-raised.

        :param context: request context used for the Neutron calls.
        :param capsule: the capsule to delete.
        :param force: not used by this driver.
        """
        pod_id = capsule.container_id
        if not pod_id:
            return

        try:
            response = self.runtime_stub.StopPodSandbox(
                api_pb2.StopPodSandboxRequest(
                    pod_sandbox_id=pod_id,
                )
            )
            LOG.debug("podsandbox is stopped: %s", response)
            response = self.runtime_stub.RemovePodSandbox(
                api_pb2.RemovePodSandboxRequest(
                    pod_sandbox_id=pod_id,
                )
            )
            LOG.debug("podsandbox is removed: %s", response)
        except (exception.CommandError, grpc.RpcError) as e:
            # The stubs report failures as grpc.RpcError, so that type has
            # to be handled here for the "sandbox not found" case to be
            # tolerated at all.
            marker = 'error occurred when try to find sandbox'
            details = e.details() if isinstance(e, grpc.Call) else ''
            if marker not in str(e) and marker not in (details or ''):
                raise
            LOG.error("cannot find pod sandbox in runtime")

        self._delete_neutron_ports(context, capsule)

    def _delete_neutron_ports(self, context, capsule):
        """Delete or unbind the Neutron ports recorded on the capsule.

        Every port found in ``capsule.addresses`` is passed on; the ports
        whose address entry is not marked ``preserve_on_delete`` are passed
        separately as the second argument.
        """
        if not capsule.addresses:
            return

        neutron_ports = set()
        all_ports = set()
        for addrs_list in capsule.addresses.values():
            for addr in addrs_list:
                port_id = addr['port']
                all_ports.add(port_id)
                if not addr['preserve_on_delete']:
                    neutron_ports.add(port_id)

        neutron_api = neutron.NeutronAPI(context)
        neutron_api.delete_or_unbind_ports(all_ports, neutron_ports)