"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/geo-replication/syncdaemon/monitor.py" (16 Sep 2020, 14710 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "monitor.py" see the Fossies "Dox" file reference documentation.

#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

import os
import sys
import time
import signal
import logging
import xml.etree.ElementTree as XET
from threading import Lock
from errno import ECHILD, ESRCH
import random

from resource import SSH
import gsyncdconfig as gconf
import libgfchangelog
from rconf import rconf
from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
                        set_term_handler, GsyncdError,
                        Thread, finalize, Volinfo, VolinfoFromGconf,
                        gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
                        unshare_propagation_supported)
from gsyncdstatus import GeorepStatus, set_monitor_status
import py2py3
from py2py3 import pipe

ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError


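# A worked example for get_subvol_num() below (assuming a plain
# replicated volume: replica_count 3, no disperse, no tier): subvol_size
# is 3, so brick_idx 4 gives cnt = int((4 + 1) / 3) = 1 with remainder
# 2, bumping cnt to 2; bricks 0..2 form subvol 1, bricks 3..5 subvol 2.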
def get_subvol_num(brick_idx, vol, hot):
    tier = vol.is_tier()
    disperse_count = vol.disperse_count(tier, hot)
    replica_count = vol.replica_count(tier, hot)
    distribute_count = vol.distribution_count(tier, hot)
    gconf.setconfig("master-distribution-count", distribute_count)

    if (tier and not hot):
        brick_idx = brick_idx - vol.get_hot_bricks_count(tier)

    subvol_size = disperse_count if disperse_count > 0 else replica_count
    cnt = int((brick_idx + 1) / subvol_size)
    rem = (brick_idx + 1) % subvol_size
    if rem > 0:
        cnt = cnt + 1

    if (tier and hot):
        return "hot_" + str(cnt)
    elif (tier and not hot):
        return "cold_" + str(cnt)
    else:
        return str(cnt)


class Monitor(object):

    """class which spawns and manages gsyncd workers"""

    ST_INIT = 'Initializing...'
    ST_STARTED = 'Started'
    ST_STABLE = 'Active'
    ST_FAULTY = 'Faulty'
    ST_INCON = 'inconsistent'
    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]

    def __init__(self):
        self.lock = Lock()
        self.state = {}
        self.status = {}

    @staticmethod
    def terminate():
        # relax one SIGTERM by setting a handler that restores
        # the standard handler
        set_term_handler(lambda *a: set_term_handler())
        # give a chance for a graceful exit
        errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

    def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
                suuid, slavenodes):
        """the monitor loop

        Basic logic is a blatantly simple, blunt heuristic:
        if the spawned client survives 60 secs, it's considered OK.
        This serves us pretty well as it's not vulnerable to
        any kind of irregular behavior of the child...

        ... well, except for one: if the child is hung up
        waiting for some event, it can survive aeons and still
        be defunct. So we tweak the above logic to
        expect the worker to send us a signal within 60 secs
        (in the form of closing its end of a pipe). The worker
        does this when it's done with the setup stage and is
        ready to enter the service loop (note it's the setup
        stage which is vulnerable to hangs -- the full-blown
        worker blows up on EPIPE if the net goes down, due to
        the keep-alive thread)
        """
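        # The handshake: the worker inherits the write end of a pipe
        # and closes it when setup completes; the select() below treats
        # the resulting EOF on the read end as the readiness signal.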
        if not self.status.get(w[0]['dir'], None):
            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
                                                    w[0]['host'],
                                                    w[0]['dir'],
                                                    w[0]['uuid'],
                                                    master,
                                                    "%s::%s" % (slave_host,
                                                                slave_vol))
        ret = 0

        def nwait(p, o=0):
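            # Returns None while the child is still running (WNOHANG
            # with no state change), the raw wait status once it has
            # exited, or -1 if it was already reaped (ECHILD).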
            try:
                p2, r = waitpid(p, o)
                if not p2:
                    return
                return r
            except OSError as e:
                # no child process, this happens if the child process
                # already died and has been cleaned up
                if e.errno == ECHILD:
                    return -1
                else:
                    raise
        def exit_signalled(s):
            """child terminated due to receipt of SIGUSR1"""
            return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))

        def exit_status(s):
            if os.WIFEXITED(s):
                return os.WEXITSTATUS(s)
            return 1
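        # A SIGUSR1 termination counts as a clean stop (ret 0); any
        # exit status of 0 or 1 keeps the respawn loop below running.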
        conn_timeout = gconf.get("connection-timeout")
        while ret in (0, 1):
            remote_user, remote_host = w[1][0].split("@")
            remote_id = w[1][1]
            # Check the status of the connected slave node. If it is
            # down, try to connect to a different node that is up.
            current_slave_host = remote_host
            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))

            if (current_slave_host, remote_id) not in slave_up_hosts:
                if len(slave_up_hosts) > 0:
                    remote_new = random.choice(slave_up_hosts)
                    remote_host = "%s@%s" % (remote_user, remote_new[0])
                    remote_id = remote_new[1]

            # Spawn the worker under the lock to avoid an fd leak
            self.lock.acquire()

            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
            logging.info(lf('starting gsyncd worker',
                            brick=w[0]['dir'],
                            slave_node=remote_host))

            pr, pw = pipe()
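            # pr stays with the monitor; the worker inherits pw and is
            # told about it via --feedback-fd.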
            cpid = os.fork()
            if cpid == 0:
                os.close(pr)

                args_to_worker = argv + [
                    'worker',
                    rconf.args.master,
                    rconf.args.slave,
                    '--feedback-fd', str(pw),
                    '--local-path', w[0]['dir'],
                    '--local-node', w[0]['host'],
                    '--local-node-id', w[0]['uuid'],
                    '--slave-id', suuid,
                    '--subvol-num', str(w[2]),
                    '--resource-remote', remote_host,
                    '--resource-remote-id', remote_id
                ]

                if rconf.args.config_file is not None:
                    args_to_worker += ['-c', rconf.args.config_file]

                if w[3]:
                    args_to_worker.append("--is-hottier")

                if rconf.args.debug:
                    args_to_worker.append("--debug")

                access_mount = gconf.get("access-mount")
                if access_mount:
                    os.execv(sys.executable, args_to_worker)
                else:
                    if unshare_propagation_supported():
                        logging.debug("Worker will mount the volume "
                                      "privately")
                        unshare_cmd = ['unshare', '-m', '--propagation',
                                       'private']
                        cmd = unshare_cmd + args_to_worker
                        os.execvp("unshare", cmd)
                    else:
                        logging.debug("Mount is not private. It will be "
                                      "lazily unmounted")
                        os.execv(sys.executable, args_to_worker)

            cpids.add(cpid)
            os.close(pw)

            self.lock.release()

            t0 = time.time()
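            # pr becomes readable (EOF) once the worker closes or loses
            # its feedback fd; an empty select() result means setup did
            # not finish within conn_timeout.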
            so = select((pr,), (), (), conn_timeout)[0]
            os.close(pr)

            if so:
                ret = nwait(cpid, os.WNOHANG)

                if ret is not None:
                    logging.info(lf("worker died before establishing "
                                    "connection",
                                    brick=w[0]['dir']))
                else:
                    logging.debug("worker(%s) connected" % w[0]['dir'])
                    while time.time() < t0 + conn_timeout:
                        ret = nwait(cpid, os.WNOHANG)

                        if ret is not None:
                            logging.info(lf("worker died in startup phase",
                                            brick=w[0]['dir']))
                            break

                        time.sleep(1)
            else:
                logging.info(
                    lf("Worker not confirmed after wait, aborting it. "
                       "Gsyncd invocation on remote slave via SSH or "
                       "gluster master mount might have hung. Please "
                       "check the above logs for the exact issue and check "
                       "master or slave volume for errors. Restarting "
                       "master/slave volume accordingly might help.",
                       brick=w[0]['dir'],
                       timeout=conn_timeout))
                errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                ret = nwait(cpid)
            if ret is None:
                ret = nwait(cpid)
            if exit_signalled(ret):
                ret = 0
            else:
                ret = exit_status(ret)
                if ret in (0, 1):
                    self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
                    gf_event(EVENT_GEOREP_FAULTY,
                             master_volume=master.volume,
                             master_node=w[0]['host'],
                             master_node_id=w[0]['uuid'],
                             slave_host=slave_host,
                             slave_volume=slave_vol,
                             current_slave_host=current_slave_host,
                             brick_path=w[0]['dir'])
            time.sleep(10)
        self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
        return ret

    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
        argv = [os.path.basename(sys.executable), sys.argv[0]]

        cpids = set()
        ta = []
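        # One monitor thread per worker spec: self.monitor() loops for
        # as long as the worker is respawnable; if it ever returns, all
        # spawned workers are killed and the process exits via finalize.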
        for wx in wspx:
            def wmon(w):
                # monitor() returns a single exit value; the original
                # tuple unpacking here would have raised a TypeError
                self.monitor(w, argv, cpids, slave_vol,
                             slave_host, master, suuid, slavenodes)
                time.sleep(1)
                self.lock.acquire()
                for cpid in cpids:
                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                self.lock.release()
                finalize(exval=1)
            t = Thread(target=wmon, args=[wx])
            t.start()
            ta.append(t)

        # The monitor status used to be updated in each monitor thread.
        # It should not be, as that can deadlock a worker start:
        # set_monitor_status uses flock to synchronize multiple
        # instances updating the file, and since each monitor thread
        # forks a worker, those child processes can hold references to
        # the status file's fd. The flock is not released until every
        # reference to that fd is closed, deadlocking workers that
        # start later. It would also leak fds.

        self.lock.acquire()
        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
        self.lock.release()
        for t in ta:
            t.join()


def distribute(master, slave):
    if rconf.args.use_gconf_volinfo:
        mvol = VolinfoFromGconf(master.volume, master=True)
    else:
        mvol = Volinfo(master.volume, master.host, master=True)
    logging.debug('master bricks: ' + repr(mvol.bricks))

    prelude = [gconf.get("ssh-command")] + \
        gconf.get("ssh-options").split() + \
        ["-p", str(gconf.get("ssh-port"))] + \
        [slave.remote_addr]

    logging.debug('slave SSH gateway: ' + slave.remote_addr)

    if rconf.args.use_gconf_volinfo:
        svol = VolinfoFromGconf(slave.volume, master=False)
    else:
        svol = Volinfo(slave.volume, "localhost", prelude, master=False)

    sbricks = svol.bricks
    suuid = svol.uuid
    slave_host = slave.remote_addr.split('@')[-1]
    slave_vol = slave.volume

    # save this xattr for the session delete command
    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
                             svol.uuid
    if not old_stime_xattr_prefix or \
       old_stime_xattr_prefix != new_stime_xattr_prefix:
        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)

    logging.debug('slave bricks: ' + repr(sbricks))

    slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
    rap = SSH.parse_ssh_address(slave)
    slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]

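    # Bricks owned by this node are spread round-robin across the
    # slave endpoints below via slaves[idx % len(slaves)].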
    workerspex = []
    for idx, brick in enumerate(mvol.bricks):
        if rconf.args.local_node_id == brick['uuid']:
            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
            workerspex.append((brick,
                               slaves[idx % len(slaves)],
                               get_subvol_num(idx, mvol, is_hot),
                               is_hot))
    logging.debug('worker specs: ' + repr(workerspex))
    return workerspex, suuid, slave_vol, slave_host, master, slavenodes


def monitor(local, remote):
    # Check if gsyncd restarted in pause state. If
    # yes, send SIGSTOP to negative of monitor pid
    # to go back to pause state.
    if rconf.args.pause_on_start:
        errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])

    # oh yeah, actually Monitor is used as a singleton, too
    return Monitor().multiplex(*distribute(local, remote))


def startup(go_daemon=True):
    """set up logging, pidfile grabbing, daemonization"""
    pid_file = gconf.get("pid-file")
    if not grabpidfile():
        sys.stderr.write("pidfile is taken, exiting.\n")
        sys.exit(2)
    rconf.pid_file_owned = True

    if not go_daemon:
        return

    x, y = pipe()
    cpid = os.fork()
    if cpid:
        os.close(x)
        sys.exit()
    os.close(y)
    os.setsid()
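    # From here on the daemon is a session leader with no controlling
    # terminal; stdio is pointed at /dev/null below.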
    dn = os.open(os.devnull, os.O_RDWR)
    for f in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(dn, f.fileno())

    if not grabpidfile(pid_file + '.tmp'):
        raise GsyncdError("cannot grab temporary pidfile")

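    # The rename is atomic, so readers never observe a half-written
    # pidfile.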
    os.rename(pid_file + '.tmp', pid_file)

    # wait for parent to terminate
    # so we can start up with
    # no messing from the dirty
    # ol' bustard
    select((x,), (), ())
    os.close(x)