"Fossies" - the Fresh Open Source Software Archive

Member "buildbot-2.5.1/buildbot/test/fake/kube.py" (24 Nov 2019, 2425 Bytes) of package /linux/misc/buildbot-2.5.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "kube.py": 2.3.1_vs_2.4.0.

    1 # This file is part of Buildbot.  Buildbot is free software: you can
    2 # redistribute it and/or modify it under the terms of the GNU General Public
    3 # License as published by the Free Software Foundation, version 2.
    4 #
    5 # This program is distributed in the hope that it will be useful, but WITHOUT
    6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    7 # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
    8 # details.
    9 #
   10 # You should have received a copy of the GNU General Public License along with
   11 # this program; if not, write to the Free Software Foundation, Inc., 51
   12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   13 #
   14 # Copyright Buildbot Team Members
   15 
   16 import copy
   17 import time
   18 
   19 from buildbot.test.fake import httpclientservice as fakehttpclientservice
   20 from buildbot.util.kubeclientservice import KubeError
   21 
   22 
   23 class KubeClientService(fakehttpclientservice.HTTPClientService):
   24 
   25     def __init__(self, kube_config=None, *args, **kwargs):
   26         c = kube_config.getConfig()
   27         super().__init__(c['master_url'], *args, **kwargs)
   28         self.namespace = c['namespace']
   29         self.addService(kube_config)
   30         self.pods = {}
   31 
   32     def createPod(self, namespace, spec):
   33         if 'metadata' not in spec:
   34             raise KubeError({
   35                 'message': 'Pod "" is invalid: metadata.name: Required value: name or generateName is required'})
   36         name = spec['metadata']['name']
   37         pod = {
   38             'kind': 'Pod',
   39             'metadata': copy.copy(spec['metadata']),
   40             'spec': copy.deepcopy(spec['spec'])
   41         }
   42         self.pods[namespace + '/' + name] = pod
   43         return pod
   44 
   45     def deletePod(self, namespace, name, graceperiod=0):
   46         if namespace + '/' + name not in self.pods:
   47             raise KubeError({
   48                 'message': 'Pod not found',
   49                 'reason': 'NotFound'})
   50 
   51         spec = self.pods[namespace + '/' + name]
   52 
   53         del self.pods[namespace + '/' + name]
   54         spec['metadata']['deletionTimestamp'] = time.ctime(time.time())
   55         return spec
   56 
   57     def waitForPodDeletion(self, namespace, name, timeout):
   58         if namespace + '/' + name in self.pods:
   59             raise TimeoutError("Did not see pod {name} terminate after {timeout}s".format(
   60                 name=name, timeout=timeout
   61             ))
   62         return {
   63             'kind': 'Status',
   64             'reason': 'NotFound'
   65         }