"Fossies" - the Fresh Open Source Software Archive

Member "pyzor-1.0.0/pyzor/engines/gdbm_.py" (10 Dec 2014, 6613 Bytes) of package /linux/privat/pyzor-1.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "gdbm_.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 0.9.0_vs_1.0.0.

    1 """Gdbm database engine."""
    2 
    3 try:
    4     import gdbm as gdbm
    5     _has_gdbm = True
    6 except ImportError:
    7     try:
    8         import dbm.gnu as gdbm
    9         _has_gdbm = True
   10     except ImportError:
   11         _has_gdbm = False
   12 
   13 import time
   14 import logging
   15 import datetime
   16 import threading
   17 
   18 from pyzor.engines.common import Record, DBHandle, BaseEngine
   19 
   20 
   21 def _dt_decode(datetime_str):
   22     """Decode a string into a datetime object."""
   23     if datetime_str == 'None':
   24         return None
   25     try:
   26         return datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S.%f")
   27     except ValueError:
   28         return datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
   29 
   30 
   31 class GdbmDBHandle(BaseEngine):
   32     absolute_source = True
   33     handles_one_step = False
   34 
   35     sync_period = 60
   36     reorganize_period = 3600 * 24  # 1 day
   37     fields = ('r_count', 'r_entered', 'r_updated',
   38               'wl_count', 'wl_entered', 'wl_updated')
   39     _fields = [('r_count', int),
   40                ('r_entered', _dt_decode),
   41                ('r_updated', _dt_decode),
   42                ('wl_count', int),
   43                ('wl_entered', _dt_decode),
   44                ('wl_updated', _dt_decode)]
   45     this_version = '1'
   46     log = logging.getLogger("pyzord")
   47 
   48     def __init__(self, fn, mode, max_age=None):
   49         self.max_age = max_age
   50         self.db = gdbm.open(fn, mode)
   51         self.reorganize_timer = None
   52         self.sync_timer = None
   53         self.start_reorganizing()
   54         self.start_syncing()
   55 
   56     def __iter__(self):
   57         k = self.db.firstkey()
   58         while k is not None:
   59             yield k
   60             k = self.db.nextkey(k)
   61 
   62     def _iteritems(self):
   63         for k in self:
   64             try:
   65                 yield k, self._really_getitem(k)
   66             except Exception as e:
   67                 self.log.warning("Invalid record %s: %s", k, e)
   68 
   69     def iteritems(self):
   70         return self._iteritems()
   71 
   72     def items(self):
   73         return list(self._iteritems())
   74 
   75     def apply_method(self, method, varargs=(), kwargs=None):
   76         if kwargs is None:
   77             kwargs = {}
   78         return apply(method, varargs, kwargs)
   79 
   80     def __getitem__(self, key):
   81         return self.apply_method(self._really_getitem, (key,))
   82 
   83     def _really_getitem(self, key):
   84         return GdbmDBHandle.decode_record(self.db[key])
   85 
   86     def __setitem__(self, key, value):
   87         self.apply_method(self._really_setitem, (key, value))
   88 
   89     def _really_setitem(self, key, value):
   90         self.db[key] = GdbmDBHandle.encode_record(value)
   91 
   92     def __delitem__(self, key):
   93         self.apply_method(self._really_delitem, (key,))
   94 
   95     def _really_delitem(self, key):
   96         del self.db[key]
   97 
   98     def start_syncing(self):
   99         if self.db:
  100             self.apply_method(self._really_sync)
  101         self.sync_timer = threading.Timer(self.sync_period,
  102                                           self.start_syncing)
  103         self.sync_timer.setDaemon(True)
  104         self.sync_timer.start()
  105 
  106     def _really_sync(self):
  107         self.db.sync()
  108 
  109     def start_reorganizing(self):
  110         if not self.max_age:
  111             return
  112         if self.db:
  113             self.apply_method(self._really_reorganize)
  114         self.reorganize_timer = threading.Timer(self.reorganize_period,
  115                                                 self.start_reorganizing)
  116         self.reorganize_timer.setDaemon(True)
  117         self.reorganize_timer.start()
  118 
  119     def _really_reorganize(self):
  120         self.log.debug("reorganizing the database")
  121         key = self.db.firstkey()
  122         breakpoint = time.time() - self.max_age
  123         while key is not None:
  124             rec = self._really_getitem(key)
  125             delkey = None
  126             if int(time.mktime(rec.r_updated.timetuple())) < breakpoint:
  127                 self.log.debug("deleting key %s", key)
  128                 delkey = key
  129             key = self.db.nextkey(key)
  130             if delkey:
  131                 self._really_delitem(delkey)
  132         self.db.reorganize()
  133 
  134     @classmethod
  135     def encode_record(cls, value):
  136         values = [cls.this_version]
  137         values.extend(["%s" % getattr(value, x) for x in cls.fields])
  138         return ",".join(values)
  139 
  140     @classmethod
  141     def decode_record(cls, s):
  142         try:
  143             s = s.decode("utf8")
  144         except UnicodeError:
  145             raise StandardError("don't know how to handle db value %s" %
  146                                 repr(s))
  147         parts = s.split(',')
  148         version = parts[0]
  149         if len(parts) == 3:
  150             dispatch = cls.decode_record_0
  151         elif version == '1':
  152             dispatch = cls.decode_record_1
  153         else:
  154             raise StandardError("don't know how to handle db value %s" %
  155                                 repr(s))
  156         return dispatch(s)
  157 
  158     @staticmethod
  159     def decode_record_0(s):
  160         r = Record()
  161         parts = s.split(',')
  162         fields = ('r_count', 'r_entered', 'r_updated')
  163         assert len(parts) == len(fields)
  164         for i in range(len(parts)):
  165             setattr(r, fields[i], int(parts[i]))
  166         return r
  167 
  168     @classmethod
  169     def decode_record_1(cls, s):
  170         r = Record()
  171         parts = s.split(',')[1:]
  172         assert len(parts) == len(cls.fields)
  173         for part, field in zip(parts, cls._fields):
  174             f, decode = field
  175             setattr(r, f, decode(part))
  176         return r
  177 
  178 
  179 class ThreadedGdbmDBHandle(GdbmDBHandle):
  180     """Like GdbmDBHandle, but handles multi-threaded access."""
  181 
  182     def __init__(self, fn, mode, max_age=None, bound=None):
  183         self.db_lock = threading.Lock()
  184         GdbmDBHandle.__init__(self, fn, mode, max_age=max_age)
  185 
  186     def apply_method(self, method, varargs=(), kwargs=None):
  187         if kwargs is None:
  188             kwargs = {}
  189         with self.db_lock:
  190             return GdbmDBHandle.apply_method(self, method, varargs=varargs,
  191                                              kwargs=kwargs)
  192 
  193 # This won't work because the gdbm object needs to be in shared memory of the
  194 # spawned processes.
  195 # class ProcessGdbmDBHandle(ThreadedGdbmDBHandle):
  196 # def __init__(self, fn, mode, max_age=None, bound=None):
  197 # ThreadedGdbmDBHandle.__init__(self, fn, mode, max_age=max_age,
  198 #                                       bound=bound)
  199 #         self.db_lock = multiprocessing.Lock()
  200 
  201 if not _has_gdbm:
  202     handle = DBHandle(single_threaded=None,
  203                       multi_threaded=None,
  204                       multi_processing=None,
  205                       prefork=None)
  206 else:
  207     handle = DBHandle(single_threaded=GdbmDBHandle,
  208                       multi_threaded=ThreadedGdbmDBHandle,
  209                       multi_processing=None,
  210                       prefork=None)