"Fossies" - the Fresh Open Source Software Archive

Member "pyzor-1.0.0/pyzor/engines/mysql.py" (10 Dec 2014, 12461 Bytes) of package /linux/privat/pyzor-1.0.0.tar.gz:


    1 """MySQLdb database engine."""
    2 
    3 import time
    4 import logging
    5 import datetime
    6 import itertools
    7 import functools
    8 import threading
    9 
   10 try:
   11     import Queue
   12 except ImportError:
   13     import queue as Queue
   14 
   15 try:
   16     import MySQLdb
   17     import MySQLdb.cursors
   18     _has_mysql = True
   19 except ImportError:
   20     _has_mysql = False
   21 
   22 from pyzor.engines.common import *
   23 
   24 
   25 class MySQLDBHandle(BaseEngine):
   26     absolute_source = False
   27     handles_one_step = True
   28     # The table must already exist, and have this schema:
   29     #   CREATE TABLE `public` (
   30     #   `digest` char(40) default NULL,
   31     #   `r_count` int(11) default NULL,
   32     #   `wl_count` int(11) default NULL,
   33     #   `r_entered` datetime default NULL,
   34     #   `wl_entered` datetime default NULL,
   35     #   `r_updated` datetime default NULL,
   36     #   `wl_updated` datetime default NULL,
   37     #   PRIMARY KEY  (`digest`)
   38     #   )
    # XXX Re-organising might be faster with an r_updated index.  However,
    # XXX the re-organisation time isn't that important, and that would
    # XXX (slightly) slow down all inserts, so we leave it for now.
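    #   (For reference, such an index could be created with, e.g.,
    #    CREATE INDEX r_updated_idx ON `public` (`r_updated`);
    #    the index name here is illustrative.)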
    reorganize_period = 3600 * 24  # 1 day
    reconnect_period = 60  # seconds
    log = logging.getLogger("pyzord")

    def __init__(self, fn, mode, max_age=None):
        self.max_age = max_age
        self.db = None
        # The 'fn' is host,user,password,db,table.  We ignore mode.
        # We store the authentication details so that we can reconnect if
        # necessary.
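        # For example (illustrative values only):
        #   fn = "localhost,pyzor,secret,pyzor,public"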
        self.host, self.user, self.passwd, self.db_name, \
            self.table_name = fn.split(",")
        self.last_connect_attempt = 0  # We have never connected.
        self.reorganize_timer = None
        self.reconnect()
        self.start_reorganizing()

    def _get_new_connection(self):
        """Returns a new db connection."""
        db = MySQLdb.connect(host=self.host, user=self.user,
                             db=self.db_name, passwd=self.passwd)
        db.autocommit(True)
        return db

    def _check_reconnect_time(self):
        if time.time() - self.last_connect_attempt < self.reconnect_period:
            # Too soon to reconnect.
            self.log.debug("Can't reconnect until %s",
                           (time.ctime(self.last_connect_attempt +
                                       self.reconnect_period)))
            return False
        return True

    def reconnect(self):
        if not self._check_reconnect_time():
            return
        if self.db:
            try:
                self.db.close()
            except MySQLdb.Error:
                pass
        try:
            self.db = self._get_new_connection()
        except MySQLdb.Error as e:
            self.log.error("Unable to connect to database: %s", e)
            self.db = None
        # Keep track of when we connected, so that we don't retry too often.
        self.last_connect_attempt = time.time()

    def _iter(self, db):
        c = db.cursor(cursorclass=MySQLdb.cursors.SSCursor)
        c.execute("SELECT digest FROM %s" % self.table_name)
        while True:
            row = c.fetchone()
            if not row:
                break
            yield row[0]
        c.close()

    def __iter__(self):
        return self._safe_call("iter", self._iter, ())

    def _iteritems(self, db):
        c = db.cursor(cursorclass=MySQLdb.cursors.SSCursor)
        c.execute("SELECT digest, r_count, wl_count, r_entered, r_updated, "
                  "wl_entered, wl_updated FROM %s" % self.table_name)
        while True:
            row = c.fetchone()
            if not row:
                break
            yield row[0], Record(*row[1:])
        c.close()

    def iteritems(self):
        return self._safe_call("iteritems", self._iteritems, ())

    def items(self):
        return list(self._safe_call("iteritems", self._iteritems, ()))

    def __del__(self):
        """Close the database when the object is no longer needed."""
        try:
            if self.db:
                self.db.close()
        except MySQLdb.Error:
            pass

    def _safe_call(self, name, method, args):
        try:
            return method(*args, db=self.db)
        except (MySQLdb.Error, AttributeError) as ex:
            self.log.error("%s failed: %s", name, ex)
            self.reconnect()
            # Retrying just complicates the logic - we don't really care if
            # a single query fails (and it's possible that it would fail
            # on the second attempt anyway).  Any exceptions are caught by
            # the server, and a 'nice' message provided to the caller.
            raise DatabaseError("Database temporarily unavailable.")

    def report(self, keys):
        return self._safe_call("report", self._report, (keys,))

    def whitelist(self, keys):
        return self._safe_call("whitelist", self._whitelist, (keys,))

    def __getitem__(self, key):
        return self._safe_call("getitem", self._really__getitem__, (key,))

    def __setitem__(self, key, value):
        return self._safe_call("setitem", self._really__setitem__,
                               (key, value))

    def __delitem__(self, key):
        return self._safe_call("delitem", self._really__delitem__, (key,))

    def _report(self, keys, db=None):
        c = db.cursor()
        try:
            c.executemany("INSERT INTO %s (digest, r_count, wl_count, "
                          "r_entered, r_updated, wl_entered, wl_updated) "
                          "VALUES (%%s, 1, 0, NOW(), NOW(), NOW(), NOW()) ON "
                          "DUPLICATE KEY UPDATE r_count=r_count+1, "
                          "r_updated=NOW()" % self.table_name,
                          [(key,) for key in keys])
        finally:
            c.close()

    def _whitelist(self, keys, db=None):
        c = db.cursor()
        try:
            c.executemany("INSERT INTO %s (digest, r_count, wl_count, "
                          "r_entered, r_updated, wl_entered, wl_updated) "
                          "VALUES (%%s, 0, 1, NOW(), NOW(), NOW(), NOW()) ON "
                          "DUPLICATE KEY UPDATE wl_count=wl_count+1, "
                          "wl_updated=NOW()" % self.table_name,
                          [(key,) for key in keys])
        finally:
            c.close()

    def _really__getitem__(self, key, db=None):
        """__getitem__ without the exception handling."""
        c = db.cursor()
        # The order here must match the order of the arguments to the
        # Record constructor.
        c.execute("SELECT r_count, wl_count, r_entered, r_updated, "
                  "wl_entered, wl_updated FROM %s WHERE digest=%%s" %
                  self.table_name, (key,))
        try:
            try:
                return Record(*c.fetchone())
            except TypeError:
                # fetchone() returned None, i.e. there is no such record
                raise KeyError()
        finally:
            c.close()

    def _really__setitem__(self, key, value, db=None):
        """__setitem__ without the exception handling."""
        c = db.cursor()
        try:
            c.execute("INSERT INTO %s (digest, r_count, wl_count, "
                      "r_entered, r_updated, wl_entered, wl_updated) "
                      "VALUES (%%s, %%s, %%s, %%s, %%s, %%s, %%s) ON "
                      "DUPLICATE KEY UPDATE r_count=%%s, wl_count=%%s, "
                      "r_entered=%%s, r_updated=%%s, wl_entered=%%s, "
                      "wl_updated=%%s" % self.table_name,
                      (key, value.r_count, value.wl_count, value.r_entered,
                       value.r_updated, value.wl_entered, value.wl_updated,
                       value.r_count, value.wl_count, value.r_entered,
                       value.r_updated, value.wl_entered, value.wl_updated))
        finally:
            c.close()

    def _really__delitem__(self, key, db=None):
        """__delitem__ without the exception handling."""
        c = db.cursor()
        try:
            c.execute("DELETE FROM %s WHERE digest=%%s" % self.table_name,
                      (key,))
        finally:
            c.close()

    def start_reorganizing(self):
        if not self.max_age:
            return
        self.log.debug("reorganizing the database")
        breakpoint = (datetime.datetime.now() -
                      datetime.timedelta(seconds=self.max_age))
        db = self._get_new_connection()
        c = db.cursor()
        try:
            c.execute("DELETE FROM %s WHERE r_updated<%%s" %
                      self.table_name, (breakpoint,))
        except (MySQLdb.Error, AttributeError) as e:
            self.log.warn("Unable to reorganise: %s", e)
        finally:
            c.close()
            db.close()
        self.reorganize_timer = threading.Timer(self.reorganize_period,
                                                self.start_reorganizing)
        self.reorganize_timer.setDaemon(True)
        self.reorganize_timer.start()

    @classmethod
    def get_prefork_connections(cls, fn, mode, max_age=None):
        """Yields callables that create database handles suitable for a
        Pyzor pre-fork server.
        """
        # Only run the reorganize timer in the first child process.
        yield functools.partial(cls, fn, mode, max_age=max_age)
        while True:
            yield functools.partial(cls, fn, mode, max_age=None)


class ThreadedMySQLDBHandle(MySQLDBHandle):
    def __init__(self, fn, mode, max_age=None, bound=None):
        self.bound = bound
        if self.bound:
            self.db_queue = Queue.Queue()
        MySQLDBHandle.__init__(self, fn, mode, max_age=max_age)

    def _get_connection(self):
        if self.bound:
            return self.db_queue.get()
        else:
            return self._get_new_connection()

    def _release_connection(self, db):
        if self.bound:
            self.db_queue.put(db)
        else:
            db.close()

    def _safe_call(self, name, method, args):
        db = self._get_connection()
        try:
            return method(*args, db=db)
        except (MySQLdb.Error, AttributeError) as ex:
            self.log.error("%s failed: %s", name, ex)
            if not self.bound:
                raise DatabaseError("Database temporarily unavailable.")
            try:
                # The connection might have timed out; ping and retry.
                db.ping(True)
                return method(*args, db=db)
            except (MySQLdb.Error, AttributeError) as ex:
                # Attempt a new connection, if the reconnect period allows it.
                db = self._reconnect(db)
                raise DatabaseError("Database temporarily unavailable.")
        finally:
            self._release_connection(db)

    def reconnect(self):
        if not self.bound:
            return
        for _ in range(self.bound):
            self.db_queue.put(self._get_new_connection())

    def _reconnect(self, db):
        if not self._check_reconnect_time():
            return db
        else:
            self.last_connect_attempt = time.time()
            return self._get_new_connection()

    def __del__(self):
        if not self.bound:
            return
        # Drain the queue and close every pooled connection.
        while True:
            try:
                db = self.db_queue.get_nowait()
            except Queue.Empty:
                break
            try:
                db.close()
            except MySQLdb.Error:
                continue


class ProcessMySQLDBHandle(MySQLDBHandle):
    def __init__(self, fn, mode, max_age=None):
        MySQLDBHandle.__init__(self, fn, mode, max_age=max_age)

    def reconnect(self):
        pass

    def __del__(self):
        pass

    def _safe_call(self, name, method, args):
        db = None
        try:
            db = self._get_new_connection()
            return method(*args, db=db)
        except (MySQLdb.Error, AttributeError) as ex:
            self.log.error("%s failed: %s", name, ex)
            raise DatabaseError("Database temporarily unavailable.")
        finally:
            if db is not None:
                db.close()

if not _has_mysql:
    handle = DBHandle(single_threaded=None,
                      multi_threaded=None,
                      multi_processing=None,
                      prefork=None)
else:
    handle = DBHandle(single_threaded=MySQLDBHandle,
                      multi_threaded=ThreadedMySQLDBHandle,
                      multi_processing=ProcessMySQLDBHandle,
                      prefork=MySQLDBHandle)
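
For illustration only, here is a minimal sketch of how this engine might be exercised directly, outside the Pyzor server. The connection values and the message digest are hypothetical; it assumes a reachable MySQL server with a "public" table created as in the schema comment above, and it is not part of mysql.py.

    import hashlib

    from pyzor.engines.mysql import MySQLDBHandle

    # "host,user,password,db,table" -- illustrative credentials, not real ones.
    # The second argument (mode) is ignored by this engine.
    db = MySQLDBHandle("localhost,pyzor,secret,pyzor,public", None)

    digest = hashlib.sha1(b"example message body").hexdigest()  # 40-char key

    db.report([digest])     # INSERT ... ON DUPLICATE KEY UPDATE, bumps r_count
    db.whitelist([digest])  # same statement shape, bumps wl_count
    record = db[digest]     # a Record, or KeyError if the digest is unknown
    print("%d %d" % (record.r_count, record.wl_count))
    del db[digest]          # remove the row again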