"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/geo-replication/syncdaemon/resource.py" (16 Sep 2020, 60906 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:



    1 #
    2 # Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
    3 # This file is part of GlusterFS.
    4 
    5 # This file is licensed to you under your choice of the GNU Lesser
    6 # General Public License, version 3 or any later version (LGPLv3 or
    7 # later), or the GNU General Public License, version 2 (GPLv2), in all
    8 # cases as published by the Free Software Foundation.
    9 #
   10 
   11 import re
   12 import os
   13 import sys
   14 import stat
   15 import time
   16 import fcntl
   17 import types
   18 import struct
   19 import logging
   20 import tempfile
   21 import subprocess
   22 from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
   23                    EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
   24 import errno
   25 
   26 from rconf import rconf
   27 import gsyncdconfig as gconf
   28 import libgfchangelog
   29 
   30 import repce
   31 from repce import RepceServer, RepceClient
   32 from master import gmaster_builder
   33 import syncdutils
   34 from syncdutils import (GsyncdError, select, privileged, funcode,
   35                         entry2pb, gauxpfx, errno_wrap, lstat,
   36                         NoStimeAvailable, PartialHistoryAvailable,
   37                         ChangelogException, ChangelogHistoryNotAvailable,
   38                         get_changelog_log_level, get_rsync_version,
   39                         GX_GFID_CANONICAL_LEN,
   40                         gf_mount_ready, lf, Popen, sup,
   41                         Xattr, matching_disk_gfid, get_gfid_from_mnt,
   42                         unshare_propagation_supported, get_slv_dir_path)
   43 from gsyncdstatus import GeorepStatus
   44 from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
   45                     entry_pack_reg_stat, entry_pack_mkdir,
   46                     entry_pack_symlink)
   47 
   48 
   49 ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
   50 
   51 slv_volume = None
   52 slv_host = None
   53 
   54 
   55 class Server(object):
   56 
   57     """singleton implementing those filesystem access primitives
   58        which are needed for geo-replication functionality
   59 
   60     (Singleton in the sense it's a class which has only static
   61     and classmethods and is used directly, without instantiation.)
   62     """
   63 
   64     GX_NSPACE_PFX = (privileged() and "trusted" or "system")
   65     GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
   66     NTV_FMTSTR = "!" + "B" * 19 + "II"
   67     FRGN_XTRA_FMT = "I"
   68     FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
   69 
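# A minimal sketch of what the volume-mark format strings above describe,
# using only the standard struct module. NTV_FMTSTR covers 19 single bytes
# (two version bytes, 16 uuid bytes, one retval byte) followed by two
# unsigned 32-bit integers (the volume mark); FRGN_FMTSTR appends one more
# unsigned int, used for a foreign mark's timeout.
import struct

NTV_FMTSTR = "!" + "B" * 19 + "II"
FRGN_FMTSTR = NTV_FMTSTR + "I"
assert struct.calcsize(NTV_FMTSTR) == 19 + 4 + 4    # 27 bytes
assert struct.calcsize(FRGN_FMTSTR) == 27 + 4       # 31 bytes
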
   70     # for backend gfid fetch, do not use GX_NSPACE_PFX
   71     GFID_XATTR = 'trusted.gfid'
   72     GFID_FMTSTR = "!" + "B" * 16
   73 
   74     local_path = ''
   75 
   76     @classmethod
   77     def _fmt_mknod(cls, l):
   78         return "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, l + 1)
   79 
   80     @classmethod
   81     def _fmt_mkdir(cls, l):
   82         return "!II%dsI%dsII" % (GX_GFID_CANONICAL_LEN, l + 1)
   83 
   84     @classmethod
   85     def _fmt_symlink(cls, l1, l2):
   86         return "!II%dsI%ds%ds" % (GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
   87 
   88     def _pathguard(f):
   89         """decorator method that checks
   90         the path argument of the decorated
   91         functions to make sure it does not
   92         point out of the managed tree
   93         """
   94 
   95         fc = funcode(f)
   96         pi = list(fc.co_varnames).index('path')
   97 
   98         def ff(*args):
   99             path = args[pi]
  100             ps = path.split('/')
  101             if path[0] == '/' or '..' in ps:
  102                 raise ValueError('unsafe path')
  103             args = list(args)
  104             args[pi] = os.path.join(args[0].local_path, path)
  105             return f(*args)
  106         return ff
  107 
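# A minimal sketch of the check _pathguard applies to the 'path' argument,
# reproduced standalone; is_safe() is a hypothetical name used only for
# illustration. The decorator itself raises ValueError for unsafe paths and
# then joins the safe path onto cls.local_path.
def is_safe(path):
    ps = path.split('/')
    return not (path[0] == '/' or '..' in ps)

assert is_safe('dir/file')            # relative path inside the tree
assert not is_safe('/etc/passwd')     # absolute paths are rejected
assert not is_safe('a/../../b')       # parent-directory escapes are rejected
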
  108     @classmethod
  109     @_pathguard
  110     def entries(cls, path):
  111         """directory entries in an array"""
  112         # prevent symlinks being followed
  113         if not stat.S_ISDIR(os.lstat(path).st_mode):
  114             raise OSError(ENOTDIR, os.strerror(ENOTDIR))
  115         return os.listdir(path)
  116 
  117     @classmethod
  118     @_pathguard
  119     def lstat(cls, path):
  120         try:
  121             return os.lstat(path)
  122         except (IOError, OSError):
  123             ex = sys.exc_info()[1]
  124             if ex.errno == ENOENT:
  125                 return ex.errno
  126             else:
  127                 raise
  128 
  129     @classmethod
  130     @_pathguard
  131     def linkto_check(cls, path):
  132         try:
  133             return not (
  134                 Xattr.lgetxattr_buf(path,
  135                                     'trusted.glusterfs.dht.linkto') == '')
  136         except (IOError, OSError):
  137             ex = sys.exc_info()[1]
  138             if ex.errno in (ENOENT, ENODATA):
  139                 return False
  140             else:
  141                 raise
  142 
  143     @classmethod
  144     @_pathguard
  145     def gfid(cls, path):
  146         buf = errno_wrap(Xattr.lgetxattr, [path, cls.GFID_XATTR, 16],
  147                          [ENOENT], [ESTALE, ENODATA])
  148         if buf == ENOENT:
  149             return buf
  150         else:
  151             buf = str_to_bytearray(buf)
  152             m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
  153                 ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
  154             return '-'.join(m.groups())
  155 
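# A minimal sketch of how the 16 raw bytes read from the 'trusted.gfid'
# xattr are rendered into canonical UUID form by gfid() above; the byte
# values here are made up.
import re
import struct

GFID_FMTSTR = "!" + "B" * 16
raw = bytes(range(16))    # stand-in for the 16-byte xattr value
hexstr = "".join('%02x' % x for x in struct.unpack(GFID_FMTSTR, raw))
m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', hexstr)
assert '-'.join(m.groups()) == '00010203-0405-0607-0809-0a0b0c0d0e0f'
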
  156     @classmethod
  157     @_pathguard
  158     def purge(cls, path, entries=None):
  159         """force-delete subtrees
  160 
  161         If @entries is not specified, delete
  162         the whole subtree under @path (including
  163         @path).
  164 
  165         Otherwise, @entries should be
  166         a sequence of children of @path, and
  167         the effect is identical to a joint
  168         @entries-less purge on them, i.e.
  169 
  170         for e in entries:
  171             cls.purge(os.path.join(path, e))
  172         """
  173         me_also = entries is None
  174         if not entries:
  175             try:
  176                 # if it's a symlink, prevent
  177                 # following it
  178                 try:
  179                     os.unlink(path)
  180                     return
  181                 except OSError:
  182                     ex = sys.exc_info()[1]
  183                     if ex.errno == EISDIR:
  184                         entries = os.listdir(path)
  185                     else:
  186                         raise
  187             except OSError:
  188                 ex = sys.exc_info()[1]
  189                 if ex.errno in (ENOTDIR, ENOENT, ELOOP):
  190                     try:
  191                         os.unlink(path)
  192                         return
  193                     except OSError:
  194                         ex = sys.exc_info()[1]
  195                         if ex.errno == ENOENT:
  196                             return
  197                         raise
  198                 else:
  199                     raise
  200         for e in entries:
  201             cls.purge(os.path.join(path, e))
  202         if me_also:
  203             os.rmdir(path)
  204 
  205     @classmethod
  206     @_pathguard
  207     def _create(cls, path, ctor):
  208         """path creation backend routine"""
  209         try:
  210             ctor(path)
  211         except OSError:
  212             ex = sys.exc_info()[1]
  213             if ex.errno == EEXIST:
  214                 cls.purge(path)
  215                 return ctor(path)
  216             raise
  217 
  218     @classmethod
  219     @_pathguard
  220     def mkdir(cls, path):
  221         cls._create(path, os.mkdir)
  222 
  223     @classmethod
  224     @_pathguard
  225     def symlink(cls, lnk, path):
  226         cls._create(path, lambda p: os.symlink(lnk, p))
  227 
  228     @classmethod
  229     @_pathguard
  230     def xtime(cls, path, uuid):
  231         """query xtime extended attribute
  232 
  233         Return xtime of @path for @uuid as a pair of integers.
  234         "Normal" errors due to non-existent @path or extended attribute
  235         are tolerated and errno is returned in such a case.
  236         """
  237 
  238         try:
  239             val = Xattr.lgetxattr(path,
  240                                   '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
  241                                   8)
  242             val = str_to_bytearray(val)
  243             return struct.unpack('!II', val)
  244         except OSError:
  245             ex = sys.exc_info()[1]
  246             if ex.errno in (ENOENT, ENODATA, ENOTDIR):
  247                 return ex.errno
  248             else:
  249                 raise
  250 
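# A minimal sketch of the xattr key that xtime()/stime()/entry_stime()
# look up and of the 8-byte value layout they unpack; the uuid is made up,
# and GX_NSPACE is 'trusted.glusterfs' when running privileged.
import struct

GX_NSPACE = 'trusted.glusterfs'
uuid = '1b0b84f2-aaaa-bbbb-cccc-0123456789ab'
assert '.'.join([GX_NSPACE, uuid, 'xtime']) == \
    'trusted.glusterfs.1b0b84f2-aaaa-bbbb-cccc-0123456789ab.xtime'

# the value is a pair of unsigned 32-bit integers
val = struct.pack('!II', 1600000000, 123456)
assert struct.unpack('!II', val) == (1600000000, 123456)
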
  251     @classmethod
  252     @_pathguard
  253     def stime_mnt(cls, path, uuid):
  254         """query stime extended attribute
  255 
  256         Return stime of @path for @uuid as a pair of integers.
  257         "Normal" errors due to non-existent @path or extended attribute
  258         are tolerated and errno is returned in such a case.
  259         """
  260 
  261         try:
  262             val = Xattr.lgetxattr(path,
  263                                   '.'.join([cls.GX_NSPACE, uuid, 'stime']),
  264                                   8)
  265             val = str_to_bytearray(val)
  266             return struct.unpack('!II', val)
  267         except OSError:
  268             ex = sys.exc_info()[1]
  269             if ex.errno in (ENOENT, ENODATA, ENOTDIR):
  270                 return ex.errno
  271             else:
  272                 raise
  273 
  274     @classmethod
  275     @_pathguard
  276     def stime(cls, path, uuid):
  277         """query stime extended attribute
  278 
  279         Return stime of @path for @uuid as a pair of integers.
  280         "Normal" errors due to non-existent @path or extended attribute
  281         are tolerated and errno is returned in such a case.
  282         """
  283 
  284         try:
  285             val = Xattr.lgetxattr(path,
  286                                   '.'.join([cls.GX_NSPACE, uuid, 'stime']),
  287                                   8)
  288             val = str_to_bytearray(val)
  289             return struct.unpack('!II', val)
  290         except OSError:
  291             ex = sys.exc_info()[1]
  292             if ex.errno in (ENOENT, ENODATA, ENOTDIR):
  293                 return ex.errno
  294             else:
  295                 raise
  296 
  297     @classmethod
  298     @_pathguard
  299     def entry_stime(cls, path, uuid):
  300         """
  301         The entry_stime xattr reduces the number of retries of Entry changes
  302         when the Geo-rep worker crashes and restarts. entry_stime is updated
  303         after processing every changelog file. On failure and restart, the
  304         worker only has to reprocess the last changelog for Entry ops.
  305         Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime
  306         """
  307         try:
  308             val = Xattr.lgetxattr(path,
  309                                   '.'.join([cls.GX_NSPACE, uuid,
  310                                             'entry_stime']),
  311                                   8)
  312             val = str_to_bytearray(val)
  313             return struct.unpack('!II', val)
  314         except OSError:
  315             ex = sys.exc_info()[1]
  316             if ex.errno in (ENOENT, ENODATA, ENOTDIR):
  317                 return ex.errno
  318             else:
  319                 raise
  320 
  321     @classmethod
  322     def node_uuid(cls, path='.'):
  323         try:
  324             uuid_l = Xattr.lgetxattr_buf(
  325                 path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
  326             return uuid_l[:-1].split(' ')
  327         except OSError:
  328             raise
  329 
  330     @classmethod
  331     @_pathguard
  332     def set_stime(cls, path, uuid, mark):
  333         """set @mark as stime for @uuid on @path"""
  334         errno_wrap(Xattr.lsetxattr,
  335                    [path,
  336                     '.'.join([cls.GX_NSPACE, uuid, 'stime']),
  337                     struct.pack('!II', *mark)],
  338                    [ENOENT],
  339                    [ESTALE, EINVAL])
  340 
  341     @classmethod
  342     @_pathguard
  343     def set_entry_stime(cls, path, uuid, mark):
  344         """set @mark as stime for @uuid on @path"""
  345         errno_wrap(Xattr.lsetxattr,
  346                    [path,
  347                     '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']),
  348                     struct.pack('!II', *mark)],
  349                    [ENOENT],
  350                    [ESTALE, EINVAL])
  351 
  352     @classmethod
  353     @_pathguard
  354     def set_xtime(cls, path, uuid, mark):
  355         """set @mark as xtime for @uuid on @path"""
  356         errno_wrap(Xattr.lsetxattr,
  357                    [path,
  358                     '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
  359                     struct.pack('!II', *mark)],
  360                    [ENOENT],
  361                    [ESTALE, EINVAL])
  362 
  363     @classmethod
  364     @_pathguard
  365     def set_xtime_remote(cls, path, uuid, mark):
  366         """
  367         set @mark as xtime for @uuid on @path
  368         the difference between this and set_xtime() is that
  369         set_xtime() is overloaded to set the xtime
  370         on the brick (this method sets the xtime on the
  371         remote slave)
  372         """
  373         Xattr.lsetxattr(
  374             path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
  375             struct.pack('!II', *mark))
  376 
  377     @classmethod
  378     def entry_ops(cls, entries):
  379         pfx = gauxpfx()
  380         logging.debug('entries: %s' % repr(entries))
  381         dist_count = rconf.args.master_dist_count
  382 
  383         def entry_purge(op, entry, gfid, e, uid, gid):
  384             # This is extremely racy code and needs to be fixed ASAP.
  385             # The GFID check here is to be sure that the pargfid/bname
  386             # to be purged is the GFID gotten from the changelog.
  387             # (a stat(changelog_gfid) would also be valid here)
  388             # The race here is between the GFID check and the purge.
  389 
  390             # If the entry or the gfid of the file to be deleted is not present
  391             # on slave, we can ignore the unlink/rmdir
  392             if isinstance(lstat(entry), int) or \
  393                isinstance(lstat(os.path.join(pfx, gfid)), int):
  394                 return
  395 
  396             if not matching_disk_gfid(gfid, entry):
  397                 collect_failure(e, EEXIST, uid, gid)
  398                 return
  399 
  400             if op == 'UNLINK':
  401                 er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
  402                 # EISDIR is a safe error, ignore. This can only happen when
  403                 # unlink is sent from the master while fixing gfid conflicts.
  404                 if er != EISDIR:
  405                     return er
  406 
  407             elif op == 'RMDIR':
  408                 er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
  409                                                     ENOTEMPTY], [EBUSY])
  410                 if er == ENOTEMPTY:
  411                     return er
  412 
  413         def collect_failure(e, cmd_ret, uid, gid, dst=False):
  414             slv_entry_info = {}
  415             slv_entry_info['gfid_mismatch'] = False
  416             slv_entry_info['name_mismatch'] = False
  417             slv_entry_info['dst'] = dst
  418             slv_entry_info['slave_isdir'] = False
  419             slv_entry_info['slave_name'] = None
  420             slv_entry_info['slave_gfid'] = None
  421             # We do this for failing fops on Slave
  422             # Master should be logging this
  423             if cmd_ret is None:
  424                 return False
  425 
  426             if e.get("stat", {}):
  427                 # Copy actual UID/GID value back to entry stat
  428                 e['stat']['uid'] = uid
  429                 e['stat']['gid'] = gid
  430 
  431             if cmd_ret in [EEXIST, ESTALE]:
  432                 if dst:
  433                     en = e['entry1']
  434                 else:
  435                     en = e['entry']
  436                 disk_gfid = get_gfid_from_mnt(en)
  437                 if isinstance(disk_gfid, str) and \
  438                    e['gfid'] != disk_gfid:
  439                     slv_entry_info['gfid_mismatch'] = True
  440                     st = lstat(en)
  441                     if not isinstance(st, int):
  442                         if st and stat.S_ISDIR(st.st_mode):
  443                             slv_entry_info['slave_isdir'] = True
  444                             dir_name = get_slv_dir_path(slv_host, slv_volume,
  445                                                         disk_gfid)
  446                             slv_entry_info['slave_name'] = dir_name
  447                         else:
  448                             slv_entry_info['slave_isdir'] = False
  449                     slv_entry_info['slave_gfid'] = disk_gfid
  450                     failures.append((e, cmd_ret, slv_entry_info))
  451                 else:
  452                     return False
  453             else:
  454                 failures.append((e, cmd_ret, slv_entry_info))
  455 
  456             return True
  457 
  458         failures = []
  459 
  460         def recursive_rmdir(gfid, entry, path):
  461             """disk_gfid check added for the original path for which
  462             recursive_rmdir is called. This disk gfid check is executed
  463             before every unlink/rmdir. If the disk gfid does not match
  464             the GFID from the changelog, another worker has already
  465             deleted the directory. Even if the subdir/file is present,
  466             it belongs to a different parent. Exit without performing
  467             further deletes.
  468             """
  469             if not matching_disk_gfid(gfid, entry):
  470                 return
  471 
  472             names = []
  473             names = errno_wrap(os.listdir, [path], [ENOENT], [ESTALE, ENOTSUP])
  474             if isinstance(names, int):
  475                 return
  476 
  477             for name in names:
  478                 fullname = os.path.join(path, name)
  479                 if not matching_disk_gfid(gfid, entry):
  480                     return
  481                 er = errno_wrap(os.remove, [fullname], [ENOENT, ESTALE,
  482                                                         EISDIR], [EBUSY])
  483 
  484                 if er == EISDIR:
  485                     recursive_rmdir(gfid, entry, fullname)
  486 
  487             if not matching_disk_gfid(gfid, entry):
  488                 return
  489 
  490             errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY])
  491 
  492         def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid):
  493             if not matching_disk_gfid(gfid, entry):
  494                 logging.error(lf("RENAME ignored: source entry does not match "
  495                                  "with on-disk gfid",
  496                                  source=entry,
  497                                  gfid=gfid,
  498                                  disk_gfid=get_gfid_from_mnt(entry),
  499                                  target=en))
  500                 collect_failure(e, EEXIST, uid, gid)
  501                 return
  502 
  503             cmd_ret = errno_wrap(os.rename,
  504                                  [entry, en],
  505                                  [ENOENT, EEXIST], [ESTALE, EBUSY])
  506             collect_failure(e, cmd_ret, uid, gid)
  507 
  508         for e in entries:
  509             blob = None
  510             op = e['op']
  511             gfid = e['gfid']
  512             entry = e['entry']
  513             uid = 0
  514             gid = 0
  515 
  516             # Skip entry processing if skip_entry was marked true during
  517             # gfid conflict resolution
  518             if e['skip_entry']:
  519                 continue
  520 
  521             if e.get("stat", {}):
  522                 # Copy UID/GID value and then reset to zero. Copied UID/GID
  523                 # will be used to run chown once entry is created.
  524                 uid = e['stat']['uid']
  525                 gid = e['stat']['gid']
  526                 e['stat']['uid'] = 0
  527                 e['stat']['gid'] = 0
  528 
  529             (pg, bname) = entry2pb(entry)
  530             if op in ['RMDIR', 'UNLINK']:
  531                 # Try once, if rmdir failed with ENOTEMPTY
  532                 # then delete recursively.
  533                 er = entry_purge(op, entry, gfid, e, uid, gid)
  534                 if isinstance(er, int):
  535                     if er == ENOTEMPTY and op == 'RMDIR':
  536                         # Retry if ENOTEMPTY, ESTALE
  537                         er1 = errno_wrap(recursive_rmdir,
  538                                          [gfid, entry,
  539                                           os.path.join(pg, bname)],
  540                                          [], [ENOTEMPTY, ESTALE, ENODATA])
  541                         if not isinstance(er1, int):
  542                             logging.debug("Removed %s => %s/%s recursively" %
  543                                           (gfid, pg, bname))
  544                         else:
  545                             logging.warn(lf("Recursive remove failed",
  546                                             gfid=gfid,
  547                                             pgfid=pg,
  548                                             bname=bname,
  549                                             error=os.strerror(er1)))
  550                     else:
  551                         logging.warn(lf("Failed to remove",
  552                                         gfid=gfid,
  553                                         pgfid=pg,
  554                                         bname=bname,
  555                                         error=os.strerror(er)))
  556             elif op in ['CREATE', 'MKNOD']:
  557                 slink = os.path.join(pfx, gfid)
  558                 st = lstat(slink)
  559                 # don't create multiple entries with same gfid
  560                 if isinstance(st, int):
  561                     blob = entry_pack_reg(cls, gfid, bname,
  562                                           e['mode'], e['uid'], e['gid'])
  563                 # Self-healed hardlinks are recorded as MKNOD.
  564                 # So if the gfid already exists, it should be
  565                 # processed as a hard link, not mknod.
  566                 elif op in ['MKNOD']:
  567                     cmd_ret = errno_wrap(os.link,
  568                                          [slink, entry],
  569                                          [ENOENT, EEXIST], [ESTALE])
  570                     collect_failure(e, cmd_ret, uid, gid)
  571             elif op == 'MKDIR':
  572                 en = e['entry']
  573                 slink = os.path.join(pfx, gfid)
  574                 st = lstat(slink)
  575                 # don't create multiple entries with same gfid
  576                 if isinstance(st, int):
  577                     blob = entry_pack_mkdir(cls, gfid, bname,
  578                                             e['mode'], e['uid'], e['gid'])
  579                 elif (isinstance(lstat(en), int) or
  580                       not matching_disk_gfid(gfid, en)):
  581                     # If the gfid of a directory exists on the slave but the
  582                     # path-based create gets EEXIST, the directory was renamed
  583                     # on the master but recorded as MKDIR during hybrid
  584                     # crawl. Get the directory path by reading the backend
  585                     # symlink and try to rename it to the new name given by
  586                     # the master.
  587                     logging.info(lf("Special case: rename on mkdir",
  588                                     gfid=gfid, entry=repr(entry)))
  589                     src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
  590                     if src_entry is None:
  591                         collect_failure(e, ENOENT, uid, gid)
  592                     if src_entry is not None and src_entry != entry:
  593                         slv_entry_info = {}
  594                         slv_entry_info['gfid_mismatch'] = False
  595                         slv_entry_info['name_mismatch'] = True
  596                         slv_entry_info['dst'] = False
  597                         slv_entry_info['slave_isdir'] = True
  598                         slv_entry_info['slave_gfid'] = gfid
  599                         slv_entry_info['slave_entry'] = src_entry
  600 
  601                         failures.append((e, EEXIST, slv_entry_info))
  602             elif op == 'LINK':
  603                 slink = os.path.join(pfx, gfid)
  604                 st = lstat(slink)
  605                 if isinstance(st, int):
  606                     (pg, bname) = entry2pb(entry)
  607                     if stat.S_ISREG(e['stat']['mode']):
  608                         blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
  609                     elif stat.S_ISLNK(e['stat']['mode']):
  610                         blob = entry_pack_symlink(cls, gfid, bname, e['link'],
  611                                                   e['stat'])
  612                 else:
  613                     cmd_ret = errno_wrap(os.link,
  614                                          [slink, entry],
  615                                          [ENOENT, EEXIST], [ESTALE])
  616                     collect_failure(e, cmd_ret, uid, gid)
  617             elif op == 'SYMLINK':
  618                 en = e['entry']
  619                 st = lstat(entry)
  620                 if isinstance(st, int):
  621                     blob = entry_pack_symlink(cls, gfid, bname, e['link'],
  622                                               e['stat'])
  623                 elif not matching_disk_gfid(gfid, en):
  624                     collect_failure(e, EEXIST, uid, gid)
  625             elif op == 'RENAME':
  626                 en = e['entry1']
  627                 # The matching disk gfid check validates two things
  628                 #  1. Validates the name is present, returns false otherwise
  629                 #  2. Validates the gfid is the same, returns false otherwise
  630                 # So both validations are necessary to decide src doesn't
  631                 # exist. We can't rely on only the gfid stat as a hardlink
  632                 # could be present, and we can't rely only on the name as the
  633                 # name could exist with a different gfid.
  634                 if not matching_disk_gfid(gfid, entry):
  635                     if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
  636                         if stat.S_ISLNK(e['stat']['mode']):
  637                             # src is not present, so don't sync the symlink as
  638                             # we don't know the target. It's ok to ignore. If
  639                             # it's unlinked, it's fine. If it's renamed to
  640                             # something else, it will be synced then.
  641                             if e['link'] is not None:
  642                                 st1 = lstat(en)
  643                                 if isinstance(st1, int):
  644                                     (pg, bname) = entry2pb(en)
  645                                     blob = entry_pack_symlink(cls, gfid, bname,
  646                                                               e['link'],
  647                                                               e['stat'])
  648                                 elif not matching_disk_gfid(gfid, en):
  649                                     collect_failure(e, EEXIST, uid, gid, True)
  650                         else:
  651                             slink = os.path.join(pfx, gfid)
  652                             st = lstat(slink)
  653                             # don't create multiple entries with same gfid
  654                             if isinstance(st, int):
  655                                 (pg, bname) = entry2pb(en)
  656                                 blob = entry_pack_reg_stat(cls, gfid, bname,
  657                                                            e['stat'])
  658                             else:
  659                                 cmd_ret = errno_wrap(os.link, [slink, en],
  660                                                     [ENOENT, EEXIST], [ESTALE])
  661                                 collect_failure(e, cmd_ret, uid, gid)
  662                 else:
  663                     st = lstat(entry)
  664                     st1 = lstat(en)
  665                     if isinstance(st1, int):
  666                         rename_with_disk_gfid_confirmation(gfid, entry, en,
  667                                                            uid, gid)
  668                     else:
  669                         if st.st_ino == st1.st_ino:
  670                             # we have a hard link, we can now unlink source
  671                             try:
  672                                 errno_wrap(os.unlink, [entry],
  673                                            [ENOENT, ESTALE], [EBUSY])
  674                             except OSError as e:
  675                                 if e.errno == EISDIR:
  676                                     try:
  677                                         errno_wrap(os.rmdir, [entry],
  678                                                    [ENOENT, ESTALE], [EBUSY])
  679                                     except OSError as e:
  680                                         if e.errno == ENOTEMPTY:
  681                                             logging.error(
  682                                                 lf("Directory Rename failed. "
  683                                                    "Both Old and New"
  684                                                    " directories exists",
  685                                                    old=entry,
  686                                                    new=en))
  687                                         else:
  688                                             raise
  689                                 else:
  690                                     raise
  691                         elif not matching_disk_gfid(gfid, en) and dist_count > 1:
  692                             collect_failure(e, EEXIST, uid, gid, True)
  693                         else:
  694                             # We are here which means matching_disk_gfid for
  695                             # both source and destination has returned false
  696                             # and the distribution count for the master vol is
  697                             # greater than one. Which basically says both the
  698                             # source and destination exist and are not hardlinks.
  699                             # So we are safe to go ahead with rename here.
  700                             rename_with_disk_gfid_confirmation(gfid, entry, en,
  701                                                                uid, gid)
  702             if blob:
  703                 cmd_ret = errno_wrap(Xattr.lsetxattr,
  704                                      [pg, 'glusterfs.gfid.newfile', blob],
  705                                      [EEXIST, ENOENT, ESTALE],
  706                                      [ESTALE, EINVAL, EBUSY])
  707                 collect_failure(e, cmd_ret, uid, gid)
  708 
  709                 # If UID/GID is different from zero, that means we are trying
  710                 # to create the Entry with a different UID/GID. Create the Entry
  711                 # with UID:0 and GID:0, and then call chown to set the UID/GID.
  712                 if uid != 0 or gid != 0:
  713                     path = os.path.join(pfx, gfid)
  714                     cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT],
  715                                          [ESTALE, EINVAL])
  716                     collect_failure(e, cmd_ret, uid, gid)
  717 
  718         return failures
  719 
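# A minimal sketch, assuming illustrative values, of the shape of one
# element of the 'failures' list returned by entry_ops(): a tuple of the
# failed entry dict, the errno-style return code, and the slv_entry_info
# dict built by collect_failure() above.
import errno

example_failure = (
    {'op': 'MKDIR',
     'gfid': 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
     'entry': '.gfid/11111111-2222-3333-4444-555555555555/dirname'},
    errno.EEXIST,
    {'gfid_mismatch': True, 'name_mismatch': False, 'dst': False,
     'slave_isdir': True, 'slave_name': 'olddirname',
     'slave_gfid': 'ffffffff-0000-1111-2222-333333333333'},
)
assert len(example_failure) == 3
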
  720     @classmethod
  721     def meta_ops(cls, meta_entries):
  722         logging.debug('Meta-entries: %s' % repr(meta_entries))
  723         failures = []
  724         for e in meta_entries:
  725             mode = e['stat']['mode']
  726             uid = e['stat']['uid']
  727             gid = e['stat']['gid']
  728             atime = e['stat']['atime']
  729             mtime = e['stat']['mtime']
  730             go = e['go']
  731             # Linux doesn't support chmod on symlink itself.
  732             # It is always applied to the target file. So
  733             # changelog would record target file's gfid
  734             # and we are good. But 'chown' is supported on
  735             # symlink file. So changelog would record symlink
  736             # gfid in such cases. Since we do 'chown' 'chmod'
  737             # 'utime' for each gfid recorded for metadata, and
  738             # we know from changelog the metadata is on symlink's
  739             # gfid or target file's gfid, we should be doing
  740             # 'lchown' 'lchmod' 'utime with no-dereference' blindly.
  741             # But since 'lchmod' and 'utime with no-dereference' are
  742             # not supported in python3, we have to rely on 'chmod'
  743             # and 'utime with de-reference'. Hence we avoid 'chmod'
  744             # and 'utime' if it's a symlink file.
  745 
  746             is_symlink = False
  747             cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT],
  748                                  [ESTALE, EINVAL])
  749             if isinstance(cmd_ret, int):
  750                 continue
  751 
  752             is_symlink = os.path.islink(go)
  753 
  754             if not is_symlink:
  755                 cmd_ret = errno_wrap(os.chmod, [go, mode],
  756                                      [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
  757                 if isinstance(cmd_ret, int):
  758                     failures.append((e, cmd_ret, "chmod"))
  759 
  760                 cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
  761                                      [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
  762                 if isinstance(cmd_ret, int):
  763                     failures.append((e, cmd_ret, "utime"))
  764         return failures
  765 
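# A minimal sketch, assuming illustrative values, of one element of
# 'meta_entries' as consumed by meta_ops() above: 'go' is the gfid-access
# path the lchown/chmod/utime calls are applied to.
example_meta_entry = {
    'go': '.gfid/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
    'stat': {'mode': 0o100644, 'uid': 1000, 'gid': 1000,
             'atime': 1600000000.0, 'mtime': 1600000000.0},
}
assert set(example_meta_entry['stat']) >= {'mode', 'uid', 'gid', 'atime', 'mtime'}
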
  766     @classmethod
  767     @_pathguard
  768     def setattr(cls, path, adct):
  769         """set file attributes
  770 
  771         @adct is a dict, where 'own', 'mode' and 'times'
  772         keys are looked for and values used to perform
  773         chown, chmod or utimes on @path.
  774         """
  775         own = adct.get('own')
  776         if own:
  777             os.lchown(path, *own)
  778         mode = adct.get('mode')
  779         if mode:
  780             os.chmod(path, stat.S_IMODE(mode))
  781         times = adct.get('times')
  782         if times:
  783             os.utime(path, times)
  784 
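# A minimal sketch of an @adct dict of the form setattr() accepts per its
# docstring; any subset of the keys may be present, values are illustrative.
example_adct = {
    'own': (1000, 1000),                     # (uid, gid) passed to lchown
    'mode': 0o100644,                        # filtered through stat.S_IMODE
    'times': (1600000000.0, 1600000000.0),   # (atime, mtime) passed to utime
}
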
  785     @staticmethod
  786     def pid():
  787         return os.getpid()
  788 
  789     last_keep_alive = 0
  790 
  791     @classmethod
  792     def keep_alive(cls, dct):
  793         """process keepalive messages.
  794 
  795         Return keep-alive counter (number of received keep-alive
  796         messages).
  797 
  798         Now the "keep-alive" message can also have a payload which is
  799         used to set a foreign volume-mark on the underlying file system.
  800         """
  801         if dct:
  802             key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
  803             val = struct.pack(cls.FRGN_FMTSTR,
  804                               *(dct['version'] +
  805                                 tuple(int(x, 16)
  806                                       for x in re.findall('(?:[\da-f]){2}',
  807                                                           dct['uuid'])) +
  808                                 (dct['retval'],) + dct['volume_mark'][0:2] + (
  809                                     dct['timeout'],)))
  810             Xattr.lsetxattr('.', key, val)
  811         cls.last_keep_alive += 1
  812         return cls.last_keep_alive
  813 
  814     @staticmethod
  815     def version():
  816         """version used in handshake"""
  817         return 1.0
  818 
  819 
  820 class Mounter(object):
  821 
  822     """Abstract base class for mounter backends"""
  823 
  824     def __init__(self, params):
  825         self.params = params
  826         self.mntpt = None
  827         self.umount_cmd = []
  828 
  829     @classmethod
  830     def get_glusterprog(cls):
  831         gluster_cmd_dir = gconf.get("gluster-command-dir")
  832         if rconf.args.subcmd == "slave":
  833             gluster_cmd_dir = gconf.get("slave-gluster-command-dir")
  834         return os.path.join(gluster_cmd_dir, cls.glusterprog)
  835 
  836     def umount_l(self, d):
  837         """perform lazy umount"""
  838         po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE,
  839                    universal_newlines=True)
  840         po.wait()
  841         return po
  842 
  843     @classmethod
  844     def make_umount_argv(cls, d):
  845         raise NotImplementedError
  846 
  847     def make_mount_argv(self, label=None):
  848         raise NotImplementedError
  849 
  850     def cleanup_mntpt(self, *a):
  851         pass
  852 
  853     def handle_mounter(self, po):
  854         po.wait()
  855 
  856     def inhibit(self, label):
  857         """inhibit a gluster filesystem
  858 
  859         Mount glusterfs over a temporary mountpoint,
  860         change into the mount, and lazy unmount the
  861         filesystem.
  862         """
  863         mpi, mpo = pipe()
  864         mh = Popen.fork()
  865         if mh:
  866             # Parent
  867             os.close(mpi)
  868             fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
  869             d = None
  870             margv = self.make_mount_argv(label)
  871             if self.mntpt:
  872                 # mntpt is determined pre-mount
  873                 d = self.mntpt
  874                 mnt_msg = d + '\0'
  875                 encoded_msg = mnt_msg.encode()
  876                 os.write(mpo, encoded_msg)
  877             po = Popen(margv, **self.mountkw)
  878             self.handle_mounter(po)
  879             po.terminate_geterr()
  880             logging.debug('auxiliary glusterfs mount in place')
  881             if not d:
  882                 # mntpt is determined during mount
  883                 d = self.mntpt
  884                 mnt_msg = d + '\0'
  885                 encoded_msg = mnt_msg.encode()
  886                 os.write(mpo, encoded_msg)
  887             encoded_msg = 'M'.encode()
  888             os.write(mpo, encoded_msg)
  889             t = syncdutils.Thread(target=lambda: os.chdir(d))
  890             t.start()
  891             tlim = rconf.starttime + gconf.get("connection-timeout")
  892             while True:
  893                 if not t.isAlive():
  894                     break
  895 
  896                 if time.time() >= tlim:
  897                     syncdutils.finalize(exval=1)
  898                 time.sleep(1)
  899             os.close(mpo)
  900             _, rv = syncdutils.waitpid(mh, 0)
  901             if rv:
  902                 rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
  903                      (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
  904                 logging.warn(lf('stale mount possibly left behind',
  905                                 path=d))
  906                 raise GsyncdError("cleaning up temp mountpoint %s "
  907                                   "failed with status %d" %
  908                                   (d, rv))
  909         else:
  910             rv = 0
  911             try:
  912                 os.setsid()
  913                 os.close(mpo)
  914                 mntdata = ''
  915                 while True:
  916                     c = os.read(mpi, 1)
  917                     c = c.decode()
  918                     if not c:
  919                         break
  920                     mntdata += c
  921                 if mntdata:
  922                     mounted = False
  923                     if mntdata[-1] == 'M':
  924                         mntdata = mntdata[:-1]
  925                         assert(mntdata)
  926                         mounted = True
  927                     assert(mntdata[-1] == '\0')
  928                     mntpt = mntdata[:-1]
  929                     assert(mntpt)
  930 
  931                     umount_master = False
  932                     umount_slave = False
  933                     if rconf.args.subcmd == "worker" \
  934                        and not unshare_propagation_supported() \
  935                        and not gconf.get("access-mount"):
  936                         umount_master = True
  937                     if rconf.args.subcmd == "slave" \
  938                        and not gconf.get("slave-access-mount"):
  939                         umount_slave = True
  940 
  941                     if mounted and (umount_master or umount_slave):
  942                         po = self.umount_l(mntpt)
  943                         po.terminate_geterr(fail_on_err=False)
  944                         if po.returncode != 0:
  945                             po.errlog()
  946                             rv = po.returncode
  947                         logging.debug("Lazy umount done: %s" % mntpt)
  948                     if umount_master or umount_slave:
  949                         self.cleanup_mntpt(mntpt)
  950             except:
  951                 logging.exception('mount cleanup failure:')
  952                 rv = 200
  953             os._exit(rv)
  954 
  955         # Polling the dht.subvol.status value.
  956         RETRIES = 10
  957         while not gf_mount_ready():
  958             if RETRIES < 0:
  959                 logging.error('Subvols are not up')
  960                 break
  961             RETRIES -= 1
  962             time.sleep(0.2)
  963 
  964         logging.debug('auxiliary glusterfs mount prepared')
  965 
  966 
  967 class DirectMounter(Mounter):
  968 
  969     """mounter backend which calls mount(8), umount(8) directly"""
  970 
  971     mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True}
  972     glusterprog = 'glusterfs'
  973 
  974     @staticmethod
  975     def make_umount_argv(d):
  976         return ['umount', '-l', d]
  977 
  978     def make_mount_argv(self, label=None):
  979         self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
  980         rconf.mount_point = self.mntpt
  981         return [self.get_glusterprog()] + \
  982             ['--' + p for p in self.params] + [self.mntpt]
  983 
  984     def cleanup_mntpt(self, mntpt=None):
  985         if not mntpt:
  986             mntpt = self.mntpt
  987         errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY])
  988 
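# A minimal sketch of the kind of argv DirectMounter.make_mount_argv()
# builds; the params list and mountpoint are illustrative (the real params
# are assembled in GLUSTER.connect() further below), and argv[0] is really
# get_glusterprog(), i.e. the glusterfs binary under gluster-command-dir.
params = ['log-file=/var/log/glusterfs/geo-rep.log',
          'volfile-server=primary.example.com',
          'volfile-id=myvol', 'client-pid=-1']
mntpt = '/tmp/gsyncd-aux-mount-abc123'    # what tempfile.mkdtemp() returns
argv = ['glusterfs'] + ['--' + p for p in params] + [mntpt]
assert argv[0] == 'glusterfs' and argv[-1] == mntpt
assert '--volfile-id=myvol' in argv
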
  989 
  990 class MountbrokerMounter(Mounter):
  991 
  992     """mounter backend using the mountbroker gluster service"""
  993 
  994     mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE,
  995                'universal_newlines': True}
  996     glusterprog = 'gluster'
  997 
  998     @classmethod
  999     def make_cli_argv(cls):
 1000         return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \
 1001             gconf.get("gluster-cli-options").split() + ['system::']
 1002 
 1003     @classmethod
 1004     def make_umount_argv(cls, d):
 1005         return cls.make_cli_argv() + ['umount', d, 'lazy']
 1006 
 1007     def make_mount_argv(self, label):
 1008         return self.make_cli_argv() + \
 1009             ['mount', label, 'user-map-root=' +
 1010                 syncdutils.getusername()] + self.params
 1011 
 1012     def handle_mounter(self, po):
 1013         self.mntpt = po.stdout.readline()[:-1]
 1014         rconf.mount_point = self.mntpt
 1015         rconf.mountbroker = True
 1016         self.umount_cmd = self.make_cli_argv() + ['umount']
 1017         rconf.mbr_umount_cmd = self.umount_cmd
 1018         po.stdout.close()
 1019         sup(self, po)
 1020         if po.returncode != 0:
 1021             # if cli terminated with error due to being
 1022             # refused by glusterd, what it put
 1023             # out on stdout is a diagnostic message
 1024             logging.error(lf('glusterd answered', mnt=self.mntpt))
 1025 
 1026 
 1027 class GLUSTERServer(Server):
 1028 
 1029     """server enhancements for a glusterfs backend"""
 1030 
 1031     @classmethod
 1032     def _attr_unpack_dict(cls, xattr, extra_fields=''):
 1033         """generic volume mark fetching/parsing backed"""
 1034         fmt_string = cls.NTV_FMTSTR + extra_fields
 1035         buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
 1036         buf = str_to_bytearray(buf)
 1037         vm = struct.unpack(fmt_string, buf)
 1038         m = re.match(
 1039             '(.{8})(.{4})(.{4})(.{4})(.{12})',
 1040             "".join(['%02x' % x for x in vm[2:18]]))
 1041         uuid = '-'.join(m.groups())
 1042         volinfo = {'version': vm[0:2],
 1043                    'uuid': uuid,
 1044                    'retval': vm[18],
 1045                    'volume_mark': vm[19:21],
 1046                    }
 1047         if extra_fields:
 1048             return volinfo, vm[-len(extra_fields):]
 1049         else:
 1050             return volinfo
 1051 
 1052     @classmethod
 1053     def foreign_volume_infos(cls):
 1054         """return list of valid (not expired) foreign volume marks"""
 1055         dict_list = []
 1056         xattr_list = Xattr.llistxattr_buf('.')
 1057         for ele in xattr_list:
 1058             if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
 1059                 d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
 1060                 now = int(time.time())
 1061                 if x[0] > now:
 1062                     logging.debug("volinfo[%s] expires: %d "
 1063                                   "(%d sec later)" %
 1064                                   (d['uuid'], x[0], x[0] - now))
 1065                     d['timeout'] = x[0]
 1066                     dict_list.append(d)
 1067                 else:
 1068                     try:
 1069                         Xattr.lremovexattr('.', ele)
 1070                     except OSError:
 1071                         pass
 1072         return dict_list
 1073 
 1074     @classmethod
 1075     def native_volume_info(cls):
 1076         """get the native volume mark of the underlying gluster volume"""
 1077         try:
 1078             return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
 1079                                                    'volume-mark']))
 1080         except OSError:
 1081             ex = sys.exc_info()[1]
 1082             if ex.errno != ENODATA:
 1083                 raise
 1084 
 1085 
 1086 class GLUSTER(object):
 1087 
 1088     """scheme class for gluster:// urls
 1089 
 1090     can be used to represent a gluster slave server
 1091     on slave side, or interface to a remote gluster
 1092     slave on master side, or to represent master
 1093     (slave-ish features come from the mixins, master
 1094     functionality is outsourced to GMaster from master)
 1095     """
 1096     server = GLUSTERServer
 1097 
 1098     def __init__(self, host, volume):
 1099         self.path = "%s:%s" % (host, volume)
 1100         self.host = host
 1101         self.volume = volume
 1102 
 1103         global slv_volume
 1104         global slv_host
 1105         slv_volume = self.volume
 1106         slv_host = self.host
 1107 
 1108     def connect(self):
 1109         """inhibit the resource beyond
 1110 
 1111         Choose the mounting backend (direct or mountbroker),
 1112         set up glusterfs parameters and perform the mount
 1113         with the chosen backend
 1114         """
 1115 
 1116         logging.info("Mounting gluster volume locally...")
 1117         t0 = time.time()
 1118         label = gconf.get('mountbroker', None)
 1119         if not label and not privileged():
 1120             label = syncdutils.getusername()
 1121         mounter = label and MountbrokerMounter or DirectMounter
 1122 
 1123         log_file = gconf.get("gluster-log-file")
 1124         if rconf.args.subcmd == "slave":
 1125             log_file = gconf.get("slave-gluster-log-file")
 1126 
 1127         log_level = gconf.get("gluster-log-level")
 1128         if rconf.args.subcmd == "slave":
 1129             log_level = gconf.get("slave-gluster-log-level")
 1130 
 1131         params = gconf.get("gluster-params").split() + \
 1132             ['log-level=' + log_level] + \
 1133             ['log-file=' + log_file, 'volfile-server=' + self.host] + \
 1134             ['volfile-id=' + self.volume, 'client-pid=-1']
 1135 
 1136         self.mounter = mounter(params)
 1137         self.mounter.inhibit(label)
 1138         logging.info(lf("Mounted gluster volume",
 1139                         duration="%.4f" % (time.time() - t0)))
 1140 
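# A minimal sketch of the params list connect() assembles before handing it
# to the chosen mounter backend; host, volume and log paths are illustrative,
# and the leading options come from the gluster-params config value.
gluster_params = ['aux-gfid-mount', 'acl']    # illustrative gluster-params
params = gluster_params + \
    ['log-level=INFO'] + \
    ['log-file=/var/log/glusterfs/geo-replication/mnt-myvol.log',
     'volfile-server=primary.example.com'] + \
    ['volfile-id=myvol', 'client-pid=-1']
assert params[-2:] == ['volfile-id=myvol', 'client-pid=-1']
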
 1141     def gmaster_instantiate_tuple(self, slave):
 1142         """return a tuple of the 'one shot' and the 'main crawl'
 1143         class instance"""
 1144         return (gmaster_builder('xsync')(self, slave),
 1145                 gmaster_builder()(self, slave),
 1146                 gmaster_builder('changeloghistory')(self, slave))
 1147 
 1148     def service_loop(self, slave=None):
 1149         """enter service loop
 1150 
 1151         - if slave given, instantiate GMaster and
 1152           pass control to that instance, which implements
 1153           master behavior
 1154         - else do what's inherited
 1155         """
 1156         if rconf.args.subcmd == "slave":
 1157             if gconf.get("use-rsync-xattrs") and not privileged():
 1158                 raise GsyncdError(
 1159                     "using rsync for extended attributes is not supported")
 1160 
 1161             repce = RepceServer(
 1162                 self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs"))
 1163             t = syncdutils.Thread(target=lambda: (repce.service_loop(),
 1164                                                   syncdutils.finalize()))
 1165             t.start()
 1166             logging.info("slave listening")
 1167             if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0:
 1168                 while True:
 1169                     lp = self.server.last_keep_alive
 1170                     time.sleep(gconf.get("slave-timeout"))
 1171                     if lp == self.server.last_keep_alive:
 1172                         logging.info(
 1173                             lf("connection inactive, stopping",
 1174                                timeout=gconf.get("slave-timeout")))
 1175                         break
 1176             else:
 1177                 select((), (), ())
 1178 
 1179             return
 1180 
 1181         class brickserver(Server):
 1182             local_path = rconf.args.local_path
 1183             aggregated = self.server
 1184 
 1185             @classmethod
 1186             def entries(cls, path):
 1187                 e = super(brickserver, cls).entries(path)
 1188                 # on the brick don't mess with /.glusterfs
 1189                 if path == '.':
 1190                     try:
 1191                         e.remove('.glusterfs')
 1192                         e.remove('.trashcan')
 1193                     except ValueError:
 1194                         pass
 1195                 return e
 1196 
 1197             @classmethod
 1198             def lstat(cls, e):
 1199                 """ path based backend stat """
 1200                 return super(brickserver, cls).lstat(e)
 1201 
 1202             @classmethod
 1203             def gfid(cls, e):
 1204                 """ path based backend gfid fetch """
 1205                 return super(brickserver, cls).gfid(e)
 1206 
 1207             @classmethod
 1208             def linkto_check(cls, e):
 1209                 return super(brickserver, cls).linkto_check(e)
 1210 
 1211         # define {,set_}xtime in slave, thus preempting
 1212         # the call to remote, so that it takes data from
 1213         # the local brick
 1214         slave.server.xtime = types.MethodType(
 1215             lambda _self, path, uuid: (
 1216                 brickserver.xtime(path,
 1217                                   uuid + '.' + rconf.args.slave_id)
 1218             ),
 1219             slave.server)
 1220         slave.server.stime = types.MethodType(
 1221             lambda _self, path, uuid: (
 1222                 brickserver.stime(path,
 1223                                   uuid + '.' + rconf.args.slave_id)
 1224             ),
 1225             slave.server)
 1226         slave.server.entry_stime = types.MethodType(
 1227             lambda _self, path, uuid: (
 1228                 brickserver.entry_stime(
 1229                     path,
 1230                     uuid + '.' + rconf.args.slave_id)
 1231             ),
 1232             slave.server)
 1233         slave.server.set_stime = types.MethodType(
 1234             lambda _self, path, uuid, mark: (
 1235                 brickserver.set_stime(path,
 1236                                       uuid + '.' + rconf.args.slave_id,
 1237                                       mark)
 1238             ),
 1239             slave.server)
 1240         slave.server.set_entry_stime = types.MethodType(
 1241             lambda _self, path, uuid, mark: (
 1242                 brickserver.set_entry_stime(
 1243                     path,
 1244                     uuid + '.' + rconf.args.slave_id,
 1245                     mark)
 1246             ),
 1247             slave.server)
 1248 
 1249         (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
 1250         g1.master.server = brickserver
 1251         g2.master.server = brickserver
 1252         g3.master.server = brickserver
 1253 
 1254         # bad bad bad: bad way to do things like this
 1255         # need to make this elegant
 1256         # register the crawlers and start crawling
 1257         # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
 1258         # g3 ==> changelog History
 1259         status = GeorepStatus(gconf.get("state-file"),
 1260                               rconf.args.local_node,
 1261                               rconf.args.local_path,
 1262                               rconf.args.local_node_id,
 1263                               rconf.args.master,
 1264                               rconf.args.slave)
 1265         status.reset_on_worker_start()
 1266 
 1267         try:
 1268             workdir = g2.setup_working_dir()
 1269             # Register only when change_detector is not set to
 1270             # xsync, else agent will generate changelog files
 1271             # in .processing directory of working dir
 1272             if gconf.get("change-detector") != 'xsync':
 1273                 # register with the changelog library
 1274                 # 9 == log level (DEBUG)
 1275                 # 5 == connection retries
 1276                 libgfchangelog.register(rconf.args.local_path,
 1277                                         workdir,
 1278                                         gconf.get("changelog-log-file"),
 1279                                         get_changelog_log_level(
 1280                                             gconf.get("changelog-log-level")),
 1281                                         g2.CHANGELOG_CONN_RETRIES)
 1282 
 1283             register_time = int(time.time())
 1284             g2.register(register_time, status)
 1285             g3.register(register_time, status)
 1286         except ChangelogException as e:
 1287             logging.error(lf("Changelog register failed", error=e))
 1288             sys.exit(1)
 1289 
 1290         g1.register(status=status)
 1291         logging.info(lf("Register time",
 1292                         time=register_time))
 1293         # oneshot: try to use the changelog history api; if it is
 1294         # not available, switch to FS crawl
 1295         # Note: if config.change_detector is xsync then the
 1296         # changelog history api is not used
 1297         try:
 1298             g3.crawlwrap(oneshot=True)
 1299         except PartialHistoryAvailable as e:
 1300             logging.info(lf('Partial history available, using xsync crawl'
 1301                             ' after consuming history',
 1302                             till=e))
 1303             g1.crawlwrap(oneshot=True, register_time=register_time)
 1304         except ChangelogHistoryNotAvailable:
 1305             logging.info('Changelog history not available, using xsync')
 1306             g1.crawlwrap(oneshot=True, register_time=register_time)
 1307         except NoStimeAvailable:
 1308             logging.info('No stime available, using xsync crawl')
 1309             g1.crawlwrap(oneshot=True, register_time=register_time)
 1310         except ChangelogException as e:
 1311             logging.error(lf("Changelog History Crawl failed",
 1312                              error=e))
 1313             sys.exit(1)
 1314 
 1315         try:
 1316             g2.crawlwrap()
 1317         except ChangelogException as e:
 1318             logging.error(lf("Changelog crawl failed", error=e))
 1319             sys.exit(1)
 1320 
 1321 
 1322 class SSH(object):
 1323 
 1324     """scheme class for ssh:// urls
 1325 
 1326     interface to remote slave on master side
 1327     implementing an ssh based proxy
 1328     """
 1329 
 1330     def __init__(self, host, volume):
 1331         self.remote_addr = host
 1332         self.volume = volume
 1333 
 1334     @staticmethod
 1335     def parse_ssh_address(self):
 1336         m = re.match('([^@]+)@(.+)', self.remote_addr)
 1337         if m:
 1338             u, h = m.groups()
 1339         else:
 1340             u, h = syncdutils.getusername(), self.remote_addr
 1341         self.remotehost = h
 1342         return {'user': u, 'host': h}
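          # Usage sketch (address below is hypothetical): since this is a
          # staticmethod whose first argument is the SSH object, callers pass
          # the instance explicitly, e.g.
          #   SSH.parse_ssh_address(ssh_obj)
          # with ssh_obj.remote_addr == 'geoaccount@slave.example' returns
          #   {'user': 'geoaccount', 'host': 'slave.example'}
          # and sets ssh_obj.remotehost; without '@' the local username is used.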
 1343 
 1344     def start_fd_client(self, i, o):
 1345         """set up RePCe client, handshake with server
 1346 
 1347         It's cut out as a separate method to let
 1348         subclasses hook into client startup
 1349         """
 1350         self.server = RepceClient(i, o)
 1351         rv = self.server.__version__()
 1352         exrv = {'proto': repce.repce_version, 'object': Server.version()}
 1353         da0 = (rv, exrv)
 1354         da1 = ({}, {})
 1355         for i in range(2):
 1356             for k, v in da0[i].items():
 1357                 da1[i][k] = int(v)
 1358         if da1[0] != da1[1]:
 1359             raise GsyncdError(
 1360                 "RePCe major version mismatch: local %s, remote %s" %
 1361                 (exrv, rv))
 1362         slavepath = "/proc/%d/cwd" % self.server.pid()
 1363         self.slaveurl = ':'.join([self.remote_addr, slavepath])
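          # Handshake sketch (version numbers hypothetical): both ends report
          # {'proto': ..., 'object': ...} and only the integer parts are
          # compared, so a local {'proto': 1.0, 'object': 1.1} would match a
          # remote {'proto': 1.2, 'object': 1.0}, while a remote
          # {'proto': 2.0, 'object': 1.0} would raise GsyncdError.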
 1364 
 1365     def connect_remote(self):
 1366         """connect to inner slave url through outer ssh url
 1367 
 1368         Wrap the connecting utility in ssh.
 1369 
 1370         Much care is put into daemonizing: in that case
 1371         ssh is started before daemonization, but the
 1372         RePCe client is to be created after that (as ssh
 1373         interactive password auth would be defeated by
 1374         a daemonized ssh, while the client should be present
 1375         only in the final process). In that case the action
 1376         is split into two parts: this method is invoked
 1377         once pre-daemon and once post-daemon. Use @go_daemon
 1378         to decide which part to perform.
 1379 
 1380         [NB. At the moment the gluster product does not make use
 1381         of interactive authentication.]
 1382         """
 1383         syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'),
 1384                                  self.remote_addr,
 1385                                  self.volume)
 1386 
 1387         logging.info("Initializing SSH connection between master and slave...")
 1388         t0 = time.time()
 1389 
 1390         extra_opts = []
 1391         remote_gsyncd = gconf.get("remote-gsyncd")
 1392         if remote_gsyncd == "":
 1393             remote_gsyncd = "/nonexistent/gsyncd"
 1394 
 1395         if gconf.get("use-rsync-xattrs"):
 1396             extra_opts.append('--use-rsync-xattrs')
 1397 
 1398         args_to_slave = [gconf.get("ssh-command")] + \
 1399             gconf.get("ssh-options").split() + \
 1400             ["-p", str(gconf.get("ssh-port"))] + \
 1401             rconf.ssh_ctl_args + [self.remote_addr] + \
 1402             [remote_gsyncd, "slave"] + \
 1403             extra_opts + \
 1404             [rconf.args.master, rconf.args.slave] + \
 1405             [
 1406                 '--master-node', rconf.args.local_node,
 1407                 '--master-node-id', rconf.args.local_node_id,
 1408                 '--master-brick', rconf.args.local_path,
 1409                 '--local-node', rconf.args.resource_remote,
 1410                 '--local-node-id', rconf.args.resource_remote_id] + \
 1411             [
 1412                 # Add all config arguments here, slave gsyncd will not use
 1413                 # config file in slave side, so all overriding options should
 1414                 # be sent as arguments
 1415                 '--slave-timeout', str(gconf.get("slave-timeout")),
 1416                 '--slave-log-level', gconf.get("slave-log-level"),
 1417                 '--slave-gluster-log-level',
 1418                 gconf.get("slave-gluster-log-level"),
 1419                 '--slave-gluster-command-dir',
 1420                 gconf.get("slave-gluster-command-dir"),
 1421                 '--master-dist-count',
 1422                 str(gconf.get("master-distribution-count"))]
 1423 
 1424         if gconf.get("slave-access-mount"):
 1425             args_to_slave.append('--slave-access-mount')
 1426 
 1427         if rconf.args.debug:
 1428             args_to_slave.append('--debug')
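              # Rough shape of the resulting command (a hedged sketch, values
              # hypothetical):
              #   ssh <ssh-options> -p <ssh-port> <ctl-args> user@slavehost \
              #       <remote-gsyncd> slave <master> <slave> --master-node ... \
              #       --slave-timeout <n> --slave-log-level <lvl> ...
              # i.e. every overriding option reaches the slave gsyncd as a
              # command line argument, as it reads no config file on its side.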
 1429 
 1430         po = Popen(args_to_slave,
 1431                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
 1432                    stderr=subprocess.PIPE)
 1433         rconf.transport = po
 1434         self.start_fd_client(po.stdout, po.stdin)
 1435         logging.info(lf("SSH connection between master and slave established.",
 1436                         duration="%.4f" % (time.time() - t0)))
 1437 
 1438     def rsync(self, files, *args, **kw):
 1439         """invoke rsync"""
 1440         if not files:
 1441             raise GsyncdError("no files to sync")
 1442         logging.debug("files: " + ", ".join(files))
 1443 
 1444         extra_rsync_flags = []
 1445         # Performance flag: include --ignore-missing-args if the
 1446         # rsync version is 3.1.0 or newer.
 1447         if gconf.get("rsync-opt-ignore-missing-args") and \
 1448            get_rsync_version(gconf.get("rsync-command")) >= "3.1.0":
 1449             extra_rsync_flags = ["--ignore-missing-args"]
 1450 
 1451         rsync_ssh_opts = [gconf.get("ssh-command")] + \
 1452             gconf.get("ssh-options").split() + \
 1453             ["-p", str(gconf.get("ssh-port"))] + \
 1454             rconf.ssh_ctl_args + \
 1455             gconf.get("rsync-ssh-options").split()
 1456 
 1457         argv = [
 1458             gconf.get("rsync-command"),
 1459             '-aR0',
 1460             '--inplace',
 1461             '--files-from=-',
 1462             '--super',
 1463             '--stats',
 1464             '--numeric-ids',
 1465             '--no-implied-dirs'
 1466         ]
 1467 
 1468         if gconf.get("rsync-opt-existing"):
 1469             argv += ["--existing"]
 1470 
 1471         if gconf.get("sync-xattrs"):
 1472             argv += ['--xattrs']
 1473 
 1474         if gconf.get("sync-acls"):
 1475             argv += ['--acls']
 1476 
 1477         argv = argv + \
 1478             gconf.get("rsync-options").split() + \
 1479             extra_rsync_flags + ['.'] + \
 1480             ["-e", " ".join(rsync_ssh_opts)] + \
 1481             [self.slaveurl]
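              # A hedged sketch of the assembled command (host/path hypothetical):
              #   rsync -aR0 --inplace --files-from=- --super --stats \
              #         --numeric-ids --no-implied-dirs [--existing] [--xattrs] \
              #         [--acls] <rsync-options> [--ignore-missing-args] . \
              #         -e "ssh ... -p <port> ..." user@slavehost:/proc/<pid>/cwd
              # the file list itself is fed NUL-separated on stdin below.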
 1482 
 1483         log_rsync_performance = gconf.getr("log-rsync-performance", False)
 1484 
 1485         if log_rsync_performance:
 1486             # use stdout=PIPE only when log_rsync_performance is
 1487             # enabled; otherwise nobody would consume rsync's stdout,
 1488             # and once the PIPE filled up rsync would hang.
 1489             po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
 1490                        stderr=subprocess.PIPE, universal_newlines=True)
 1491         else:
 1492             po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE,
 1493                        universal_newlines=True)
 1494 
 1495         for f in files:
 1496             po.stdin.write(f)
 1497             po.stdin.write('\0')
 1498 
 1499         stdout, stderr = po.communicate()
 1500 
 1501         if kw.get("log_err", False):
 1502             for errline in stderr.strip().split("\n")[:-1]:
 1503                 logging.error(lf("SYNC Error",
 1504                                  sync_engine="Rsync",
 1505                                  error=errline))
 1506 
 1507         if log_rsync_performance:
 1508             rsync_msg = []
 1509             for line in stdout.split("\n"):
 1510                 if line.startswith("Number of files:") or \
 1511                    line.startswith("Number of regular files transferred:") or \
 1512                    line.startswith("Total file size:") or \
 1513                    line.startswith("Total transferred file size:") or \
 1514                    line.startswith("Literal data:") or \
 1515                    line.startswith("Matched data:") or \
 1516                    line.startswith("Total bytes sent:") or \
 1517                    line.startswith("Total bytes received:") or \
 1518                    line.startswith("sent "):
 1519                     rsync_msg.append(line)
 1520             logging.info(lf("rsync performance",
 1521                             data=", ".join(rsync_msg)))
 1522 
 1523         return po
 1524 
 1525     def tarssh(self, files, log_err=False):
 1526         """invoke tar+ssh
 1527         -z (compress) can be used if needed, but it is omitted for now
 1528         as it results in a weird error (tar+ssh errors out, errcode: 2)
 1529         """
 1530         if not files:
 1531             raise GsyncdError("no files to sync")
 1532         logging.debug("files: " + ", ".join(files))
 1533         (host, rdir) = self.slaveurl.split(':')
 1534 
 1535         tar_cmd = ["tar"] + \
 1536             ["--sparse", "-cf", "-", "--files-from", "-"]
 1537         ssh_cmd = gconf.get("ssh-command").split() + \
 1538             gconf.get("ssh-options-tar").split() + \
 1539             ["-p", str(gconf.get("ssh-port"))] + \
 1540             [host, "tar"] + \
 1541             ["--overwrite", "-xf", "-", "-C", rdir]
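              # In effect this builds the pipeline (a hedged sketch, host and
              # directory hypothetical):
              #   tar --sparse -cf - --files-from - | \
              #       ssh ... -p <port> slavehost tar --overwrite -xf - -C /proc/<pid>/cwd
              # with the file list written newline-separated to tar's stdin below.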
 1542         p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
 1543                    stdin=subprocess.PIPE, stderr=subprocess.PIPE,
 1544                    universal_newlines=True)
 1545         p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE,
 1546                    universal_newlines=True)
 1547         for f in files:
 1548             p0.stdin.write(f)
 1549             p0.stdin.write('\n')
 1550 
 1551         p0.stdin.close()
 1552         p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits.
 1553 
 1554         # stdin and stdout of p0 are already closed; reset them to None
 1555         # so communicate() skips them while waiting for the child to complete
 1556         p0.stdin = None
 1557         p0.stdout = None
 1558 
 1559         def wait_for_tar(p0):
 1560             _, stderr = p0.communicate()
 1561             if log_err:
 1562                 for errline in stderr.strip().split("\n")[:-1]:
 1563                     if "No such file or directory" not in errline:
 1564                         logging.error(lf("SYNC Error",
 1565                                          sync_engine="Tarssh",
 1566                                          error=errline))
 1567 
 1568         t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
 1569         # wait for tar to terminate and collect its errors in a thread,
 1570         # while this thread waits for the transfer to complete
 1571         t.start()
 1572 
 1573         # wait for ssh process
 1574         _, stderr1 = p1.communicate()
 1575         t.join()
 1576 
 1577         if log_err:
 1578             for errline in stderr1.strip().split("\n")[:-1]:
 1579                 logging.error(lf("SYNC Error",
 1580                                  sync_engine="Tarssh",
 1581                                  error=errline))
 1582 
 1583         return p1