"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.6/geo-replication/syncdaemon/master.py" (20 Aug 2021, 80655 Bytes) of package /linux/misc/glusterfs-8.6.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "master.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 8.5_vs_8.6.

#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

import os
import sys
import time
import stat
import logging
import fcntl
import string
import errno
import tarfile
from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
from threading import Condition, Lock
from datetime import datetime

import gsyncdconfig as gconf
import libgfchangelog
from rconf import rconf
from syncdutils import (Thread, GsyncdError, escape_space_newline,
                        unescape_space_newline, gauxpfx, escape,
                        lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
                        NoStimeAvailable, PartialHistoryAvailable)

URXTIME = (-1, 0)

# Default rollover time set in the changelog translator.
# The changelog rollover time is hardcoded here to avoid
# xsync usage when the crawl switches from history to
# changelog. If the rollover time is increased in the
# translator, geo-rep can enter an xsync crawl after the
# history crawl before starting the live changelog crawl.
CHANGELOG_ROLLOVER_TIME = 15

# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)


def _xtime_now():
    t = time.time()
    sec = int(t)
    nsec = int((t - sec) * 1000000)
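    # NB: despite the name, the fractional part above is scaled by 1e6,
    # i.e. microsecond resolution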
    return (sec, nsec)


def _volinfo_hook_relax_foreign(self):
    volinfo_sys = self.get_sys_volinfo()
    fgn_vi = volinfo_sys[self.KFGN]
    if fgn_vi:
        expiry = fgn_vi['timeout'] - int(time.time()) + 1
        logging.info(lf('foreign volume info found, waiting for expiry',
                        expiry=expiry))
        time.sleep(expiry)
        volinfo_sys = self.get_sys_volinfo()
    return volinfo_sys


def edct(op, **ed):
    dct = {}
    dct['op'] = op
    # This is used in automatic gfid conflict resolution.
    # When marked True, it's skipped during re-processing.
    dct['skip_entry'] = False
    for k in ed:
        if k == 'stat':
            st = ed[k]
            dst = dct['stat'] = {}
            if st:
                dst['uid'] = st.st_uid
                dst['gid'] = st.st_gid
                dst['mode'] = st.st_mode
                dst['atime'] = st.st_atime
                dst['mtime'] = st.st_mtime
        else:
            dct[k] = ed[k]
    return dct
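
# Illustrative edct() result (gfid/entry values are hypothetical):
#   edct('MKDIR', gfid='8f6312a3-...', entry='<pfx>/<pgfid>/dir1')
#   -> {'op': 'MKDIR', 'skip_entry': False,
#       'gfid': '8f6312a3-...', 'entry': '<pfx>/<pgfid>/dir1'}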


# The API!

def gmaster_builder(excrawl=None):
    """produce the GMaster class variant corresponding
       to sync mode"""
    this = sys.modules[__name__]
    modemixin = gconf.get("special-sync-mode")
    if not modemixin:
        modemixin = 'normal'

    if gconf.get("change-detector") == 'xsync':
        changemixin = 'xsync'
    elif excrawl:
        changemixin = excrawl
    else:
        changemixin = gconf.get("change-detector")

    logging.debug(lf('setting up change detection mode',
                     mode=changemixin))
    modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
    crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')

    if gconf.get("use-rsync-xattrs"):
        sendmarkmixin = SendmarkRsyncMixin
    else:
        sendmarkmixin = SendmarkNormalMixin

    if gconf.get("ignore-deletes"):
        purgemixin = PurgeNoopMixin
    else:
        purgemixin = PurgeNormalMixin

    if gconf.get("sync-method") == "tarssh":
        syncengine = TarSSHEngine
    else:
        syncengine = RsyncEngine

    class _GMaster(crawlmixin, modemixin, sendmarkmixin,
                   purgemixin, syncengine):
        pass

    return _GMaster
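
# For illustration: with the defaults assumed here (special-sync-mode
# unset, change-detector "changelog", use-rsync-xattrs off,
# ignore-deletes off, sync-method "rsync"), the class built above is
# equivalent to
#
#   class _GMaster(GMasterChangelogMixin, NormalMixin,
#                  SendmarkNormalMixin, PurgeNormalMixin, RsyncEngine):
#       pass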


# Mixin classes that implement the data format
# and logic particularities of certain
# sync modes

class NormalMixin(object):

    """normal geo-rep behavior"""

    minus_infinity = URXTIME

    # following staticmethods ideally would be
    # methods of an xtime object (in particular,
    # implementing the hooks needed for comparison
    # operators), but at this point we don't yet
    # have a dedicated xtime class

    @staticmethod
    def serialize_xtime(xt):
        return "%d.%d" % tuple(xt)

    @staticmethod
    def deserialize_xtime(xt):
        return tuple(int(x) for x in xt.split("."))
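
    # Illustrative round-trip:
    #   serialize_xtime((1629443029, 1234))   -> "1629443029.1234"
    #   deserialize_xtime("1629443029.1234")  -> (1629443029, 1234)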

    @staticmethod
    def native_xtime(xt):
        return xt

    @staticmethod
    def xtime_geq(xt0, xt1):
        return xt0 >= xt1

    def make_xtime_opts(self, is_master, opts):
        if 'create' not in opts:
            opts['create'] = is_master
        if 'default_xtime' not in opts:
            opts['default_xtime'] = URXTIME

    def xtime_low(self, rsc, path, **opts):
        if rsc == self.master:
            xt = rsc.server.xtime(path, self.uuid)
        else:
            xt = rsc.server.stime(path, self.uuid)
            if isinstance(xt, int) and xt == ENODATA:
                xt = rsc.server.xtime(path, self.uuid)
                if not isinstance(xt, int):
                    self.slave.server.set_stime(path, self.uuid, xt)
        if isinstance(xt, int) and xt != ENODATA:
            return xt
        if xt == ENODATA or xt < self.volmark:
            if opts['create']:
                xt = _xtime_now()
                rsc.server.aggregated.set_xtime(path, self.uuid, xt)
            else:
                zero_zero = (0, 0)
                if xt != zero_zero:
                    xt = opts['default_xtime']
        return xt

    def keepalive_payload_hook(self, timo, gap):
        # first grab a reference as self.volinfo
        # can be changed in main thread
        vi = self.volinfo
        if vi:
            # then have a private copy which we can mod
            vi = vi.copy()
            vi['timeout'] = int(time.time()) + timo
        else:
            # send keep-alive more frequently to
            # avoid a delay in announcing our volume info
            # to slave if it becomes established in the
            # meantime
            gap = min(10, gap)
        return (vi, gap)

    def volinfo_hook(self):
        return self.get_sys_volinfo()

    def xtime_reversion_hook(self, path, xtl, xtr):
        if xtr > xtl:
            raise GsyncdError("timestamp corruption for " + path)

    def need_sync(self, e, xte, xtrd):
        return xte > xtrd

    def set_slave_xtime(self, path, mark):
        self.slave.server.set_stime(path, self.uuid, mark)
        # self.slave.server.set_xtime_remote(path, self.uuid, mark)


class PartialMixin(NormalMixin):

    """a variant tuned towards operation with a master
       that has partial info of the slave (brick typically)"""

    def xtime_reversion_hook(self, path, xtl, xtr):
        pass


class RecoverMixin(NormalMixin):

    """a variant that differs from normal in terms
       of ignoring non-indexed files"""

    @staticmethod
    def make_xtime_opts(is_master, opts):
        if 'create' not in opts:
            opts['create'] = False
        if 'default_xtime' not in opts:
            opts['default_xtime'] = URXTIME

    def keepalive_payload_hook(self, timo, gap):
        return (None, gap)

    def volinfo_hook(self):
        return _volinfo_hook_relax_foreign(self)

# Further mixins for certain tunable behaviors


class SendmarkNormalMixin(object):

    def sendmark_regular(self, *a, **kw):
        return self.sendmark(*a, **kw)


class SendmarkRsyncMixin(object):

    def sendmark_regular(self, *a, **kw):
        pass


class PurgeNormalMixin(object):

    def purge_missing(self, path, names):
        self.slave.server.purge(path, names)


class PurgeNoopMixin(object):

    def purge_missing(self, path, names):
        pass


class TarSSHEngine(object):

    """Sync engine that uses tar(1) piped over ssh(1)
       for data transfers. Good for lots of small files.
    """

    def a_syncdata(self, files):
        logging.debug(lf("Files", files=files))

        for f in files:
            pb = self.syncer.add(f)

            def regjob(se, xte, pb):
                rv = pb.wait()
                if rv[0]:
                    logging.debug(lf('synced', file=se))
                    return True
                else:
                    # stat check for file presence
                    st = lstat(se)
                    if isinstance(st, int):
                        # file got unlinked in the interim
                        self.unlinked_gfids.add(se)
                        return True

            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)

    def syncdata_wait(self):
        if self.wait(self.FLAT_DIR_HIERARCHY, None):
            return True

    def syncdata(self, files):
        self.a_syncdata(files)
        self.syncdata_wait()


class RsyncEngine(object):

    """Sync engine that uses rsync(1) for data transfers"""

    def a_syncdata(self, files):
        logging.debug(lf("files", files=files))

        for f in files:
            logging.debug(lf('candidate for syncing', file=f))
            pb = self.syncer.add(f)

            def regjob(se, xte, pb):
                rv = pb.wait()
                if rv[0]:
                    logging.debug(lf('synced', file=se))
                    return True
                else:
                    # stat to check if the file exists
                    st = lstat(se)
                    if isinstance(st, int):
                        # file got unlinked in the interim
                        self.unlinked_gfids.add(se)
                        return True

            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)

    def syncdata_wait(self):
        if self.wait(self.FLAT_DIR_HIERARCHY, None):
            return True

    def syncdata(self, files):
        self.a_syncdata(files)
        self.syncdata_wait()


class GMasterCommon(object):

    """abstract class implementing the master role"""

    KFGN = 0
    KNAT = 1

    def get_sys_volinfo(self):
        """query volume marks on fs root

        err out on multiple foreign masters
        """
        fgn_vis, nat_vi = (
            self.master.server.aggregated.foreign_volume_infos(),
            self.master.server.aggregated.native_volume_info())
        fgn_vi = None
        if fgn_vis:
            if len(fgn_vis) > 1:
                raise GsyncdError("cannot work with multiple foreign masters")
            fgn_vi = fgn_vis[0]
        return fgn_vi, nat_vi

    @property
    def uuid(self):
        if self.volinfo:
            return self.volinfo['uuid']

    @property
    def volmark(self):
        if self.volinfo:
            return self.volinfo['volume_mark']

    def get_entry_stime(self):
        data = self.slave.server.entry_stime(".", self.uuid)
        if isinstance(data, int):
            data = None
        return data

    def get_data_stime(self):
        data = self.slave.server.stime(".", self.uuid)
        if isinstance(data, int):
            data = None
        return data

    def xtime(self, path, *a, **opts):
        """get amended xtime

        while amending, we can create a missing xtime, or
        determine a valid value if what we get is expired
        (as of the volume mark expiry); the way of amending
        depends on @opts and on the subject of the query
        (master or slave).
        """
        if a:
            rsc = a[0]
        else:
            rsc = self.master
        self.make_xtime_opts(rsc == self.master, opts)
        return self.xtime_low(rsc, path, **opts)

    def __init__(self, master, slave):
        self.master = master
        self.slave = slave
        self.jobtab = {}
        if gconf.get("sync-method") == "tarssh":
            self.syncer = Syncer(slave, self.slave.tarssh, [2])
        else:
            # rsync exit codes 23 and 24 denote partial transfer
            # (cf. rsync(1)); treat them as normal
            self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
        # crawls vs. turns:
        # - self.crawls is simply the number of crawl() invocations on root
        # - one turn is a maximal consecutive sequence of crawls so that each
        #   crawl in it detects a change to be synced
        # - self.turns is the number of turns since start
        # - self.total_turns is a limit so that if self.turns reaches it, then
        #   we exit (for diagnostic purposes)
        # so, eg., if the master fs changes unceasingly, self.turns will remain
        # 0.
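        # (illustrative reading: if the first three crawls each detect a
        # change and the fourth does not, then crawls == 4 and the one
        # completed turn makes turns == 1)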
        self.crawls = 0
        self.turns = 0
        self.total_turns = rconf.turns
        self.crawl_start = datetime.now()
        self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
        self.start = None
        self.change_seen = None
        # the actual volinfo we make use of
        self.volinfo = None
        self.terminate = False
        self.sleep_interval = 1
        self.unlinked_gfids = set()

    def init_keep_alive(self):
        """start the keep-alive thread"""
        timo = gconf.get("slave-timeout", 0)
        if timo > 0:
            def keep_alive():
                while True:
                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
                    self.slave.server.keep_alive(vi)
                    time.sleep(gap)
            t = Thread(target=keep_alive)
            t.start()

    def mgmt_lock(self):
        """Take management volume lock"""
        if rconf.mgmt_lock_fd:
            try:
                fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return True
            except:
                ex = sys.exc_info()[1]
                if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                    return False
                raise

        fd = None
        bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
            + str(rconf.args.subvol_num) + ".lock"
        mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
        path = os.path.join(mgmt_lock_dir, bname)
        logging.debug(lf("lock file path", path=path))
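        # e.g. (hypothetical values):
        #   <meta-volume-mnt>/geo-rep/<uuid>_<slave-id>_subvol_1.lock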
        try:
            fd = os.open(path, os.O_CREAT | os.O_RDWR)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == ENOENT:
                logging.info("Creating geo-rep directory in meta volume...")
                try:
                    os.makedirs(mgmt_lock_dir)
                except OSError:
                    ex = sys.exc_info()[1]
                    if ex.errno == EEXIST:
                        pass
                    else:
                        raise
                fd = os.open(path, os.O_CREAT | os.O_RDWR)
            else:
                raise
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            # Save latest FD for future use
            rconf.mgmt_lock_fd = fd
        except:
            ex = sys.exc_info()[1]
            if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                # cannot grab, it's taken
                rconf.mgmt_lock_fd = fd
                return False
            raise

        return True

    def should_crawl(self):
        if not gconf.get("use-meta-volume"):
            return rconf.args.local_node_id in self.master.server.node_uuid()

        if not os.path.ismount(gconf.get("meta-volume-mnt")):
            logging.error("Meta-volume is not mounted. Worker Exiting...")
            sys.exit(1)
        return self.mgmt_lock()

    def register(self):
        # no-op by default; crawl mixins override this
        pass

    def crawlwrap(self, oneshot=False, register_time=None):
        if oneshot:
            # it's important to do this during the oneshot crawl as
            # for a passive gsyncd (ie. in a replicate scenario)
            # the keepalive thread would keep the connection alive.
            self.init_keep_alive()

        # If crawlwrap is called when only partial history is available,
        # register_time is set to the time when the geo-rep worker
        # registered for changelog consumption. Since nsec is not part
        # of the register time, changes may be missed by the xsync
        # crawl. This limit is reset when crawlwrap is called again.
        self.live_changelog_start_time = None
        if register_time:
            self.live_changelog_start_time = (register_time, 0)

        # no need to maintain volinfo state machine.
        # in a cascading setup, each geo-replication session is
        # independent (ie. 'volume-mark' and 'xtime' are not
        # propagated). This is because the slave's xtime is now
        # stored on the master itself. 'volume-mark' just identifies
        # that we are in a cascading setup and need to enable
        # 'geo-replication.ignore-pid-check' option.
        volinfo_sys = self.volinfo_hook()
        self.volinfo = volinfo_sys[self.KNAT]
        inter_master = volinfo_sys[self.KFGN]
        logging.debug("%s master with volume id %s ..." %
                      (inter_master and "intermediate" or "primary",
                       self.uuid))
        rconf.volume_id = self.uuid
        if self.volinfo:
            if self.volinfo['retval']:
                logging.warn(lf("master cluster's info may not be valid",
                                error=self.volinfo['retval']))
        else:
            raise GsyncdError("master volinfo unavailable")
        self.lastreport['time'] = time.time()

        t0 = time.time()
        crawl = self.should_crawl()
        while not self.terminate:
            if self.start:
                logging.debug("... crawl #%d done, took %.6f seconds" %
                              (self.crawls, time.time() - self.start))
            self.start = time.time()
            should_display_info = self.start - self.lastreport['time'] >= 60
            if should_display_info:
                logging.debug("%d crawls, %d turns",
                              self.crawls - self.lastreport['crawls'],
                              self.turns - self.lastreport['turns'])
                self.lastreport.update(crawls=self.crawls,
                                       turns=self.turns,
                                       time=self.start)
            t1 = time.time()
            if int(t1 - t0) >= gconf.get("replica-failover-interval"):
                crawl = self.should_crawl()
                t0 = t1
            self.update_worker_remote_node()
            if not crawl:
                self.status.set_passive()
                # bring up _this_ brick to the cluster stime
                # which is min of cluster (but max of the replicas)
                brick_stime = self.xtime('.', self.slave)
                cluster_stime = self.master.server.aggregated.stime_mnt(
                    '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
                logging.debug(lf("Crawl info",
                                 cluster_stime=cluster_stime,
                                 brick_stime=brick_stime))

                if not isinstance(cluster_stime, int):
                    if brick_stime < cluster_stime:
                        self.slave.server.set_stime(
                            self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
                        self.upd_stime(cluster_stime)
                        # Purge all changelogs available in processing dir
                        # less than cluster_stime
                        proc_dir = os.path.join(self.tempdir,
                                                ".processing")

                        if os.path.exists(proc_dir):
                            to_purge = [f for f in os.listdir(proc_dir)
                                        if (f.startswith("CHANGELOG.") and
                                            int(f.split('.')[-1]) <
                                            cluster_stime[0])]
                            for f in to_purge:
                                os.remove(os.path.join(proc_dir, f))

                time.sleep(5)
                continue

            self.status.set_active()
            self.crawl()

            if oneshot:
                return
            time.sleep(self.sleep_interval)

    @staticmethod
    def humantime(*tpair):
        """format xtime-like (sec, nsec) pair to human readable format"""
        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
            strftime("%Y-%m-%d %H:%M:%S")
        if len(tpair) > 1:
            ts += '.' + str(tpair[1])
        return ts

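    # humantime example (illustrative; output depends on the local
    # timezone; in UTC, humantime(1629443029, 500) would be
    # "2021-08-20 07:03:49.500")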
    def _crawl_time_format(self, crawl_time):
        # Ex: 5 years, 4 days, 20:23:10
        years, days = divmod(crawl_time.days, 365.25)
        years = int(years)
        days = int(days)

        date = ""
        m, s = divmod(crawl_time.seconds, 60)
        h, m = divmod(m, 60)

        if years != 0:
            date += "%s %s " % (years, "year" if years == 1 else "years")
        if days != 0:
            date += "%s %s " % (days, "day" if days == 1 else "days")

        # string.zfill() no longer exists on Python 3; use str.zfill()
        date += "%s:%s:%s" % (str(h).zfill(2),
                              str(m).zfill(2), str(s).zfill(2))
        return date

    def add_job(self, path, label, job, *a, **kw):
        """insert @job function to job table at @path with @label"""
        if self.jobtab.get(path) is None:
            self.jobtab[path] = []
        self.jobtab[path].append((label, a, lambda: job(*a, **kw)))

    def add_failjob(self, path, label):
        """invoke .add_job with a job that does nothing but fail"""
        logging.debug('salvaged: ' + label)
        self.add_job(path, label, lambda: False)

    def wait(self, path, *args):
        """perform jobs registered for @path

        Reset the jobtab entry for @path and
        determine success as the conjunction of
        the success of all the jobs. In case of
        success, call .sendmark on @path
        """
        jobs = self.jobtab.pop(path, [])
        succeed = True
        for j in jobs:
            ret = j[-1]()
            if not ret:
                succeed = False
        if succeed and args[0] is not None:
            self.sendmark(path, *args)
        return succeed

    def sendmark(self, path, mark, adct=None):
        """update slave side xtime for @path to master side xtime

        also can send a setattr payload (see Server.setattr).
        """
        if adct:
            self.slave.server.setattr(path, adct)
        self.set_slave_xtime(path, mark)


class XCrawlMetadata(object):
    def __init__(self, st_uid, st_gid, st_mode, st_atime, st_mtime):
        self.st_uid = int(st_uid)
        self.st_gid = int(st_gid)
        self.st_mode = int(st_mode)
        self.st_atime = float(st_atime)
        self.st_mtime = float(st_mtime)


class GMasterChangelogMixin(GMasterCommon):

    """ changelog based change detection and syncing """

    # index for change type and entry
    IDX_START = 0
    IDX_END = 2
    UNLINK_ENTRY = 2

    POS_GFID = 0
    POS_TYPE = 1
    POS_ENTRY1 = -1

    TYPE_META = "M "
    TYPE_GFID = "D "
    TYPE_ENTRY = "E "

    MAX_EF_RETRIES = 10
    MAX_OE_RETRIES = 10

    # flat directory hierarchy for gfid based access
    FLAT_DIR_HIERARCHY = '.'
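    # gfids are addressed as <pfx>/<gfid> via the aux-gfid mount, e.g.
    # ".gfid/8f6312a3-..." (illustrative; the prefix comes from gauxpfx())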

    CHANGELOG_CONN_RETRIES = 5

    def init_fop_batch_stats(self):
        self.batch_stats = {
            "CREATE": 0,
            "MKNOD": 0,
            "UNLINK": 0,
            "MKDIR": 0,
            "RMDIR": 0,
            "LINK": 0,
            "SYMLINK": 0,
            "RENAME": 0,
            "SETATTR": 0,
            "SETXATTR": 0,
            "XATTROP": 0,
            "DATA": 0,
            "ENTRY_SYNC_TIME": 0,
            "META_SYNC_TIME": 0,
            "DATA_START_TIME": 0
        }

    def update_fop_batch_stats(self, ty):
        if ty in ['FSETXATTR']:
            ty = 'SETXATTR'
        self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1

    def archive_and_purge_changelogs(self, changelogs):
        # Create a tar file rather than a tar.gz, since changelogs are
        # appended to the existing tar. The archive name is
        # archive_<YEAR><MONTH>.tar
        archive_name = "archive_%s.tar" % datetime.today().strftime(
            gconf.get("changelog-archive-format"))
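        # e.g. assuming an archive format of "%Y%m", a changelog batch
        # processed in Aug 2021 would land in archive_202108.tar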

        try:
            tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
                                            archive_name),
                               "a")
        except tarfile.ReadError:
            tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
                                            archive_name),
                               "w")

        for f in changelogs:
            try:
                f = os.path.basename(f)
                tar.add(os.path.join(self.processed_changelogs_dir, f),
                        arcname=os.path.basename(f))
            except:
                exc = sys.exc_info()[1]
                if ((isinstance(exc, OSError) or
                     isinstance(exc, IOError)) and exc.errno == ENOENT):
                    continue
                else:
                    tar.close()
                    raise
        tar.close()

        for f in changelogs:
            try:
                f = os.path.basename(f)
                os.remove(os.path.join(self.processed_changelogs_dir, f))
            except OSError as e:
                if e.errno == errno.ENOENT:
                    continue
                else:
                    raise

    def setup_working_dir(self):
        workdir = os.path.join(gconf.get("working-dir"),
                               escape(rconf.args.local_path))
        logging.debug('changelog working dir %s' % workdir)
        return workdir

    def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
        num_failures = 0
        for failure in failures:
            st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
            if not isinstance(st, int):
                num_failures += 1
                logging.error(lf('%s FAILED' % log_prefix,
                                 data=failure))
                if failure[0]['op'] == 'MKDIR':
                    raise GsyncdError("The above directory failed to sync."
                                      " Please fix it to proceed further.")

        self.status.inc_value("failures", num_failures)

    def fix_possible_entry_failures(self, failures, retry_count, entries):
        pfx = gauxpfx()
        fix_entry_ops = []
        failures1 = []
        remove_gfids = set()
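        # From the usage below, each failure is assumed to be a tuple of
        # (entry dict, errno, details dict), the details dict carrying
        # keys like 'gfid_mismatch', 'name_mismatch', 'slave_gfid',
        # 'slave_isdir', 'slave_entry' and 'dst'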
        for failure in failures:
            if failure[2]['name_mismatch']:
                pbname = failure[2]['slave_entry']
            elif failure[2]['dst']:
                pbname = failure[0]['entry1']
            else:
                pbname = failure[0]['entry']

            op = failure[0]['op']
            # name exists but gfid is different
            if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
                slave_gfid = failure[2]['slave_gfid']
                st = lstat(os.path.join(pfx, slave_gfid))
                # Takes care of scenarios with no hardlinks
                if isinstance(st, int) and st == ENOENT:
                    logging.debug(lf('Entry not present on master. Fixing gfid '
                                     'mismatch in slave. Deleting the entry',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
                    # Add deletion to fix_entry_ops list
                    if failure[2]['slave_isdir']:
                        fix_entry_ops.append(
                            edct('RMDIR',
                                 gfid=failure[2]['slave_gfid'],
                                 entry=pbname))
                    else:
                        fix_entry_ops.append(
                            edct('UNLINK',
                                 gfid=failure[2]['slave_gfid'],
                                 entry=pbname))
                    remove_gfids.add(slave_gfid)
                    if op in ['RENAME']:
                        # If the renamed gfid doesn't exist on master,
                        # remove the rename entry and unlink src on slave
                        st = lstat(os.path.join(pfx, failure[0]['gfid']))
                        if isinstance(st, int) and st == ENOENT:
                            logging.debug("Unlink source %s" % repr(failure))
                            remove_gfids.add(failure[0]['gfid'])
                            fix_entry_ops.append(
                                edct('UNLINK',
                                     gfid=failure[0]['gfid'],
                                     entry=failure[0]['entry']))
                # Takes care of scenarios of hardlinks/renames on master
                elif not isinstance(st, int):
                    if matching_disk_gfid(slave_gfid, pbname):
                        # Safe to ignore the failure as master contains same
                        # file with same gfid. Remove entry from entries list
                        logging.debug(lf('Fixing gfid mismatch in slave. '
                                         'Safe to ignore, take out entry',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
                        remove_gfids.add(failure[0]['gfid'])
                        if op == 'RENAME':
                            fix_entry_ops.append(
                                edct('UNLINK',
                                     gfid=failure[0]['gfid'],
                                     entry=failure[0]['entry']))
                    # The file exists on master but with different name.
                    # Probably renamed and got missed during xsync crawl.
                    elif failure[2]['slave_isdir']:
                        realpath = os.readlink(os.path.join(
                                               rconf.args.local_path,
                                               ".glusterfs",
                                               slave_gfid[0:2],
                                               slave_gfid[2:4],
                                               slave_gfid))
                        dst_entry = os.path.join(pfx, realpath.split('/')[-2],
                                                 realpath.split('/')[-1])
                        src_entry = pbname
                        logging.debug(lf('Fixing dir name/gfid mismatch in '
                                         'slave', retry_count=retry_count,
                                         entry=repr(failure)))
                        if src_entry == dst_entry:
                            # Safe to ignore the failure as master contains
                            # same directory as in slave with same gfid.
                            # Remove the failure entry from entries list
                            logging.debug(lf('Fixing dir name/gfid mismatch'
                                             ' in slave. Safe to ignore, '
                                             'take out entry',
                                             retry_count=retry_count,
                                             entry=repr(failure)))
                            try:
                                entries.remove(failure[0])
                            except ValueError:
                                pass
                        else:
                            rename_dict = edct('RENAME', gfid=slave_gfid,
                                               entry=src_entry,
                                               entry1=dst_entry, stat=st,
                                               link=None)
                            logging.debug(lf('Fixing dir name/gfid mismatch'
                                             ' in slave. Renaming',
                                             retry_count=retry_count,
                                             entry=repr(rename_dict)))
                            fix_entry_ops.append(rename_dict)
                    else:
                        # A hardlink file exists with different name or
                        # renamed file exists and we are sure from
                        # matching_disk_gfid check that the entry doesn't
                        # exist with same gfid so we can safely delete on slave
                        logging.debug(lf('Fixing file gfid mismatch in slave. '
                                         'Hardlink/Rename Case. Deleting entry',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
                        fix_entry_ops.append(
                            edct('UNLINK',
                                 gfid=failure[2]['slave_gfid'],
                                 entry=pbname))
            elif failure[1] == ENOENT:
                if op in ['RENAME']:
                    pbname = failure[0]['entry1']
                else:
                    pbname = failure[0]['entry']

                pargfid = pbname.split('/')[1]
                st = lstat(os.path.join(pfx, pargfid))
                # Safe to ignore the failure as master doesn't contain
                # parent directory.
                if isinstance(st, int):
                    logging.debug(lf('Fixing ENOENT error in slave. Parent '
                                     'does not exist on master. Safe to '
                                     'ignore, take out entry',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
                    try:
                        entries.remove(failure[0])
                    except ValueError:
                        pass
                else:
                    logging.debug(lf('Fixing ENOENT error in slave. Create '
                                     'parent directory on slave.',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
                    realpath = os.readlink(os.path.join(rconf.args.local_path,
                                                        ".glusterfs",
                                                        pargfid[0:2],
                                                        pargfid[2:4],
                                                        pargfid))
                    dir_entry = os.path.join(pfx, realpath.split('/')[-2],
                                             realpath.split('/')[-1])
                    fix_entry_ops.append(
                        edct('MKDIR', gfid=pargfid, entry=dir_entry,
                             mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))

        logging.debug("remove_gfids: %s" % repr(remove_gfids))
        if remove_gfids:
            for e in entries:
                if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
                   and e['gfid'] in remove_gfids:
                    logging.debug("Removed entry op from retry list: "
                                  "entry: %s" % repr(e))
                    e['skip_entry'] = True

        if fix_entry_ops:
            # Process deletions of entries whose gfids are mismatched
            failures1 = self.slave.server.entry_ops(fix_entry_ops)

        return (failures1, fix_entry_ops)

    def handle_entry_failures(self, failures, entries):
        retries = 0
        pending_failures = False
        failures1 = []
        failures2 = []
        entry_ops1 = []
        entry_ops2 = []

        if failures:
            pending_failures = True
            failures1 = failures
            entry_ops1 = entries

            while pending_failures and retries < self.MAX_EF_RETRIES:
                retries += 1
                (failures2, entry_ops2) = self.fix_possible_entry_failures(
                    failures1, retries, entry_ops1)
                if not failures2:
                    pending_failures = False
                    logging.info(lf('Successfully fixed entry ops with gfid '
                                    'mismatch', retry_count=retries))
                else:
                    pending_failures = True
                    failures1 = failures2
                    entry_ops1 = entry_ops2

            if pending_failures:
                for failure in failures1:
                    logging.error("Failed to fix entry ops %s", repr(failure))

    def process_change(self, change, done, retry):
        pfx = gauxpfx()
        clist = []
        entries = []
        meta_gfid = set()
        datas = set()

        change_ts = change.split(".")[-1]

        # Ignore entry ops which are already processed in Changelog modes
        ignore_entry_ops = False
        entry_stime = None
        data_stime = None
        if self.name in ["live_changelog", "history_changelog"]:
            entry_stime = self.get_entry_stime()
            data_stime = self.get_data_stime()

        if entry_stime is not None and data_stime is not None:
            # If data_stime > entry_stime, the stime was updated by a
            # passive worker; consider data_stime in that case.
            if data_stime[0] > entry_stime[0]:
                entry_stime = data_stime

            # Compare entry_stime with the changelog file suffix; if the
            # changelog time is not newer than entry_stime, ignore its
            # entry ops
            if int(change_ts) <= entry_stime[0]:
                ignore_entry_ops = True

        with open(change, "r") as f:
            clist = f.readlines()

        for e in clist:
            e = e.strip()
            et = e[self.IDX_START:self.IDX_END]   # entry type
            ec = e[self.IDX_END:].split(' ')      # rest of the bits
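
            # Schematic record layouts (illustrative, not exhaustive):
            #   entry: "E <gfid> CREATE <mode> <uid> <gid> <pgfid>/<bname>"
            #   data:  "D <gfid>"
            #   meta:  "M <gfid> SETATTR ..."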

            # skip ENTRY operation if hot tier brick
            if self.name == 'live_changelog' or \
               self.name == 'history_changelog':
                if rconf.args.is_hottier and et == self.TYPE_ENTRY:
                    logging.debug(lf('skip ENTRY op if hot tier brick',
                                     op=ec[self.POS_TYPE]))
                    continue

            # Data and Meta operations are decided while parsing
            # UNLINK/RMDIR/MKNOD, so when ignore_entry_ops is True skip
            # every other entry op here; UNLINK/RMDIR/MKNOD entry ops
            # are dropped at the end instead
            if ignore_entry_ops and et == self.TYPE_ENTRY and \
               ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
                continue

            if et == self.TYPE_ENTRY:
                # extract information according to the type of
                # the entry operation. create(), mkdir() and mknod()
                # have mode, uid, gid information in the changelog
                # itself, so no need to stat()...
                ty = ec[self.POS_TYPE]

                self.update_fop_batch_stats(ec[self.POS_TYPE])

                # PARGFID/BNAME
                en = unescape_space_newline(
                    os.path.join(pfx, ec[self.POS_ENTRY1]))
                # GFID of the entry
                gfid = ec[self.POS_GFID]

                if ty in ['UNLINK', 'RMDIR']:
                    # The index of PARGFID/BNAME for UNLINK and RMDIR
                    # is no longer the last index; it varies based on
                    # whether changelog.capture-del-path is enabled
                    en = unescape_space_newline(
                        os.path.join(pfx, ec[self.UNLINK_ENTRY]))

                    # Remove from the DATA list, so that rsync will
                    # not fail
                    pt = os.path.join(pfx, ec[0])
                    st = lstat(pt)
                    if pt in datas and isinstance(st, int):
                        # file got unlinked; may be a historical changelog
                        datas.remove(pt)

                    if ty in ['RMDIR'] and not isinstance(st, int):
                        logging.info(lf('Ignoring rmdir. Directory present in '
                                        'master', gfid=gfid, pgfid_bname=en))
                        continue

                    if not gconf.get("ignore-deletes"):
                        if not ignore_entry_ops:
                            entries.append(edct(ty, gfid=gfid, entry=en))
                elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
                    # Special case: record mknod as link
                    if ty in ['MKNOD']:
                        mode = int(ec[2])
                        if mode & 0o1000:
                            # Avoid stat'ing the file as it
                            # may be deleted in the interim
                            st = FreeObject(st_mode=int(ec[2]),
                                            st_uid=int(ec[3]),
                                            st_gid=int(ec[4]),
                                            st_atime=0,
                                            st_mtime=0)

                            # The file may already be deleted, but we
                            # still append a LINK, because the file will
                            # be CREATED if the source does not exist
                            entries.append(edct('LINK', stat=st, entry=en,
                                           gfid=gfid))

                            # The assumption here is that only
                            # tier-gfid.linkto causes this mknod, so
                            # add data
                            datas.add(os.path.join(pfx, ec[0]))
                            continue

                    # stat info is present in the changelog itself
                    entries.append(edct(ty, gfid=gfid, entry=en,
                                   mode=int(ec[2]),
                                   uid=int(ec[3]), gid=int(ec[4])))
                elif ty == "RENAME":
                    go = os.path.join(pfx, gfid)
                    st = lstat(go)
                    if isinstance(st, int):
                        st = {}

                    rl = None
                    if st and stat.S_ISLNK(st.st_mode):
                        rl = errno_wrap(os.readlink, [en], [ENOENT],
                                        [ESTALE, EINTR])
                        if isinstance(rl, int):
                            rl = None

                    e1 = unescape_space_newline(
                        os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
                    entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
                                        stat=st, link=rl))
                    # If src doesn't exist at rename time, the destination
                    # is created; if the rename is not followed by data,
                    # a zero-byte file would remain on the slave. Hence
                    # add a data entry for renames
                    datas.add(os.path.join(pfx, gfid))
                else:
                    # stat() to get mode and other information
                    if not matching_disk_gfid(gfid, en):
                        logging.debug(lf('Ignoring entry, purged in the '
                                         'interim', file=en, gfid=gfid))
                        continue

                    go = os.path.join(pfx, gfid)
                    st = lstat(go)
                    if isinstance(st, int):
                        logging.debug(lf('Ignoring entry, purged in the '
                                         'interim', file=en, gfid=gfid))
                        continue

                    if ty == 'LINK':
                        rl = None
                        if st and stat.S_ISLNK(st.st_mode):
                            rl = errno_wrap(os.readlink, [en], [ENOENT],
                                            [ESTALE, EINTR])
                            if isinstance(rl, int):
                                rl = None
                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
                                       link=rl))
                        # If src doesn't exist at link time, the
                        # destination is created based on file type; if
                        # the link is not followed by data, a zero-byte
                        # file would remain on the slave. Hence add a
                        # data entry for links
                        if rl is None:
                            datas.add(os.path.join(pfx, gfid))
                    elif ty == 'SYMLINK':
                        rl = errno_wrap(os.readlink, [en], [ENOENT],
                                        [ESTALE, EINTR])
                        if isinstance(rl, int):
                            continue

                        entries.append(
                            edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
                    else:
                        logging.warn(lf('ignoring op',
                                        gfid=gfid,
                                        type=ty))
            elif et == self.TYPE_GFID:
                # If self.unlinked_gfids is non-empty, the changelog is
                # being retried a second time. Do not add a GFID to the
                # rsync job if it failed previously but was unlinked on
                # the master
                if self.unlinked_gfids and \
                   os.path.join(pfx, ec[0]) in self.unlinked_gfids:
                    logging.debug("ignoring data, since file purged interim")
                else:
                    datas.add(os.path.join(pfx, ec[0]))
            elif et == self.TYPE_META:
                self.update_fop_batch_stats(ec[self.POS_TYPE])
                if ec[1] == 'SETATTR':  # only setattr's for now...
                    # xsync crawl records stat data in the record itself
                    # (7 fields: gfid, op, uid, gid, mode, atime, mtime);
                    # avoid doing stat again in that case
                    if len(ec) == 7:
                        meta_gfid.add((os.path.join(pfx, ec[0]),
                                       XCrawlMetadata(st_uid=ec[2],
                                                      st_gid=ec[3],
                                                      st_mode=ec[4],
                                                      st_atime=ec[5],
                                                      st_mtime=ec[6])))
                    else:
                        meta_gfid.add((os.path.join(pfx, ec[0]), ))
                elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
                    # xattrs and acls are synced via rsync's --xattrs and
                    # --acls switches, so queue data only when rsync is
                    # the sync engine
                    if not gconf.get("sync-method") == "tarssh" and \
                       (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
                        datas.add(os.path.join(pfx, ec[0]))
            else:
                logging.warn(lf('got invalid fop type',
                                type=et))
        logging.debug('entries: %s' % repr(entries))

        # Increment counters for Status
        self.files_in_batch += len(datas)
        self.status.inc_value("data", len(datas))

        self.batch_stats["DATA"] += self.files_in_batch - \
            self.batch_stats["SETXATTR"] - \
            self.batch_stats["XATTROP"]

        entry_start_time = time.time()
        # sync namespace
        if entries and not ignore_entry_ops:
            # Increment counters for Status
            self.status.inc_value("entry", len(entries))

            failures = self.slave.server.entry_ops(entries)

            if gconf.get("gfid-conflict-resolution"):
                count = 0
                num_entries = len(entries)
                num_failures = len(failures)
                if failures:
                    logging.info(lf('Entry ops failed with gfid mismatch',
                                    count=num_failures))
                while failures and count < self.MAX_OE_RETRIES:
                    count += 1
                    self.handle_entry_failures(failures, entries)
                    logging.info(lf('Retry original entries', count=count))
                    failures = self.slave.server.entry_ops(entries)
                    if not failures:
                        logging.info("Successfully fixed all entry ops with "
                                     "gfid mismatch")
                        break

                    # If this iteration has not removed any entry or reduced
                    # the number of failures compared to the previous one, we
                    # don't need to keep iterating because we'll get the same
                    # result in all other attempts.
                    if ((num_entries == len(entries)) and
                            (num_failures == len(failures))):
                        logging.info(lf("No more gfid mismatches can be fixed",
                                        entries=num_entries,
                                        failures=num_failures))
                        break

                    num_entries = len(entries)
                    num_failures = len(failures)

            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
            self.status.dec_value("entry", len(entries))

            # Update Entry stime in Brick Root only in case of Changelog mode
            if self.name in ["live_changelog", "history_changelog"]:
                entry_stime_to_update = (int(change_ts) - 1, 0)
                self.upd_entry_stime(entry_stime_to_update)
                self.status.set_field("last_synced_entry",
                                      entry_stime_to_update[0])

        self.batch_stats["ENTRY_SYNC_TIME"] += time.time() - entry_start_time

 1267         if ignore_entry_ops:
 1268             # Bookkeeping, to show in the logs the range of changelogs skipped
 1269             self.num_skipped_entry_changelogs += 1
 1270             if self.skipped_entry_changelogs_first is None:
 1271                 self.skipped_entry_changelogs_first = change_ts
 1272 
 1273             self.skipped_entry_changelogs_last = change_ts
 1274 
 1275         meta_start_time = time.time()
 1276         # sync metadata
 1277         if meta_gfid:
 1278             meta_entries = []
 1279             for go in meta_gfid:
 1280                 if len(go) > 1:
 1281                     st = go[1]
 1282                 else:
 1283                     st = lstat(go[0])
 1284                 if isinstance(st, int):
 1285                     logging.debug(lf('file got purged in the interim',
 1286                                      file=go[0]))
 1287                     continue
 1288                 meta_entries.append(edct('META', go=go[0], stat=st))
 1289             if meta_entries:
 1290                 self.status.inc_value("meta", len(meta_entries))
 1291                 failures = self.slave.server.meta_ops(meta_entries)
 1292                 self.log_failures(failures, 'go', '', 'META')
 1293                 self.status.dec_value("meta", len(meta_entries))
 1294 
 1295         self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
 1296 
 1297         if self.batch_stats["DATA_START_TIME"] == 0:
 1298             self.batch_stats["DATA_START_TIME"] = time.time()
 1299 
 1300         # sync data
 1301         if datas:
 1302             self.a_syncdata(datas)
 1303             self.datas_in_batch.update(datas)
 1304 
 1305     def process(self, changes, done=1):
 1306         tries = 0
 1307         retry = False
 1308         self.unlinked_gfids = set()
 1309         self.files_in_batch = 0
 1310         self.datas_in_batch = set()
 1311         # Error logging stays disabled until the last retry round
 1312         self.syncer.disable_errorlog()
 1313         self.skipped_entry_changelogs_first = None
 1314         self.skipped_entry_changelogs_last = None
 1315         self.num_skipped_entry_changelogs = 0
 1316         self.batch_start_time = time.time()
 1317         self.init_fop_batch_stats()
 1318 
 1319         while True:
 1320             # first, fire all changelog transfers in parallel. entry and
 1321             # metadata are performed synchronously, therefore in serial.
 1322             # However at the end of each changelog, data is synchronized
 1323             # with syncdata_async() - which means it is serial w.r.t
 1324             # entries/metadata of that changelog but happens in parallel
 1325             # with data of other changelogs.
 1326 
 1327             if retry:
 1328                 if tries == (gconf.get("max-rsync-retries") - 1):
 1329                     # Enable Error logging if it is last retry
 1330                     self.syncer.enable_errorlog()
 1331 
 1332                 # Remove Unlinked GFIDs from Queue
 1333                 for unlinked_gfid in self.unlinked_gfids:
 1334                     if unlinked_gfid in self.datas_in_batch:
 1335                         self.datas_in_batch.remove(unlinked_gfid)
 1336 
 1337                 # Retry only Sync. Do not retry entry ops
 1338                 if self.datas_in_batch:
 1339                     self.a_syncdata(self.datas_in_batch)
 1340             else:
 1341                 for change in changes:
 1342                     logging.debug(lf('processing change',
 1343                                      changelog=change))
 1344                     self.process_change(change, done, retry)
 1345                     if not retry:
 1346                         # number of changelogs processed in the batch
 1347                         self.turns += 1
 1348 
 1349             # Now we wait for all the data transfers fired off in the above
 1350             # step to complete. Note that this is not ideal either. Ideally
 1351             # we want to trigger the entry/meta-data transfer of the next
 1352             # batch while waiting for the data transfer of the current batch
 1353             # to finish.
 1354 
 1355             # Note that the reason to wait for the data transfer (vs doing it
 1356             # completely in the background and call the changelog_done()
 1357             # asynchronously) is because this waiting acts as a "backpressure"
 1358             # and prevents a spiraling increase of wait stubs from consuming
 1359             # unbounded memory and resources.
 1360 
 1361             # update the slave's time with the timestamp of the _last_
 1362             # changelog file's time suffix. Since the changelog suffix
 1363             # time is the time when the changelog was rolled over,
 1364             # introduce a tolerance of 1 second to counter the small
 1365             # delta b/w the marker update and gettimeofday().
 1366             # NOTE: this is only for changelog mode, not xsync.
 1367 
 1368             # @change is the last changelog (therefore max time for this batch)
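            # (note: 'change' below is the loop variable leaked from the
            # for-loop above, i.e. the last changelog of this batch)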
 1369             if self.syncdata_wait():
 1370                 self.unlinked_gfids = set()
 1371                 if done:
 1372                     xtl = (int(change.split('.')[-1]) - 1, 0)
 1373                     self.upd_stime(xtl)
 1374                     list(map(self.changelog_done_func, changes))
 1375                     self.archive_and_purge_changelogs(changes)
 1376 
 1377                 # Reset Data counter after sync
 1378                 self.status.dec_value("data", self.files_in_batch)
 1379                 self.files_in_batch = 0
 1380                 self.datas_in_batch = set()
 1381                 break
 1382 
 1383             # We do not know which changelog transfer failed, retry everything.
 1384             retry = True
 1385             tries += 1
 1386             if tries == gconf.get("max-rsync-retries"):
 1387                 logging.error(lf('changelogs could not be processed '
 1388                                  'completely - moving on...',
 1389                                  files=list(map(os.path.basename, changes))))
 1390 
 1391                 # Reset data counter on failure
 1392                 self.status.dec_value("data", self.files_in_batch)
 1393                 self.files_in_batch = 0
 1394                 self.datas_in_batch = set()
 1395 
 1396                 if done:
 1397                     xtl = (int(change.split('.')[-1]) - 1, 0)
 1398                     self.upd_stime(xtl)
 1399                     list(map(self.changelog_done_func, changes))
 1400                     self.archive_and_purge_changelogs(changes)
 1401                 break
 1402             # it's either entry_ops() or Rsync that failed to do its
 1403             # job. Mostly it's entry_ops() [which currently has a problem
 1404             # of failing to create an entry while not returning an errno]
 1405             # Therefore we do not know whether it was Rsync or
 1406             # entry_ops() that failed... so we retry the _whole_ changelog
 1407             # again.
 1408             # TODO: remove entry retries when it gets fixed.
 1409             logging.warn(lf('incomplete sync, retrying changelogs',
 1410                             files=list(map(os.path.basename, changes))))
 1411 
 1412             # Reset the Data counter before Retry
 1413             self.status.dec_value("data", self.files_in_batch)
 1414             self.files_in_batch = 0
 1415             self.init_fop_batch_stats()
 1416             time.sleep(0.5)
 1417 
 1418         # Log the Skipped Entry ops range if any
 1419         if self.skipped_entry_changelogs_first is not None and \
 1420            self.skipped_entry_changelogs_last is not None:
 1421             logging.info(lf("Skipping already processed entry ops",
 1422                             from_changelog=self.skipped_entry_changelogs_first,
 1423                             to_changelog=self.skipped_entry_changelogs_last,
 1424                             num_changelogs=self.num_skipped_entry_changelogs))
 1425 
 1426         # Log Current batch details
 1427         if changes:
 1428             logging.info(
 1429                 lf("Entry Time Taken",
 1430                    UNL=self.batch_stats["UNLINK"],
 1431                    RMD=self.batch_stats["RMDIR"],
 1432                    CRE=self.batch_stats["CREATE"],
 1433                    MKN=self.batch_stats["MKNOD"],
 1434                    MKD=self.batch_stats["MKDIR"],
 1435                    REN=self.batch_stats["RENAME"],
 1436                    LIN=self.batch_stats["LINK"],
 1437                    SYM=self.batch_stats["SYMLINK"],
 1438                    duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
 1439 
 1440             logging.info(
 1441                 lf("Data/Metadata Time Taken",
 1442                    SETA=self.batch_stats["SETATTR"],
 1443                    meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
 1444                    SETX=self.batch_stats["SETXATTR"],
 1445                    XATT=self.batch_stats["XATTROP"],
 1446                    DATA=self.batch_stats["DATA"],
 1447                    data_duration="%.4f" % (
 1448                        time.time() - self.batch_stats["DATA_START_TIME"])))
 1449 
 1450             logging.info(
 1451                 lf("Batch Completed",
 1452                    mode=self.name,
 1453                    duration="%.4f" % (time.time() - self.batch_start_time),
 1454                    changelog_start=changes[0].split(".")[-1],
 1455                    changelog_end=changes[-1].split(".")[-1],
 1456                    num_changelogs=len(changes),
 1457                    stime=self.get_data_stime(),
 1458                    entry_stime=self.get_entry_stime()))
 1459 
 1460     def upd_entry_stime(self, stime):
 1461         self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
 1462                                           self.uuid,
 1463                                           stime)
 1464 
 1465     def upd_stime(self, stime, path=None):
 1466         if not path:
 1467             path = self.FLAT_DIR_HIERARCHY
 1468         if stime != URXTIME:
 1469             self.sendmark(path, stime)
 1470 
 1471         # Update last_synced_time in status file based on stime
 1472         # only update stime if stime xattr set to Brick root
 1473         if path == self.FLAT_DIR_HIERARCHY:
 1474             chkpt_time = gconf.getr("checkpoint")
 1475             checkpoint_time = 0
 1476             if chkpt_time is not None:
 1477                 checkpoint_time = int(chkpt_time)
 1478 
 1479             self.status.set_last_synced(stime, checkpoint_time)
 1480 
 1481     def update_worker_remote_node(self):
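        # resource_remote looks like [user@]host:<resource>; keep only the
        # host (or IP) part for the status file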
 1482         node = rconf.args.resource_remote
 1483         node_data = node.split("@")
 1484         node = node_data[-1]
 1485         remote_node_ip = node.split(":")[0]
 1486         self.status.set_slave_node(remote_node_ip)
 1487 
 1488     def changelogs_batch_process(self, changes):
 1489         changelogs_batches = []
 1490         current_size = 0
 1491         for c in changes:
 1492             si = os.lstat(c).st_size
 1493             if (si + current_size) > gconf.get("changelog-batch-size"):
 1494                 # Start a new batch if a single changelog file exceeds the
 1495                 # max size, or the batch would overflow it (sketched below)
 1496                 changelogs_batches.append([c])
 1497                 current_size = si
 1498             else:
 1499                 # Append to the last batch; if no batch exists yet, create one
 1500                 current_size += si
 1501                 if not changelogs_batches:
 1502                     changelogs_batches.append([c])
 1503                 else:
 1504                     changelogs_batches[-1].append(c)
 1505 
 1506         for batch in changelogs_batches:
 1507             logging.debug(lf('processing changes',
 1508                              batch=batch))
 1509             self.process(batch)
 1510 
 1511     def crawl(self):
 1512         self.status.set_worker_crawl_status("Changelog Crawl")
 1513         changes = []
 1514         # get stime (from the brick) and purge changelogs
 1515         # that are _historical_ to that time.
 1516         data_stime = self.get_data_stime()
 1517 
 1518         libgfchangelog.scan()
 1519         self.crawls += 1
 1520         changes = libgfchangelog.getchanges()
 1521         if changes:
 1522             if data_stime:
 1523                 logging.info(lf("slave's time",
 1524                                 stime=data_stime))
 1525                 processed = [x for x in changes
 1526                              if int(x.split('.')[-1]) < data_stime[0]]
 1527                 for pr in processed:
 1528                     logging.debug(
 1529                         lf('skipping already processed change',
 1530                            changelog=os.path.basename(pr)))
 1531                     self.changelog_done_func(pr)
 1532                     changes.remove(pr)
 1533                 self.archive_and_purge_changelogs(processed)
 1534 
 1535         self.changelogs_batch_process(changes)
 1536 
 1537     def register(self, register_time, status):
 1538         self.sleep_interval = gconf.get("change-interval")
 1539         self.changelog_done_func = libgfchangelog.done
 1540         self.tempdir = self.setup_working_dir()
 1541         self.processed_changelogs_dir = os.path.join(self.tempdir,
 1542                                                      ".processed")
 1543         self.name = "live_changelog"
 1544         self.status = status
 1545 
 1546 
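# A minimal, self-contained sketch (illustration only, not called by the
# daemon) of the size-based batching rule implemented in
# changelogs_batch_process() above; sizes are passed in directly instead of
# calling os.lstat(), and 'max_size' stands in for the changelog-batch-size
# config value.
def _batch_by_size_sketch(changelog_sizes, max_size):
    batches = []
    current_size = 0
    for name, size in changelog_sizes:
        if size + current_size > max_size:
            # start a fresh batch when the running total would overflow,
            # or when a single changelog alone exceeds the limit
            batches.append([name])
            current_size = size
        else:
            current_size += size
            if not batches:
                batches.append([name])
            else:
                batches[-1].append(name)
    return batches

# _batch_by_size_sketch([("c1", 60), ("c2", 30), ("c3", 30)], 100)
#     -> [["c1", "c2"], ["c3"]]
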
 1547 class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
 1548     def register(self, register_time, status):
 1549         self.changelog_register_time = register_time
 1550         self.history_crawl_start_time = register_time
 1551         self.changelog_done_func = libgfchangelog.history_done
 1552         self.history_turns = 0
 1553         self.tempdir = self.setup_working_dir()
 1554         self.processed_changelogs_dir = os.path.join(self.tempdir,
 1555                                                      ".history/.processed")
 1556         self.name = "history_changelog"
 1557         self.status = status
 1558 
 1559     def crawl(self):
 1560         self.history_turns += 1
 1561         self.status.set_worker_crawl_status("History Crawl")
 1562         data_stime = self.get_data_stime()
 1563 
 1564         end_time = int(time.time())
 1565 
 1566         # as the start of a history crawl marks a Geo-rep worker restart
 1567         if gconf.get("ignore-deletes"):
 1568             logging.info(lf('ignore-deletes config option is set',
 1569                             stime=data_stime))
 1570 
 1571         logging.info(lf('starting history crawl',
 1572                         turns=self.history_turns,
 1573                         stime=data_stime,
 1574                         etime=end_time,
 1575                         entry_stime=self.get_entry_stime()))
 1576 
 1577         if not data_stime or data_stime == URXTIME:
 1578             raise NoStimeAvailable()
 1579 
 1580         # The changelog backend path is hardcoded as
 1581         # <BRICK_PATH>/.glusterfs/changelogs; if the user configured a
 1582         # different location, consuming history will not work (known issue)
 1583         changelog_path = os.path.join(rconf.args.local_path,
 1584                                       ".glusterfs/changelogs")
 1585         ret, actual_end = libgfchangelog.history_changelog(
 1586             changelog_path,
 1587             data_stime[0],
 1588             end_time,
 1589             gconf.get("sync-jobs"))
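        # 'actual_end' is the time up to which history could actually be
        # consumed; it may fall short of the requested end_time (handled
        # below via PartialHistoryAvailable)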
 1590 
 1591         # scan followed by getchanges till scan returns zero.
 1592         # history_scan() is a blocking call that waits until it knows
 1593         # the number of changelogs to process. It returns zero when there
 1594         # are no changelogs to be processed, and a positive value for the
 1595         # number of changelogs to be processed, which are then fetched
 1596         # using history_getchanges()
 1597         while libgfchangelog.history_scan() > 0:
 1598             self.crawls += 1
 1599 
 1600             changes = libgfchangelog.history_getchanges()
 1601             if changes:
 1602                 if data_stime:
 1603                     logging.info(lf("slave's time",
 1604                                     stime=data_stime))
 1605                     processed = [x for x in changes
 1606                                  if int(x.split('.')[-1]) < data_stime[0]]
 1607                     for pr in processed:
 1608                         logging.debug(lf('skipping already processed change',
 1609                                          changelog=os.path.basename(pr)))
 1610                         self.changelog_done_func(pr)
 1611                         changes.remove(pr)
 1612 
 1613             self.changelogs_batch_process(changes)
 1614 
 1615         history_turn_time = int(time.time()) - self.history_crawl_start_time
 1616 
 1617         logging.info(lf('finished history crawl',
 1618                         endtime=actual_end,
 1619                         stime=self.get_data_stime(),
 1620                         entry_stime=self.get_entry_stime()))
 1621 
 1622         # If the TS returned from history_changelog is < register_time
 1623         # then an FS crawl may be required, since history is only
 1624         # available up to the TS returned from history_changelog
 1625         if actual_end < self.changelog_register_time:
 1626             if self.history_turns < 2:
 1627                 sleep_time = 1
 1628                 if history_turn_time < CHANGELOG_ROLLOVER_TIME:
 1629                     sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
 1630                 time.sleep(sleep_time)
 1631                 self.history_crawl_start_time = int(time.time())
 1632                 self.crawl()
 1633             else:
 1634                 # This exception will be caught in resource.py and
 1635                 # fallback to xsync for the small gap.
 1636                 raise PartialHistoryAvailable(str(actual_end))
 1637 
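# A minimal sketch (illustration only, not called by the daemon) of how
# process() derives a brick stime tuple from the last changelog name of a
# batch: the dot-suffix of the name is the rollover timestamp, and one
# second is subtracted as the tolerance described in process() above. The
# filename below is hypothetical.
def _stime_from_changelog_sketch(change):
    # e.g. "CHANGELOG.1629456789" -> (1629456788, 0)
    return (int(change.split('.')[-1]) - 1, 0)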
 1638 
 1639 class GMasterXsyncMixin(GMasterChangelogMixin):
 1640 
 1641     """
 1642     This crawl needs to be xtime based (as of now
 1643     it's not; this is because we generate a CHANGELOG
 1644     file during each crawl, which is then processed
 1645     by process_change()).
 1646     For now it's used as a one-shot initial sync
 1647     mechanism and only syncs directories, regular
 1648     files, hardlinks and symlinks.
 1649     """
 1650 
 1651     XSYNC_MAX_ENTRIES = 1 << 13
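    # i.e. at most 8192 entries are accumulated per generated xsync
    # changelog before it is scheduled for processing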
 1652 
 1653     def register(self, register_time=None, status=None):
 1654         self.status = status
 1655         self.counter = 0
 1656         self.comlist = []
 1657         self.stimes = []
 1658         self.sleep_interval = 60
 1659         self.tempdir = self.setup_working_dir()
 1660         logging.info(lf('Working dir',
 1661                         path=self.tempdir))
 1662         self.tempdir = os.path.join(self.tempdir, 'xsync')
 1663         self.processed_changelogs_dir = self.tempdir
 1664         self.name = "xsync"
 1665         try:
 1666             os.makedirs(self.tempdir)
 1667         except OSError:
 1668             ex = sys.exc_info()[1]
 1669             if ex.errno == EEXIST and os.path.isdir(self.tempdir):
 1670                 pass
 1671             else:
 1672                 raise
 1673         # Purge stale unprocessed xsync changelogs
 1674         for f in os.listdir(self.tempdir):
 1675             if f.startswith("XSYNC-CHANGELOG"):
 1676                 os.remove(os.path.join(self.tempdir, f))
 1677 
 1679     def crawl(self):
 1680         """
 1681         event dispatcher thread
 1682 
 1683         this thread either dispatches a changelog or synchronizes the
 1684         stime, and terminates itself on receiving a 'finale' event
 1685         """
 1686         def Xsyncer():
 1687             self.Xcrawl()
 1688         t = Thread(target=Xsyncer)
 1689         t.start()
 1690         logging.info(lf('starting hybrid crawl',
 1691                         stime=self.get_data_stime()))
 1692         self.status.set_worker_crawl_status("Hybrid Crawl")
 1693         while True:
 1694             try:
 1695                 item = self.comlist.pop(0)
 1696                 if item[0] == 'finale':
 1697                     logging.info(lf('finished hybrid crawl',
 1698                                     stime=self.get_data_stime()))
 1699                     break
 1700                 elif item[0] == 'xsync':
 1701                     logging.info(lf('processing xsync changelog',
 1702                                     path=item[1]))
 1703                     self.process([item[1]], 0)
 1704                     self.archive_and_purge_changelogs([item[1]])
 1705                 elif item[0] == 'stime':
 1706                     logging.debug(lf('setting slave time',
 1707                                      time=item[1]))
 1708                     self.upd_stime(item[1][1], item[1][0])
 1709                 else:
 1710                     logging.warn(lf('unknown tuple in comlist',
 1711                                     entry=item))
 1712             except IndexError:
 1713                 time.sleep(1)
 1714 
 1715     def write_entry_change(self, prefix, data=[]):
 1716         if not getattr(self, "fh", None):
 1717             self.open()
 1718 
 1719         self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
 1720 
 1721     def open(self):
 1722         self.xsync_change = os.path.join(
 1723             self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
 1724         self.fh = open(self.xsync_change, 'w')
 1728 
 1729     def close(self):
 1730         if getattr(self, "fh", None):
 1731             self.fh.flush()
 1732             os.fsync(self.fh.fileno())
 1733             self.fh.close()
 1734             self.fh = None
 1735 
 1736     def fname(self):
 1737         return self.xsync_change
 1738 
 1739     def put(self, mark, item):
 1740         self.comlist.append((mark, item))
 1741 
 1742     def sync_xsync(self, last):
 1743         """schedule a processing of changelog"""
 1744         self.close()
 1745         if self.counter > 0:
 1746             self.put('xsync', self.fname())
 1747         self.counter = 0
 1748         if not last:
 1749             time.sleep(1)  # make sure changelogs are 1 second apart
 1750 
 1751     def sync_stime(self, stime=None, last=False):
 1752         """schedule a stime synchronization"""
 1753         if stime:
 1754             self.put('stime', stime)
 1755         if last:
 1756             self.put('finale', None)
 1757 
 1758     def sync_done(self, stime=[], last=False):
 1759         self.sync_xsync(last)
 1760         if stime:
 1761             # Send last as True only for last stime entry
 1762             for st in stime[:-1]:
 1763                 self.sync_stime(st, False)
 1764 
 1765             if stime and stime[-1]:
 1766                 self.sync_stime(stime[-1], last)
 1767 
 1768     def is_sticky(self, path, mo):
 1769         """check for DHT's linkto file (sticky bit set in the mode)"""
 1770         sticky = False
 1771         if mo & 0o1000:
 1772             sticky = self.master.server.linkto_check(path)
 1773         return sticky
 1774 
 1775     def Xcrawl(self, path='.', xtr_root=None):
 1776         """
 1777         generate a CHANGELOG file consumable by process_change.
 1778 
 1779         slave's xtime (stime) is _cached_ for comparisons across
 1780         the filesystem tree, but set after directory synchronization.
 1781         """
 1782         if path == '.':
 1783             self.crawls += 1
 1784         if not xtr_root:
 1785             # get the root stime and use it for all comparisons
 1786             xtr_root = self.xtime('.', self.slave)
 1787             if isinstance(xtr_root, int):
 1788                 if xtr_root != ENOENT:
 1789                     logging.warn(lf("slave cluster not returning the "
 1790                                     "xtime for root",
 1791                                     error=xtr_root))
 1792                 xtr_root = self.minus_infinity
 1793         xtl = self.xtime(path)
 1794         if isinstance(xtl, int):
 1795             logging.warn("master cluster's xtime not found")
 1796         xtr = self.xtime(path, self.slave)
 1797         if isinstance(xtr, int):
 1798             if xtr != ENOENT:
 1799                 logging.warn(lf("slave cluster not returning the "
 1800                                 "xtime for dir",
 1801                                 path=path,
 1802                                 error=xtr))
 1803             xtr = self.minus_infinity
 1804         xtr = max(xtr, xtr_root)
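        # the cached root stime acts as a floor for all per-directory
        # comparisons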
 1805         zero_zero = (0, 0)
 1806         if xtr_root == zero_zero:
 1807             xtr = self.minus_infinity
 1808         if not self.need_sync(path, xtl, xtr):
 1809             if path == '.':
 1810                 self.sync_done([(path, xtl)], True)
 1811             return
 1812         self.xtime_reversion_hook(path, xtl, xtr)
 1813         logging.debug("entering " + path)
 1814         dem = self.master.server.entries(path)
 1815         pargfid = self.master.server.gfid(path)
 1816         if isinstance(pargfid, int):
 1817             logging.warn(lf('skipping directory',
 1818                             path=path))
 1819         for e in dem:
 1820             bname = e
 1821             e = os.path.join(path, e)
 1822             xte = self.xtime(e)
 1823             if isinstance(xte, int):
 1824                 logging.warn(lf("irregular xtime",
 1825                                 path=e,
 1826                                 error=errno.errorcode[xte]))
 1827                 continue
 1828             if not self.need_sync(e, xte, xtr):
 1829                 continue
 1830             st = self.master.server.lstat(e)
 1831             if isinstance(st, int):
 1832                 logging.warn(lf('got purged in the interim',
 1833                                 path=e))
 1834                 continue
 1835             if self.is_sticky(e, st.st_mode):
 1836                 logging.debug(lf('ignoring sticky bit file',
 1837                                  path=e))
 1838                 continue
 1839             gfid = self.master.server.gfid(e)
 1840             if isinstance(gfid, int):
 1841                 logging.warn(lf('skipping entry',
 1842                                 path=e))
 1843                 continue
 1844             mo = st.st_mode
 1845             self.counter += 1 if ((stat.S_ISDIR(mo) or
 1846                                    stat.S_ISLNK(mo) or
 1847                                    stat.S_ISREG(mo))) else 0
 1848             if self.counter == self.XSYNC_MAX_ENTRIES:
 1849                 self.sync_done(self.stimes, False)
 1850                 self.stimes = []
 1851             if stat.S_ISDIR(mo):
 1852                 self.write_entry_change("E",
 1853                                         [gfid, 'MKDIR', str(mo),
 1854                                          str(0), str(0), escape_space_newline(
 1855                                              os.path.join(pargfid, bname))])
 1856                 self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
 1857                                               str(st.st_gid), str(st.st_mode),
 1858                                               str(st.st_atime),
 1859                                               str(st.st_mtime)])
 1860                 self.Xcrawl(e, xtr_root)
 1861                 stime_to_update = xte
 1862                 # Live Changelog Start time indicates that from that time
 1863                 # onwards Live changelogs are available. If we update stime
 1864                 # greater than live_changelog_start time then Geo-rep will
 1865                 # skip those changelogs as already processed. But Xsync
 1866                 # actually failed to sync the deletes and Renames. Update
 1867                 # stime as min(Live_changelogs_time, Actual_stime) When it
 1868                 # switches to Changelog mode, it syncs Deletes and Renames.
 1869                 if self.live_changelog_start_time:
 1870                     stime_to_update = min(self.live_changelog_start_time, xte)
 1871                 self.stimes.append((e, stime_to_update))
 1872             elif stat.S_ISLNK(mo):
 1873                 self.write_entry_change(
 1874                     "E", [gfid, 'SYMLINK', escape_space_newline(
 1875                         os.path.join(pargfid, bname))])
 1876             elif stat.S_ISREG(mo):
 1877                 nlink = st.st_nlink
 1878                 nlink -= 1  # drop the backend .glusterfs gfid hardlink
 1879                 # if a file has a hardlink, create a Changelog entry as
 1880                 # 'LINK' so the slave side will decide whether to create
 1881                 # the new entry or to create a link.
 1882                 if nlink == 1:
 1883                     self.write_entry_change("E",
 1884                                             [gfid, 'MKNOD', str(mo),
 1885                                              str(0), str(0),
 1886                                              escape_space_newline(
 1887                                                  os.path.join(
 1888                                                      pargfid, bname))])
 1889                 else:
 1890                     self.write_entry_change(
 1891                         "E", [gfid, 'LINK', escape_space_newline(
 1892                             os.path.join(pargfid, bname))])
 1893                 self.write_entry_change("D", [gfid])
 1894         if path == '.':
 1895             stime_to_update = xtl
 1896             if self.live_changelog_start_time:
 1897                 stime_to_update = min(self.live_changelog_start_time, xtl)
 1898             self.stimes.append((path, stime_to_update))
 1899             self.sync_done(self.stimes, True)
 1900 
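# A minimal sketch (illustration only, not called by the daemon) of the
# hardlink decision made in Xcrawl() above: the brick keeps one extra
# hardlink per regular file under .glusterfs, so after dropping it a link
# count above one means the inode was already recorded once and is written
# as 'LINK' rather than 'MKNOD'.
def _entry_type_for_regular_file_sketch(st_nlink):
    nlink = st_nlink - 1  # drop the .glusterfs gfid hardlink
    return 'MKNOD' if nlink == 1 else 'LINK'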
 1901 
 1902 class BoxClosedErr(Exception):
 1903     pass
 1904 
 1905 
 1906 class PostBox(list):
 1907 
 1908     """synchronized collection for storing things thought of as "requests" """
 1909 
 1910     def __init__(self, *a):
 1911         list.__init__(self, *a)
 1912         # too bad Python stdlib does not have read/write locks...
 1913         # it would suffice to grab the lock in .append as reader, in .close as
 1914         # writer
 1915         self.lever = Condition()
 1916         self.open = True
 1917         self.done = False
 1918 
 1919     def wait(self):
 1920         """wait on requests to be processed"""
 1921         self.lever.acquire()
 1922         while not self.done:  # loop guards against spurious wakeups
 1923             self.lever.wait()
 1924         self.lever.release()
 1925         return self.result
 1926 
 1927     def wakeup(self, data):
 1928         """wake up requestors with the result"""
 1929         self.result = data
 1930         self.lever.acquire()
 1931         self.done = True
 1932         self.lever.notify_all()
 1933         self.lever.release()
 1934 
 1935     def append(self, e):
 1936         """post a request"""
 1937         self.lever.acquire()
 1938         if not self.open:
 1939             raise BoxClosedErr
 1940         list.append(self, e)
 1941         self.lever.release()
 1942 
 1943     def close(self):
 1944         """prohibit the posting of further requests"""
 1945         self.lever.acquire()
 1946         self.open = False
 1947         self.lever.release()
 1948 
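# A minimal usage sketch (illustration only, not called by the daemon) of
# the PostBox handshake: producers append requests until a consumer closes
# the box, and wakeup() releases every waiter with the same result.
def _postbox_demo():
    box = PostBox()
    box.append("request-1")  # producer posts a request
    box.close()              # consumer took the box; no further posts
    results = []
    waiter = Thread(target=lambda: results.append(box.wait()))
    waiter.start()
    box.wakeup((True, 0))    # consumer publishes the outcome to all waiters
    waiter.join()
    return results           # -> [(True, 0)]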
 1949 
 1950 class Syncer(object):
 1951 
 1952     """a staged queue to relay rsync requests to rsync workers
 1953 
 1954     By "staged queue" it's meant that when a consumer comes to the
 1955     queue, it takes _all_ entries, leaving the queue empty.
 1956     (I don't know if there is an official term for this pattern.)
 1957 
 1958     The queue uses a PostBox to accumulate incoming items.
 1959     When a consumer (rsync worker) comes, a new PostBox is
 1960     set up and the old one is passed on to the consumer.
 1961 
 1962     Instead of the simplistic scheme of having one big lock
 1963     which synchronizes both the addition of new items and
 1964     PostBox exchanges, use a separate lock to arbitrate consumers,
 1965     and rely on PostBox's synchronization mechanisms to take
 1966     care of additions.
 1967 
 1968     There is a racy corner case, producers vs. consumers,
 1969     which is not handled by this scheme: namely, when the PostBox
 1970     exchange occurs in between being passed to the producer for posting
 1971     and the post placement. But that's what PostBox.close is for:
 1972     such a posting will find the PostBox closed, in which case
 1973     the producer can re-try posting against the actual PostBox of
 1974     the queue.
 1975 
 1976     To aid accumulation of items in the PostBoxen before being grabbed
 1977     by an rsync worker, the worker goes to sleep a bit after
 1978     each completed syncjob.
 1979     """
 1980 
 1981     def __init__(self, slave, sync_engine, resilient_errnos=[]):
 1982         """spawn worker threads"""
 1983         self.log_err = False
 1984         self.slave = slave
 1985         self.lock = Lock()
 1986         self.pb = PostBox()
 1987         self.sync_engine = sync_engine
 1988         self.errnos_ok = resilient_errnos
 1989         for i in range(gconf.get("sync-jobs")):
 1990             t = Thread(target=self.syncjob, args=(i + 1, ))
 1991             t.start()
 1992 
 1993     def syncjob(self, job_id):
 1994         """the life of a worker"""
 1995         while True:
 1996             pb = None
 1997             while True:
 1998                 self.lock.acquire()
 1999                 if self.pb:
 2000                     pb, self.pb = self.pb, PostBox()
 2001                 self.lock.release()
 2002                 if pb:
 2003                     break
 2004                 time.sleep(0.5)
 2005             pb.close()
 2006             start = time.time()
 2007             po = self.sync_engine(pb, self.log_err)
 2008             logging.info(lf("Sync Time Taken",
 2009                             job=job_id,
 2010                             num_files=len(pb),
 2011                             return_code=po.returncode,
 2012                             duration="%.4f" % (time.time() - start)))
 2013 
 2014             if po.returncode == 0:
 2015                 ret = (True, 0)
 2016             elif po.returncode in self.errnos_ok:
 2017                 ret = (False, po.returncode)
 2018             else:
 2019                 po.errfail()  # logs the error output and aborts the worker
 2020             pb.wakeup(ret)
 2021 
 2022     def add(self, e):
 2023         while True:
 2024             pb = self.pb
 2025             try:
 2026                 pb.append(e)
 2027                 return pb
 2028             except BoxClosedErr:
 2029                 pass
 2030 
 2031     def enable_errorlog(self):
 2032         self.log_err = True
 2033 
 2034     def disable_errorlog(self):
 2035         self.log_err = False
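

# A minimal sketch (illustration only) of the "staged queue" exchange that
# syncjob() performs above: the consumer atomically swaps in a fresh
# PostBox and walks away with everything accumulated so far; 'syncer' is
# assumed to be a Syncer instance.
def _take_all_sketch(syncer):
    syncer.lock.acquire()
    taken, syncer.pb = syncer.pb, PostBox()
    syncer.lock.release()
    taken.close()  # late producers see BoxClosedErr and re-try on the new box
    return taken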