"Fossies" - the Fresh Open Source Software Archive

Member "PURELIB/trac/db/postgres_backend.py" (27 Aug 2019, 17100 Bytes) of package /windows/misc/Trac-1.4.win32.exe:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "postgres_backend.py": 1.3.5_vs_1.3.6.

    1 # -*- coding: utf-8 -*-
    2 #
    3 # Copyright (C) 2005-2019 Edgewall Software
    4 # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
    5 # All rights reserved.
    6 #
    7 # This software is licensed as described in the file COPYING, which
    8 # you should have received as part of this distribution. The terms
    9 # are also available at https://trac.edgewall.org/wiki/TracLicense.
   10 #
   11 # This software consists of voluntary contributions made by many
   12 # individuals. For the exact contribution history, see the revision
   13 # history and logs, available at https://trac.edgewall.org/log/.
   14 #
   15 # Author: Christopher Lenz <cmlenz@gmx.de>
   16 
   17 from ctypes.util import find_library
   18 import ctypes
   19 import os
   20 import re
   21 from pkg_resources import DistributionNotFound
   22 
   23 from trac.core import *
   24 from trac.config import Option
   25 from trac.db.api import ConnectionBase, IDatabaseConnector, \
   26                         parse_connection_uri
   27 from trac.db.util import ConnectionWrapper, IterableCursor
   28 from trac.util import get_pkginfo, lazy
   29 from trac.util.compat import close_fds
   30 from trac.util.html import Markup
   31 from trac.util.text import empty, exception_to_unicode, to_unicode
   32 from trac.util.translation import _
   33 
   34 try:
   35     import psycopg2 as psycopg
   36     import psycopg2.extensions
   37     from psycopg2 import DataError, ProgrammingError
   38     from psycopg2.extensions import register_type, UNICODE, \
   39                                     register_adapter, AsIs, QuotedString
   40 except ImportError:
   41     raise DistributionNotFound('psycopg2>=2.0 or psycopg2-binary', ['Trac'])
   42 else:
   43     register_type(UNICODE)
   44     register_adapter(Markup, lambda markup: QuotedString(unicode(markup)))
   45     register_adapter(type(empty), lambda empty: AsIs("''"))
   46     psycopg2_version = get_pkginfo(psycopg).get('version',
   47                                                 psycopg.__version__)
   48     _libpq_pathname = None
   49     if not hasattr(psycopg, 'libpq_version'):
   50         # search path of libpq only if it is dynamically linked
   51         _f = _match = None
   52         try:
   53             with open(psycopg._psycopg.__file__, 'rb') as _f:
   54                 if os.name != 'nt':
   55                     _match = re.search(
   56                         r'''
   57                             \0(
   58                             (?:/[^/\0]+)*/?
   59                             libpq\.(?:so\.[0-9]+|[0-9]+\.dylib)
   60                             )\0
   61                         ''',
   62                         _f.read(), re.VERBOSE)
   63                     if _match:
   64                         _libpq_pathname = _match.group(1)
   65                 else:
   66                     if re.search(r'\0libpq\.dll\0', _f.read(), re.IGNORECASE):
   67                         _libpq_pathname = find_library('libpq')
   68         except AttributeError:
   69             pass
   70         del _f, _match
   71 
   72 _like_escape_re = re.compile(r'([/_%])')
   73 
   74 # Mapping from "abstract" SQL types to DB-specific types
   75 _type_map = {
   76     'int64': 'bigint',
   77 }
   78 
   79 min_postgresql_version = (9, 1, 0)
   80 
   81 
   82 def assemble_pg_dsn(path, user=None, password=None, host=None, port=None):
   83     """Quote the parameters and assemble the DSN."""
   84     def quote(value):
   85         if not isinstance(value, basestring):
   86             value = unicode(value)
   87         return "'%s'" % value.replace('\\', r'\\').replace("'", r"\'")
   88 
   89     dsn = {'dbname': path, 'user': user, 'password': password, 'host': host,
   90            'port': port}
   91     return ' '.join("%s=%s" % (name, quote(value))
   92                     for name, value in dsn.iteritems() if value)
   93 
   94 
   95 def _quote(identifier):
   96     return '"%s"' % identifier.replace('"', '""')
   97 
   98 
   99 def _version_tuple(ver):
  100     if ver:
  101         major, minor = divmod(ver, 10000)
  102         if major >= 10:
  103             # Extract 10.4 from 100004.
  104             return major, minor
  105         else:
  106             # Extract 9.1.23 from 90123.
  107             minor, patch = divmod(minor, 100)
  108             return major, minor, patch
  109 
  110 
  111 def _version_string(ver):
  112     if ver and not isinstance(ver, tuple):
  113         ver = _version_tuple(ver)
  114     if ver:
  115         return '.'.join(map(str, ver))
  116     else:
  117         return '(unknown)'
  118 
  119 
  120 class PostgreSQLConnector(Component):
  121     """Database connector for PostgreSQL.
  122 
  123     Database URLs should be of the form:
  124     {{{
  125     postgres://user[:password]@host[:port]/database[?schema=my_schema]
  126     }}}
  127     """
  128     implements(IDatabaseConnector)
  129 
  130     required = False
  131 
  132     pg_dump_path = Option('trac', 'pg_dump_path', 'pg_dump',
  133         """Location of pg_dump for Postgres database backups""")
  134 
  135     def __init__(self):
  136         self._postgresql_version = \
  137             'server: (not-connected), client: %s' % \
  138             _version_string(self._client_version)
  139 
  140     # IDatabaseConnector methods
  141 
  142     def get_supported_schemes(self):
  143         yield 'postgres', 1
  144 
  145     def get_connection(self, path, log=None, user=None, password=None,
  146                        host=None, port=None, params={}):
  147         params.setdefault('schema', 'public')
  148         cnx = PostgreSQLConnection(path, log, user, password, host, port,
  149                                    params)
  150         server_ver = _version_string(cnx.server_version)
  151         client_ver = _version_string(self._client_version)
  152         if not self.required:
  153             if cnx.server_version < min_postgresql_version:
  154                 error = _(
  155                     "PostgreSQL version is %(version)s. Minimum required "
  156                     "version is %(min_version)s.",
  157                     version=server_ver,
  158                     min_version=_version_string(min_postgresql_version))
  159                 raise TracError(error)
  160             self._postgresql_version = \
  161                 'server: %s, client: %s' % (server_ver, client_ver)
  162             self.required = True
  163         return cnx
  164 
  165     def get_exceptions(self):
  166         return psycopg
  167 
  168     def init_db(self, path, schema=None, log=None, user=None, password=None,
  169                 host=None, port=None, params={}):
  170         cnx = self.get_connection(path, log, user, password, host, port,
  171                                   params)
  172         cursor = cnx.cursor()
  173         if cnx.schema and cnx.schema != 'public':
  174             cursor.execute('CREATE SCHEMA ' + _quote(cnx.schema))
  175             cursor.execute('SET search_path TO %s', (cnx.schema,))
  176         if schema is None:
  177             from trac.db_default import schema
  178         for table in schema:
  179             for stmt in self.to_sql(table):
  180                 cursor.execute(stmt)
  181         cnx.commit()
  182 
  183     def destroy_db(self, path, log=None, user=None, password=None, host=None,
  184                    port=None, params={}):
  185         cnx = self.get_connection(path, log, user, password, host, port,
  186                                   params)
  187         if cnx.schema and cnx.schema != 'public':
  188             cnx.execute('DROP SCHEMA %s CASCADE' % _quote(cnx.schema))
  189         else:
  190             for table in cnx.get_table_names():
  191                 cnx.execute('DROP TABLE %s' % _quote(table))
  192         cnx.commit()
  193 
  194     def db_exists(self, path, log=None, user=None, password=None, host=None,
  195                   port=None, params={}):
  196         cnx = self.get_connection(path, log, user, password, host, port,
  197                                   params)
  198         cursor = cnx.cursor()
  199         cursor.execute("""
  200             SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname=%s);
  201             """, (cnx.schema,))
  202         return cursor.fetchone()[0]
  203 
  204     def to_sql(self, table):
  205         sql = ['CREATE TABLE %s (' % _quote(table.name)]
  206         coldefs = []
  207         for column in table.columns:
  208             ctype = column.type
  209             ctype = _type_map.get(ctype, ctype)
  210             if column.auto_increment:
  211                 ctype = 'SERIAL'
  212             if len(table.key) == 1 and column.name in table.key:
  213                 ctype += ' PRIMARY KEY'
  214             coldefs.append('    %s %s' % (_quote(column.name), ctype))
  215         if len(table.key) > 1:
  216             coldefs.append('    CONSTRAINT %s PRIMARY KEY (%s)' %
  217                            (_quote(table.name + '_pk'),
  218                             ','.join(_quote(col) for col in table.key)))
  219         sql.append(',\n'.join(coldefs) + '\n)')
  220         yield '\n'.join(sql)
  221         for index in table.indices:
  222             unique = 'UNIQUE' if index.unique else ''
  223             yield 'CREATE %s INDEX %s ON %s (%s)' % \
  224                   (unique,
  225                    _quote('%s_%s_idx' % (table.name, '_'.join(index.columns))),
  226                    _quote(table.name),
  227                    ','.join(_quote(col) for col in index.columns))
  228 
  229     def alter_column_types(self, table, columns):
  230         """Yield SQL statements altering the type of one or more columns of
  231         a table.
  232 
  233         Type changes are specified as a `columns` dict mapping column names
  234         to `(from, to)` SQL type tuples.
  235         """
  236         alterations = []
  237         for name, (from_, to) in sorted(columns.iteritems()):
  238             to = _type_map.get(to, to)
  239             if to != _type_map.get(from_, from_):
  240                 alterations.append((name, to))
  241         if alterations:
  242             yield 'ALTER TABLE %s %s' % \
  243                   (_quote(table),
  244                    ', '.join('ALTER COLUMN %s TYPE %s' % (_quote(name), type_)
  245                              for name, type_ in alterations))
  246 
  247     def backup(self, dest_file):
  248         from subprocess import Popen, PIPE
  249         db_url = self.env.config.get('trac', 'database')
  250         scheme, db_prop = parse_connection_uri(db_url)
  251         db_params = db_prop.setdefault('params', {})
  252         db_params.setdefault('schema', 'public')
  253         db_name = os.path.basename(db_prop['path'])
  254 
  255         args = [self.pg_dump_path, '-C', '--inserts', '-x', '-Z', '8']
  256         if 'user' in db_prop:
  257             args.extend(['-U', db_prop['user']])
  258         host = db_params.get('host', db_prop.get('host'))
  259         if host:
  260             args.extend(['-h', host])
  261             if '/' not in host:
  262                 args.extend(['-p', str(db_prop.get('port', '5432'))])
  263 
  264         # Need quote for -n (--schema) option
  265         args.extend(['-n', '"%s"' % db_params['schema']])
  266 
  267         dest_file += ".gz"
  268         args.extend(['-f', dest_file, db_name])
  269 
  270         environ = os.environ.copy()
  271         if 'password' in db_prop:
  272             environ['PGPASSWORD'] = str(db_prop['password'])
  273         try:
  274             p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
  275         except OSError as e:
  276             raise TracError(_("Unable to run %(path)s: %(msg)s",
  277                               path=self.pg_dump_path,
  278                               msg=exception_to_unicode(e)))
  279         errmsg = p.communicate()[1]
  280         if p.returncode != 0:
  281             raise TracError(_("pg_dump failed: %(msg)s",
  282                               msg=to_unicode(errmsg.strip())))
  283         if not os.path.exists(dest_file):
  284             raise TracError(_("No destination file created"))
  285         return dest_file
  286 
  287     def get_system_info(self):
  288         yield 'PostgreSQL', self._postgresql_version
  289         yield 'psycopg2', psycopg2_version
  290 
  291     @lazy
  292     def _client_version(self):
  293         version = None
  294         if hasattr(psycopg, 'libpq_version'):
  295             version = psycopg.libpq_version()
  296         elif _libpq_pathname:
  297             try:
  298                 lib = ctypes.CDLL(_libpq_pathname)
  299                 version = lib.PQlibVersion()
  300             except Exception as e:
  301                 self.log.warning("Exception caught while retrieving libpq's "
  302                                  "version%s",
  303                                  exception_to_unicode(e, traceback=True))
  304         return _version_tuple(version)
  305 
  306     def _pgdump_version(self):
  307         from subprocess import Popen, PIPE
  308         try:
  309             p = Popen([self.pg_dump_path, '--version'], stdout=PIPE,
  310                       close_fds=close_fds)
  311         except OSError as e:
  312             raise TracError(_("Unable to run %(path)s: %(msg)s",
  313                               path=self.pg_dump_path,
  314                               msg=exception_to_unicode(e)))
  315         return p.communicate()[0]
  316 
  317 
  318 class PostgreSQLConnection(ConnectionBase, ConnectionWrapper):
  319     """Connection wrapper for PostgreSQL."""
  320 
  321     poolable = True
  322 
  323     def __init__(self, path, log=None, user=None, password=None, host=None,
  324                  port=None, params={}):
  325         if path.startswith('/'):
  326             path = path[1:]
  327         if 'host' in params:
  328             host = params['host']
  329 
  330         cnx = psycopg.connect(assemble_pg_dsn(path, user, password, host,
  331                                               port))
  332 
  333         cnx.set_client_encoding('UNICODE')
  334         self.schema = params.get('schema', 'public')
  335         if self.schema != 'public':
  336             try:
  337                 cnx.cursor().execute('SET search_path TO %s', (self.schema,))
  338                 cnx.commit()
  339             except (DataError, ProgrammingError):
  340                 # probably the schema doesn't exist
  341                 cnx.rollback()
  342         ConnectionWrapper.__init__(self, cnx, log)
  343 
  344     def cursor(self):
  345         return IterableCursor(self.cnx.cursor(), self.log)
  346 
  347     def cast(self, column, type):
  348         # Temporary hack needed for the union of selects in the search module
  349         return 'CAST(%s AS %s)' % (column, _type_map.get(type, type))
  350 
  351     def concat(self, *args):
  352         return '||'.join(args)
  353 
  354     def drop_column(self, table, column):
  355         self.execute("""
  356             ALTER TABLE %s DROP COLUMN IF EXISTS %s
  357             """ % (self.quote(table), self.quote(column)))
  358 
  359     def drop_table(self, table):
  360         self.execute("DROP TABLE IF EXISTS " + self.quote(table))
  361 
  362     def get_column_names(self, table):
  363         rows = self.execute("""
  364             SELECT column_name FROM information_schema.columns
  365             WHERE table_schema=current_schema() AND table_name=%s
  366             ORDER BY ordinal_position
  367             """, (table,))
  368         return [row[0] for row in rows]
  369 
  370     def get_last_id(self, cursor, table, column='id'):
  371         cursor.execute("SELECT CURRVAL(%s)",
  372                        (self.quote(self._sequence_name(table, column)),))
  373         return cursor.fetchone()[0]
  374 
  375     def get_sequence_names(self):
  376         seqs = [name[:-len('_id_seq')] for name, in self.execute("""
  377                 SELECT c.relname
  378                 FROM pg_class c
  379                 INNER JOIN pg_namespace n ON c.relnamespace = n.oid
  380                 WHERE n.nspname = ANY (current_schemas(false))
  381                 AND c.relkind='S' AND c.relname LIKE %s ESCAPE '!'
  382                 """, ('%!_id!_seq',))]
  383         return sorted(name for name in seqs if name in self.get_table_names())
  384 
  385     def get_table_names(self):
  386         rows = self.execute("""
  387             SELECT table_name FROM information_schema.tables
  388             WHERE table_schema=current_schema()""")
  389         return [row[0] for row in rows]
  390 
  391     def has_table(self, table):
  392         rows = self.execute("""
  393             SELECT EXISTS (SELECT * FROM information_schema.columns
  394                            WHERE table_schema=current_schema()
  395                            AND table_name=%s)
  396             """, (table,))
  397         return rows[0][0]
  398 
  399     def like(self):
  400         return "ILIKE %s ESCAPE '/'"
  401 
  402     def like_escape(self, text):
  403         return _like_escape_re.sub(r'/\1', text)
  404 
  405     def ping(self):
  406         cursor = self.cnx.cursor()
  407         cursor.execute('SELECT 1')
  408 
  409     def prefix_match(self):
  410         return "LIKE %s ESCAPE '/'"
  411 
  412     def prefix_match_value(self, prefix):
  413         return self.like_escape(prefix) + '%'
  414 
  415     def quote(self, identifier):
  416         return _quote(identifier)
  417 
  418     def reset_tables(self):
  419         # reset sequences
  420         cursor = self.cursor()
  421         cursor.execute("""
  422             SELECT sequence_name FROM information_schema.sequences
  423             WHERE sequence_schema=%s
  424             """, (self.schema,))
  425         for seq, in cursor.fetchall():
  426             cursor.execute("ALTER SEQUENCE %s RESTART WITH 1" % seq)
  427         # clear tables
  428         table_names = self.get_table_names()
  429         for name in table_names:
  430             cursor.execute("DELETE FROM " + self.quote(name))
  431         # PostgreSQL supports TRUNCATE TABLE as well
  432         # (see https://www.postgresql.org/docs/9.1/static/sql-truncate.html)
  433         # but on the small tables used here, DELETE is actually much faster
  434         return table_names
  435 
  436     def update_sequence(self, cursor, table, column='id'):
  437         cursor.execute("SELECT SETVAL(%%s, (SELECT MAX(%s) FROM %s))"
  438                        % (self.quote(column), self.quote(table)),
  439                        (self.quote(self._sequence_name(table, column)),))
  440 
  441     def _sequence_name(self, table, column):
  442         return '%s_%s_seq' % (table, column)
  443 
  444     @lazy
  445     def server_version(self):
  446         return _version_tuple(self.cnx.server_version)