"Fossies" - the Fresh Open Source Software Archive

Member "mariadb-10.0.38/sql/wsrep_mysqld.cc" (1 Feb 2019, 53862 Bytes) of package /linux/misc/mariadb-galera-10.0.38.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the last Fossies "Diffs" side-by-side code changes report for "wsrep_mysqld.cc": 10.0.36_vs_10.0.37.

    1 /* Copyright 2008-2015 Codership Oy <http://www.codership.com>
    2 
    3    This program is free software; you can redistribute it and/or modify
    4    it under the terms of the GNU General Public License as published by
    5    the Free Software Foundation; version 2 of the License.
    6 
    7    This program is distributed in the hope that it will be useful,
    8    but WITHOUT ANY WARRANTY; without even the implied warranty of
    9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   10    GNU General Public License for more details.
   11 
   12    You should have received a copy of the GNU General Public License
   13    along with this program; if not, write to the Free Software
   14    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
   15 
   16 #include <sql_plugin.h> // SHOW_MY_BOOL
   17 #include <mysqld.h>
   18 #include <sql_class.h>
   19 #include <sql_parse.h>
   20 #include <sql_base.h> /* find_temporary_table() */
   21 #include "wsrep_priv.h"
   22 #include "wsrep_thd.h"
   23 #include "wsrep_sst.h"
   24 #include "wsrep_utils.h"
   25 #include "wsrep_var.h"
   26 #include "wsrep_binlog.h"
   27 #include "wsrep_applier.h"
   28 #include "wsrep_xid.h"
   29 #include <cstdio>
   30 #include <cstdlib>
   31 #include "log_event.h"
   32 #include <slave.h>
   33 
/* Handle to the loaded wsrep provider: filled in by wsrep_load() in
   wsrep_init(), reset to NULL there when provider initialisation fails,
   and reset to 0 in wsrep_deinit(). */
wsrep_t *wsrep                  = NULL;
my_bool wsrep_emulate_bin_log   = FALSE; // activating parts of binlog interface
#ifdef GTID_SUPPORT
/* Sidno in global_sid_map corresponding to group uuid */
rpl_sidno wsrep_sidno= -1;
#endif /* GTID_SUPPORT */
/* NOTE(review): not referenced in the visible part of this file; the name
   suggests it controls handling of preordered events - confirm at the
   use sites. */
my_bool wsrep_preordered_opt= FALSE;
   41 
   42 /*
   43  * Begin configuration options and their default values
   44  */
   45 
/* Data directory handed to the provider; wsrep_init() falls back to
   mysql_real_data_home when this is NULL or empty. */
const char* wsrep_data_home_dir = NULL;
/* NOTE(review): not used in the visible part of this file; presumably a
   DBUG control string for the provider - confirm. */
const char* wsrep_dbug_option   = "";

long    wsrep_slave_threads            = 1; // # of slave action appliers wanted
int     wsrep_slave_count_change       = 0; // # of appliers to stop or start
my_bool wsrep_debug                    = 0; // enable debug level logging
my_bool wsrep_convert_LOCK_to_trx      = 1; // convert locking sessions to trx
ulong   wsrep_retry_autocommit         = 5; // retry aborted autocommit trx
my_bool wsrep_auto_increment_control   = 1; // control auto increment variables
my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
my_bool wsrep_incremental_data_collection = 0; // incremental data collection
ulong   wsrep_max_ws_size              = 1073741824UL;//max ws (RBR buffer) size
ulong   wsrep_max_ws_rows              = 65536; // max number of rows in ws
int     wsrep_to_isolation             = 0; // # of active TO isolation threads
my_bool wsrep_certify_nonPK            = 1; // certify, even when no primary key
// certification rule set; defaults to the strict rules
ulong   wsrep_certification_rules      = WSREP_CERTIFICATION_RULES_STRICT;
long    wsrep_max_protocol_version     = 3; // maximum protocol version to use
// binlog format override; BINLOG_FORMAT_UNSPEC is the default value
ulong   wsrep_forced_binlog_format     = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery                 = 0; // recovery
my_bool wsrep_replicate_myisam         = 0; // enable myisam replication
// NOTE(review): presumably enables logging of conflicting transactions - confirm
my_bool wsrep_log_conflicts            = 0;
// NOTE(review): meaning not visible in this file - confirm at the use sites
ulong   wsrep_mysql_replication_bundle = 0;
my_bool wsrep_desync                   = 0; // desynchronize the node from the
                                            // cluster
my_bool wsrep_load_data_splitting      = 1; // commit load data every 10K intervals
my_bool wsrep_restart_slave            = 0; // should mysql slave thread be
                                            // restarted, if node joins back
my_bool wsrep_restart_slave_activated  = 0; // node has dropped, and slave
                                            // restart will be needed
my_bool wsrep_slave_UK_checks          = 0; // slave thread does UK checks
my_bool wsrep_slave_FK_checks          = 0; // slave thread does FK checks
// Allow reads even if the node is not in the primary component.
bool wsrep_dirty_reads                 = false;

/*
  Set during the creation of first wsrep applier and rollback threads.
  Since these threads are critical, abort if the thread creation fails.
  (wsrep_init_startup() sets it to 1 right before creating those threads.)
*/
my_bool wsrep_creating_startup_threads = 0;
   85 
   86 /*
   87  * End configuration options
   88  */
   89 
   90 /*
   91  * Other wsrep global variables.
   92  */
// Set to 1 by wsrep_init() and back to 0 by wsrep_deinit().
my_bool wsrep_inited                   = 0; // initialized ?

// UUID of the cluster state taken from the most recent view; updated
// (together with its printable form) in wsrep_view_handler_cb().
static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
static char         cluster_uuid_str[40]= { 0, };
// Printable names of the view states; indexed with view->status in
// wsrep_view_handler_cb().
static const char*  cluster_status_str[WSREP_VIEW_MAX] =
{
    "Primary",
    "non-Primary",
    "Disconnected"
};

// Provider identification strings: copied from the loaded provider in
// wsrep_init() and emptied again in wsrep_deinit().
static char provider_name[256]= { 0, };
static char provider_version[256]= { 0, };
static char provider_vendor[256]= { 0, };

/*
 * wsrep status variables
 */
my_bool     wsrep_connected          = FALSE;
my_bool     wsrep_ready              = FALSE; // node can accept queries
const char* wsrep_cluster_state_uuid = cluster_uuid_str;
long long   wsrep_cluster_conf_id    = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long        wsrep_cluster_size       = 0;
long        wsrep_local_index        = -1;
long long   wsrep_local_bf_aborts    = 0;
const char* wsrep_provider_name      = provider_name;
const char* wsrep_provider_version   = provider_version;
const char* wsrep_provider_vendor    = provider_vendor;
/* End wsrep status variables */

// Local replication position (state UUID and seqno). Seeded from the
// storage engine checkpoint in wsrep_init_position() and passed to the
// provider as the initial state id in wsrep_init().
wsrep_uuid_t     local_uuid   = WSREP_UUID_UNDEFINED;
wsrep_seqno_t    local_seqno  = WSREP_SEQNO_UNDEFINED;
// Member status of this node; updated from the view handler and the
// synced callback.
wsp::node_status local_status;
// Protocol version currently in use; updated in wsrep_view_handler_cb()
// when a view reports a different version.
long             wsrep_protocol_version = 3;

// Boolean denoting if server is in initial startup phase. This is needed
// to make sure that main thread waiting in wsrep_sst_wait() is signaled
// if there was no state gap on receiving first view event.
static my_bool   wsrep_startup = TRUE;
  133 
  134 
  135 static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
  136   switch (level) {
  137   case WSREP_LOG_INFO:
  138     sql_print_information("WSREP: %s", msg);
  139     break;
  140   case WSREP_LOG_WARN:
  141     sql_print_warning("WSREP: %s", msg);
  142     break;
  143   case WSREP_LOG_ERROR:
  144   case WSREP_LOG_FATAL:
  145     sql_print_error("WSREP: %s", msg);
  146     break;
  147   case WSREP_LOG_DEBUG:
  148     if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
  149   default:
  150     break;
  151   }
  152 }
  153 
  154 static void wsrep_log_states (wsrep_log_level_t   const level,
  155                               const wsrep_uuid_t* const group_uuid,
  156                               wsrep_seqno_t       const group_seqno,
  157                               const wsrep_uuid_t* const node_uuid,
  158                               wsrep_seqno_t       const node_seqno)
  159 {
  160   char uuid_str[37];
  161   char msg[256];
  162 
  163   wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
  164   snprintf (msg, 255, "WSREP: Group state: %s:%lld",
  165             uuid_str, (long long)group_seqno);
  166   wsrep_log_cb (level, msg);
  167 
  168   wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
  169   snprintf (msg, 255, "WSREP: Local state: %s:%lld",
  170             uuid_str, (long long)node_seqno);
  171   wsrep_log_cb (level, msg);
  172 }
  173 
  174 #ifdef GTID_SUPPORT
  175 void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
  176 {
  177   /* generate new Sid map entry from inverted uuid */
  178   rpl_sid sid;
  179   wsrep_uuid_t ltid_uuid;
  180 
  181   for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
  182   {
  183       ltid_uuid.data[i] = ~wsrep_uuid.data[i];
  184   }
  185 
  186   sid.copy_from(ltid_uuid.data);
  187   global_sid_lock->wrlock();
  188   wsrep_sidno= global_sid_map->add_sid(sid);
  189   WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
  190   global_sid_lock->unlock();
  191 }
  192 #endif /* GTID_SUPPORT */
  193 
/*
  View (cluster membership) callback; installed as view_handler_cb in
  wsrep_init().

  What the body does, in order:
   - refreshes the cluster status variables (state UUID, conf id, status
     string, size, local index) from the view and logs them;
   - for a non-primary view: flushes the query cache (if compiled in),
     clears the ready flag and records WSREP_MEMBER_UNDEFINED;
   - for a primary view: handles an application protocol version change,
     then either prepares a state transfer request (state gap) or, during
     startup, initialises the local position / storage engine checkpoint;
   - updates the auto-increment settings when wsrep_auto_increment_control
     is set and re-reads the provider capabilities;
   - stores the resulting member status in local_status.

  @param app_ctx      not used by this function
  @param recv_ctx     not used by this function
  @param view         the new view
  @param state        not used by this function
  @param state_len    not used by this function
  @param sst_req      out: reset to NULL on entry; handed to
                      wsrep_sst_prepare() when the view reports a state gap
  @param sst_req_len  out: reset to 0 on entry; set to the value returned
                      by wsrep_sst_prepare() when that value is >= 0

  @return always WSREP_CB_SUCCESS; unsupported protocol versions and an
          undetected state gap end in unireg_abort(1) instead.
*/
static wsrep_cb_status_t
wsrep_view_handler_cb (void*                    app_ctx,
                       void*                    recv_ctx,
                       const wsrep_view_info_t* view,
                       const char*              state,
                       size_t                   state_len,
                       void**                   sst_req,
                       size_t*                  sst_req_len)
{
  *sst_req     = NULL;
  *sst_req_len = 0;

  // Start from the current status; the branches below may override it.
  wsrep_member_status_t new_status= local_status.get();

  // Refresh the cached cluster UUID (and its string form) only on change.
  if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
  {
    memcpy(&cluster_uuid, &view->state_id.uuid, sizeof(cluster_uuid));

    wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
                      sizeof(cluster_uuid_str));
  }

  wsrep_cluster_conf_id= view->view;
  // NOTE(review): view->status is used as an index without a range check;
  // this assumes the provider only reports values below WSREP_VIEW_MAX.
  wsrep_cluster_status= cluster_status_str[view->status];
  wsrep_cluster_size= view->memb_num;
  wsrep_local_index= view->my_idx;

  WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
             "number of nodes: %ld, my index: %ld, protocol version %d",
             wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
             (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
             wsrep_cluster_size, wsrep_local_index, view->proto_ver);

  /* Proceed further only if view is PRIMARY */
  if (WSREP_VIEW_PRIMARY != view->status)
  {
#ifdef HAVE_QUERY_CACHE
    // query cache must be initialised by now
    query_cache.flush();
#endif /* HAVE_QUERY_CACHE */

    wsrep_ready_set(FALSE);
    new_status= WSREP_MEMBER_UNDEFINED;
    /* Always record local_uuid and local_seqno in non-prim since this
     * may lead to re-initializing provider and start position is
     * determined according to these variables */
    // WRONG! local_uuid should be the last primary configuration uuid we were
    // a member of. local_seqno should be updated in commit calls.
    // local_uuid= cluster_uuid;
    // local_seqno= view->first - 1;
    goto out;
  }

  switch (view->proto_ver)
  {
  case 0:
  case 1:
  case 2:
  case 3:
      // version change
      if (view->proto_ver != wsrep_protocol_version)
      {
          // Clear the ready flag while client connections are closed and
          // restore the previous value afterwards.
          my_bool wsrep_ready_saved= wsrep_ready_get();
          wsrep_ready_set(FALSE);
          WSREP_INFO("closing client connections for "
                     "protocol change %ld -> %d",
                     wsrep_protocol_version, view->proto_ver);
          wsrep_close_client_connections(TRUE);
          wsrep_protocol_version= view->proto_ver;
          wsrep_ready_set(wsrep_ready_saved);
      }
      break;
  default:
      WSREP_ERROR("Unsupported application protocol version: %d",
                  view->proto_ver);
      unireg_abort(1);
  }

  if (view->state_gap)
  {
    WSREP_WARN("Gap in state sequence. Need state transfer.");

    /* After that wsrep will call wsrep_sst_prepare. */
    /* keep ready flag 0 until we receive the snapshot */
    wsrep_ready_set(FALSE);

    /* Close client connections to ensure that they don't interfere
     * with SST. Necessary only if storage engines are initialized
     * before SST.
     * TODO: Just killing all ongoing transactions should be enough
     * since wsrep_ready is OFF and no new transactions can start.
     */
    if (!wsrep_before_SE())
    {
        WSREP_DEBUG("[debug]: closing client connections for PRIM");
        wsrep_close_client_connections(TRUE);
    }

    ssize_t const req_len= wsrep_sst_prepare (sst_req);

    // A negative result is treated as a negated errno-style code (it is
    // passed to strerror() after negation).
    if (req_len < 0)
    {
      WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
                  strerror(-req_len));
      new_status= WSREP_MEMBER_UNDEFINED;
    }
    else
    {
      // NOTE(review): sst_req itself was already dereferenced at the top of
      // the function, so this assertion cannot fail; presumably *sst_req
      // was meant - confirm.
      assert(sst_req != NULL);
      *sst_req_len= req_len;
      new_status= WSREP_MEMBER_JOINER;
    }
  }
  else
  {
    /*
     *  NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
     *  before - OR - it was reinitilized on startup (lp:992840)
     */
    if (wsrep_startup)
    {
      if (wsrep_before_SE())
      {
        wsrep_SE_init_grab();
        // Signal mysqld init thread to continue
        wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
        // and wait for SE initialization
        wsrep_SE_init_wait();
      }
      else
      {
        local_uuid=  cluster_uuid;
        local_seqno= view->state_id.seqno;
      }
      /* Init storage engine XIDs from first view */
      wsrep_set_SE_checkpoint(local_uuid, local_seqno);
#ifdef GTID_SUPPORT
      wsrep_init_sidno(local_uuid);
#endif /* GTID_SUPPORT */
      new_status= WSREP_MEMBER_JOINED;
    }

    // just some sanity check
    if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
    {
      WSREP_ERROR("Undetected state gap. Can't continue.");
      wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
                       &local_uuid, -1);
      unireg_abort(1);
    }
  }

  // Offset is the 1-based index of this node, increment is the number of
  // members in the view.
  if (wsrep_auto_increment_control)
  {
    global_system_variables.auto_increment_offset= view->my_idx + 1;
    global_system_variables.auto_increment_increment= view->memb_num;
  }

  { /* capabilities may be updated on new configuration */
    uint64_t const caps(wsrep->capabilities (wsrep));

    my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
    if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
    {
      WSREP_WARN("Unsupported protocol downgrade: "
                 "incremental data collection disabled. Expect abort.");
    }
    wsrep_incremental_data_collection = idc;
  }

out:
  // The startup phase ends with the first primary view.
  if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
  local_status.set(new_status, view);

  return WSREP_CB_SUCCESS;
}
  370 
  371 my_bool wsrep_ready_set (my_bool x)
  372 {
  373   WSREP_DEBUG("Setting wsrep_ready to %d", x);
  374   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
  375   my_bool ret= (wsrep_ready != x);
  376   if (ret)
  377   {
  378     wsrep_ready= x;
  379     mysql_cond_signal (&COND_wsrep_ready);
  380   }
  381   mysql_mutex_unlock (&LOCK_wsrep_ready);
  382   return ret;
  383 }
  384 
  385 my_bool wsrep_ready_get (void)
  386 {
  387   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
  388   my_bool ret= wsrep_ready;
  389   mysql_mutex_unlock (&LOCK_wsrep_ready);
  390   return ret;
  391 }
  392 
  393 int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
  394 {
  395   var->type= SHOW_MY_BOOL;
  396   var->value= buff;
  397   *((my_bool *)buff)= wsrep_ready_get();
  398   return 0;
  399 }
  400 
  401 // Wait until wsrep has reached ready state
  402 void wsrep_ready_wait ()
  403 {
  404   if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
  405   while (!wsrep_ready)
  406   {
  407     WSREP_INFO("Waiting to reach ready state");
  408     mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
  409   }
  410   WSREP_INFO("ready state reached");
  411   mysql_mutex_unlock (&LOCK_wsrep_ready);
  412 }
  413 
  414 static void wsrep_synced_cb(void* app_ctx)
  415 {
  416   WSREP_INFO("Synchronized with group, ready for connections");
  417   my_bool signal_main= wsrep_ready_set(TRUE);
  418   local_status.set(WSREP_MEMBER_SYNCED);
  419 
  420   if (signal_main)
  421   {
  422       wsrep_SE_init_grab();
  423       // Signal mysqld init thread to continue
  424       wsrep_sst_complete (&local_uuid, local_seqno, false);
  425       // and wait for SE initialization
  426       wsrep_SE_init_wait();
  427   }
  428   if (wsrep_restart_slave_activated)
  429   {
  430     int rcode;
  431     WSREP_INFO("MySQL slave restart");
  432     wsrep_restart_slave_activated= FALSE;
  433 
  434     mysql_mutex_lock(&LOCK_active_mi);
  435     if ((rcode = start_slave_threads(1 /* need mutex */,
  436                             0 /* no wait for start*/,
  437                             active_mi,
  438                             master_info_file,
  439                             relay_log_info_file,
  440                             SLAVE_SQL)))
  441     {
  442       WSREP_WARN("Failed to create slave threads: %d", rcode);
  443     }
  444     mysql_mutex_unlock(&LOCK_active_mi);
  445 
  446   }
  447 }
  448 
  449 static void wsrep_init_position()
  450 {
  451   /* read XIDs from storage engines */
  452   wsrep_uuid_t uuid;
  453   wsrep_seqno_t seqno;
  454   wsrep_get_SE_checkpoint(uuid, seqno);
  455 
  456   if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
  457   {
  458     WSREP_INFO("Read nil XID from storage engines, skipping position init");
  459     return;
  460   }
  461 
  462   char uuid_str[40] = {0, };
  463   wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
  464   WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
  465 
  466   if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
  467       local_seqno == WSREP_SEQNO_UNDEFINED)
  468   {
  469     // Initial state
  470     local_uuid= uuid;
  471     local_seqno= seqno;
  472   }
  473   else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) ||
  474            local_seqno != seqno)
  475   {
  476     WSREP_WARN("Initial position was provided by configuration or SST, "
  477                "avoiding override");
  478   }
  479 }
  480 
  481 extern char* my_bind_addr_str;
  482 
  483 int wsrep_init()
  484 {
  485   int rcode= -1;
  486   DBUG_ASSERT(wsrep_inited == 0);
  487 
  488   wsrep_ready_set(FALSE);
  489   assert(wsrep_provider);
  490 
  491   wsrep_init_position();
  492 
  493   if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
  494   {
  495     if (strcasecmp(wsrep_provider, WSREP_NONE))
  496     {
  497       WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
  498                   wsrep_provider, strerror(rcode), rcode);
  499       strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
  500       return wsrep_init();
  501     }
  502     else /* this is for recursive call above */
  503     {
  504       WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
  505                   strerror(rcode), rcode);
  506       unireg_abort(1);
  507     }
  508   }
  509 
  510   if (!WSREP_PROVIDER_EXISTS)
  511   {
  512     // enable normal operation in case no provider is specified
  513     wsrep_ready_set(TRUE);
  514     wsrep_inited= 1;
  515     global_system_variables.wsrep_on = 0;
  516     wsrep_init_args args;
  517     args.logger_cb = wsrep_log_cb;
  518     args.options = (wsrep_provider_options) ?
  519             wsrep_provider_options : "";
  520     rcode = wsrep->init(wsrep, &args);
  521     if (rcode)
  522     {
  523       DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
  524       WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
  525       wsrep->free(wsrep);
  526       free(wsrep);
  527       wsrep = NULL;
  528     }
  529     return rcode;
  530   }
  531   else
  532   {
  533     global_system_variables.wsrep_on = 1;
  534     strncpy(provider_name,
  535             wsrep->provider_name,    sizeof(provider_name) - 1);
  536     strncpy(provider_version,
  537             wsrep->provider_version, sizeof(provider_version) - 1);
  538     strncpy(provider_vendor,
  539             wsrep->provider_vendor,  sizeof(provider_vendor) - 1);
  540   }
  541 
  542   if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
  543     wsrep_data_home_dir = mysql_real_data_home;
  544 
  545   char node_addr[512]= { 0, };
  546   size_t const node_addr_max= sizeof(node_addr) - 1;
  547   if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
  548   {
  549     size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
  550     if (!(ret > 0 && ret < node_addr_max))
  551     {
  552       WSREP_WARN("Failed to guess base node address. Set it explicitly via "
  553                  "wsrep_node_address.");
  554       node_addr[0]= '\0';
  555     }
  556   }
  557   else
  558   {
  559     strncpy(node_addr, wsrep_node_address, node_addr_max);
  560   }
  561 
  562   char inc_addr[512]= { 0, };
  563   size_t const inc_addr_max= sizeof (inc_addr);
  564   if ((!wsrep_node_incoming_address ||
  565        !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
  566   {
  567     unsigned int my_bind_ip= INADDR_ANY; // default if not set
  568     if (my_bind_addr_str && strlen(my_bind_addr_str))
  569     {
  570       my_bind_ip= wsrep_check_ip(my_bind_addr_str);
  571     }
  572 
  573     if (INADDR_ANY != my_bind_ip)
  574     {
  575       if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
  576       {
  577         snprintf(inc_addr, inc_addr_max, "%s:%u",
  578                  my_bind_addr_str, (int)mysqld_port);
  579       } // else leave inc_addr an empty string - mysqld is not listening for
  580         // client connections on network interfaces.
  581     }
  582     else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
  583     {
  584       size_t const node_addr_len= strlen(node_addr);
  585       if (node_addr_len > 0)
  586       {
  587         size_t const ip_len= wsrep_host_len(node_addr, node_addr_len);
  588         if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
  589         {
  590           memcpy (inc_addr, node_addr, ip_len);
  591           snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
  592                    (int)mysqld_port);
  593         }
  594         else
  595         {
  596           WSREP_WARN("Guessing address for incoming client connections: "
  597                      "address too long.");
  598           inc_addr[0]= '\0';
  599         }
  600       }
  601 
  602       if (!strlen(inc_addr))
  603       {
  604           WSREP_WARN("Guessing address for incoming client connections failed. "
  605                      "Try setting wsrep_node_incoming_address explicitly.");
  606       }
  607     }
  608   }
  609   else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
  610   {
  611     if ((int)inc_addr_max <=
  612         snprintf(inc_addr, inc_addr_max, "%s:%u",
  613                  wsrep_node_incoming_address,(int)mysqld_port))
  614     {
  615       WSREP_WARN("Guessing address for incoming client connections: "
  616                  "address too long.");
  617       inc_addr[0]= '\0';
  618     }
  619   }
  620   else
  621   {
  622     size_t const need = strlen (wsrep_node_incoming_address);
  623     if (need >= inc_addr_max) {
  624       WSREP_WARN("wsrep_node_incoming_address too long: %zu", need);
  625       inc_addr[0]= '\0';
  626     }
  627     else {
  628       memcpy (inc_addr, wsrep_node_incoming_address, need);
  629     }
  630   }
  631 
  632   struct wsrep_init_args wsrep_args;
  633 
  634   struct wsrep_gtid const state_id = { local_uuid, local_seqno };
  635 
  636   wsrep_args.data_dir        = wsrep_data_home_dir;
  637   wsrep_args.node_name       = (wsrep_node_name) ? wsrep_node_name : "";
  638   wsrep_args.node_address    = node_addr;
  639   wsrep_args.node_incoming   = inc_addr;
  640   wsrep_args.options         = (wsrep_provider_options) ?
  641                                 wsrep_provider_options : "";
  642   wsrep_args.proto_ver       = wsrep_max_protocol_version;
  643 
  644   wsrep_args.state_id        = &state_id;
  645 
  646   wsrep_args.logger_cb       = wsrep_log_cb;
  647   wsrep_args.view_handler_cb = wsrep_view_handler_cb;
  648   wsrep_args.apply_cb        = wsrep_apply_cb;
  649   wsrep_args.commit_cb       = wsrep_commit_cb;
  650   wsrep_args.unordered_cb    = wsrep_unordered_cb;
  651   wsrep_args.sst_donate_cb   = wsrep_sst_donate_cb;
  652   wsrep_args.synced_cb       = wsrep_synced_cb;
  653 
  654   rcode = wsrep->init(wsrep, &wsrep_args);
  655 
  656   if (rcode)
  657   {
  658     DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
  659     WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
  660     wsrep->free(wsrep);
  661     free(wsrep);
  662     wsrep = NULL;
  663   } else {
  664     wsrep_inited= 1;
  665   }
  666 
  667   return rcode;
  668 }
  669 
  670 extern int wsrep_on(void *);
  671 
  672 void wsrep_init_startup (bool first)
  673 {
  674   if (wsrep_init()) unireg_abort(1);
  675 
  676   wsrep_thr_lock_init(wsrep_thd_is_BF, wsrep_abort_thd,
  677                       wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on);
  678 
  679   /* Skip replication start if dummy wsrep provider is loaded */
  680   if (!strcmp(wsrep_provider, WSREP_NONE)) return;
  681 
  682   /* Skip replication start if no cluster address */
  683   if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return;
  684 
  685   if (first) wsrep_sst_grab(); // do it so we can wait for SST below
  686 
  687   if (!wsrep_start_replication()) unireg_abort(1);
  688 
  689   wsrep_creating_startup_threads= 1;
  690   wsrep_create_rollbacker();
  691   wsrep_create_appliers(1);
  692 
  693   if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
  694 }
  695 
  696 
  697 void wsrep_deinit(bool free_options)
  698 {
  699   DBUG_ASSERT(wsrep_inited == 1);
  700   wsrep_unload(wsrep);
  701   wsrep= 0;
  702   provider_name[0]=    '\0';
  703   provider_version[0]= '\0';
  704   provider_vendor[0]=  '\0';
  705 
  706   wsrep_inited= 0;
  707 
  708   if (free_options)
  709   {
  710     wsrep_sst_auth_free();
  711   }
  712 }
  713 
  714 void wsrep_recover()
  715 {
  716   if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
  717       local_seqno == -2)
  718   {
  719     char uuid_str[40];
  720     wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
  721     WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
  722                uuid_str, (long long)local_seqno);
  723     return;
  724   }
  725   wsrep_uuid_t uuid;
  726   wsrep_seqno_t seqno;
  727   wsrep_get_SE_checkpoint(uuid, seqno);
  728   char uuid_str[40];
  729   wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
  730   WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno);
  731 }
  732 
  733 
  734 void wsrep_stop_replication(THD *thd)
  735 {
  736   WSREP_INFO("Stop replication");
  737   if (!wsrep)
  738   {
  739     WSREP_INFO("Provider was not loaded, in stop replication");
  740     return;
  741   }
  742 
  743   /* disconnect from group first to get wsrep_ready == FALSE */
  744   WSREP_DEBUG("Provider disconnect");
  745   wsrep->disconnect(wsrep);
  746 
  747   wsrep_connected= FALSE;
  748 
  749   wsrep_close_client_connections(TRUE);
  750 
  751   /* wait until appliers have stopped */
  752   wsrep_wait_appliers_close(thd);
  753 
  754   return;
  755 }
  756 
  757 
  758 bool wsrep_start_replication()
  759 {
  760   wsrep_status_t rcode;
  761 
  762   /*
  763     if provider is trivial, don't even try to connect,
  764     but resume local node operation
  765   */
  766   if (!WSREP_PROVIDER_EXISTS)
  767   {
  768     // enable normal operation in case no provider is specified
  769     wsrep_ready_set(TRUE);
  770     return true;
  771   }
  772 
  773   if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0)
  774   {
  775     // if provider is non-trivial, but no address is specified, wait for address
  776     wsrep_ready_set(FALSE);
  777     return true;
  778   }
  779 
  780   bool const bootstrap= wsrep_new_cluster;
  781 
  782   WSREP_INFO("Start replication");
  783 
  784   if (wsrep_new_cluster)
  785   {
  786     WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster");
  787     wsrep_new_cluster= false;
  788   }
  789 
  790   if ((rcode = wsrep->connect(wsrep,
  791                               wsrep_cluster_name,
  792                               wsrep_cluster_address,
  793                               wsrep_sst_donor,
  794                               bootstrap)))
  795   {
  796     DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d",
  797                         wsrep_cluster_address, rcode));
  798     WSREP_ERROR("wsrep::connect(%s) failed: %d",
  799                 wsrep_cluster_address, rcode);
  800     return false;
  801   }
  802   else
  803   {
  804     wsrep_connected= TRUE;
  805 
  806     char* opts= wsrep->options_get(wsrep);
  807     if (opts)
  808     {
  809       wsrep_provider_options_init(opts);
  810       free(opts);
  811     }
  812     else
  813     {
  814       WSREP_WARN("Failed to get wsrep options");
  815     }
  816   }
  817 
  818   return true;
  819 }
  820 
  821 bool wsrep_must_sync_wait (THD* thd, uint mask)
  822 {
  823   return (thd->variables.wsrep_sync_wait & mask) &&
  824     thd->variables.wsrep_on &&
  825     !(thd->variables.wsrep_dirty_reads &&
  826       !is_update_query(thd->lex->sql_command)) &&
  827     !thd->in_active_multi_stmt_transaction() &&
  828     thd->wsrep_conflict_state != REPLAYING &&
  829     thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
  830 }
  831 
  832 bool wsrep_sync_wait (THD* thd, uint mask)
  833 {
  834   if (wsrep_must_sync_wait(thd, mask))
  835   {
  836     WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
  837                 thd->variables.wsrep_sync_wait, mask);
  838     // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
  839     // TODO: modify to check if thd has locked any rows.
  840     wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
  841 
  842     if (unlikely(WSREP_OK != ret))
  843     {
  844       const char* msg;
  845       int err;
  846 
  847       // Possibly relevant error codes:
  848       // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
  849       // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
  850       // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
  851 
  852       switch (ret)
  853       {
  854       case WSREP_NOT_IMPLEMENTED:
  855         msg= "synchronous reads by wsrep backend. "
  856              "Please unset wsrep_causal_reads variable.";
  857         err= ER_NOT_SUPPORTED_YET;
  858         break;
  859       default:
  860         msg= "Synchronous wait failed.";
  861         err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
  862                                    //       with ER_LOCK_WAIT_TIMEOUT
  863       }
  864 
  865       my_error(err, MYF(0), msg);
  866 
  867       return true;
  868     }
  869   }
  870 
  871   return false;
  872 }
  873 
  874 void wsrep_keys_free(wsrep_key_arr_t* key_arr)
  875 {
  876     for (size_t i= 0; i < key_arr->keys_len; ++i)
  877     {
  878         my_free((void*)key_arr->keys[i].key_parts);
  879     }
  880     my_free(key_arr->keys);
  881     key_arr->keys= 0;
  882     key_arr->keys_len= 0;
  883 }
  884 
  885 
  886 /*!
  887  * @param db      Database string
  888  * @param table   Table string
  889  * @param key     Array of wsrep_key_t
  890  * @param key_len In: number of elements in key array, Out: number of
  891  *                elements populated
  892  *
  893  * @return true if preparation was successful, otherwise false.
  894  */
  895 
  896 static bool wsrep_prepare_key_for_isolation(const char* db,
  897                                             const char* table,
  898                                             wsrep_buf_t* key,
  899                                             size_t* key_len)
  900 {
  901   if (*key_len < 2) return false;
  902 
  903   switch (wsrep_protocol_version)
  904   {
  905   case 0:
  906     *key_len= 0;
  907     break;
  908   case 1:
  909   case 2:
  910   case 3:
  911   {
  912     *key_len= 0;
  913     if (db)
  914     {
  915       // sql_print_information("%s.%s", db, table);
  916       key[*key_len].ptr= db;
  917       key[*key_len].len= strlen(db);
  918       ++(*key_len);
  919       if (table)
  920       {
  921         key[*key_len].ptr= table;
  922         key[*key_len].len= strlen(table);
  923         ++(*key_len);
  924       }
  925     }
  926     break;
  927   }
  928   default:
  929     return false;
  930   }
  931   return true;
  932 }
  933 
  934 
  935 static bool wsrep_prepare_key_for_isolation(const char* db,
  936                                             const char* table,
  937                                             wsrep_key_arr_t* ka)
  938 {
  939   wsrep_key_t* tmp;
  940 
  941   if (!ka->keys)
  942     tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t),
  943                                  MYF(0));
  944   else
  945     tmp= (wsrep_key_t*)my_realloc(ka->keys,
  946                                  (ka->keys_len + 1) * sizeof(wsrep_key_t),
  947                                  MYF(0));
  948 
  949   if (!tmp)
  950   {
  951     WSREP_ERROR("Can't allocate memory for key_array");
  952     return false;
  953   }
  954   ka->keys= tmp;
  955   if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
  956         my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
  957   {
  958     WSREP_ERROR("Can't allocate memory for key_parts");
  959     return false;
  960   }
  961   ka->keys[ka->keys_len].key_parts_num= 2;
  962   ++ka->keys_len;
  963   if (!wsrep_prepare_key_for_isolation(db, table,
  964                                        (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
  965                                        &ka->keys[ka->keys_len - 1].key_parts_num))
  966   {
  967     WSREP_ERROR("Preparing keys for isolation failed");
  968     return false;
  969   }
  970 
  971   return true;
  972 }
  973 
  974 
  975 static bool wsrep_prepare_keys_for_alter_add_fk(char* child_table_db,
  976                                                 Alter_info* alter_info,
  977                                                 wsrep_key_arr_t* ka)
  978 {
  979   Key *key;
  980   List_iterator<Key> key_iterator(alter_info->key_list);
  981   while ((key= key_iterator++))
  982   {
  983     if (key->type == Key::FOREIGN_KEY)
  984     {
  985       Foreign_key *fk_key= (Foreign_key *)key;
  986       const char *db_name= fk_key->ref_db.str;
  987       const char *table_name= fk_key->ref_table.str;
  988       if (!db_name)
  989       {
  990         db_name= child_table_db;
  991       }
  992       if (!wsrep_prepare_key_for_isolation(db_name, table_name, ka))
  993       {
  994         return false;
  995       }
  996     }
  997   }
  998   return true;
  999 }
 1000 
 1001 
 1002 static bool wsrep_prepare_keys_for_isolation(THD*              thd,
 1003                                              const char*       db,
 1004                                              const char*       table,
 1005                                              const TABLE_LIST* table_list,
 1006                                              Alter_info*       alter_info,
 1007                                              wsrep_key_arr_t*  ka)
 1008 {
 1009   ka->keys= 0;
 1010   ka->keys_len= 0;
 1011 
 1012   if (db || table)
 1013   {
 1014     if (!wsrep_prepare_key_for_isolation(db, table, ka))
 1015       goto err;
 1016   }
 1017 
 1018   for (const TABLE_LIST* table= table_list; table; table= table->next_global)
 1019   {
 1020     if (!wsrep_prepare_key_for_isolation(table->db, table->table_name, ka))
 1021       goto err;
 1022   }
 1023 
 1024   if (alter_info && (alter_info->flags & (Alter_info::ADD_FOREIGN_KEY)))
 1025   {
 1026     if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db, alter_info, ka))
 1027       goto err;
 1028   }
 1029 
 1030   return false;
 1031 
 1032 err:
 1033   wsrep_keys_free(ka);
 1034   return true;
 1035 }
 1036 
 1037 
 1038 /* Prepare key list from db/table and table_list */
 1039 bool wsrep_prepare_keys_for_isolation(THD*              thd,
 1040                                       const char*       db,
 1041                                       const char*       table,
 1042                                       const TABLE_LIST* table_list,
 1043                                       wsrep_key_arr_t*  ka)
 1044 {
 1045   return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
 1046 }
 1047 
 1048 
 1049 bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
 1050                                   size_t cache_key_len,
 1051                                   const uchar* row_id,
 1052                                   size_t row_id_len,
 1053                                   wsrep_buf_t* key,
 1054                                   size_t* key_len)
 1055 {
 1056     if (*key_len < 3) return false;
 1057 
 1058     *key_len= 0;
 1059     switch (wsrep_protocol_version)
 1060     {
 1061     case 0:
 1062     {
 1063         key[0].ptr = cache_key;
 1064         key[0].len = cache_key_len;
 1065 
 1066         *key_len = 1;
 1067         break;
 1068     }
 1069     case 1:
 1070     case 2:
 1071     case 3:
 1072     {
 1073         key[0].ptr = cache_key;
 1074         key[0].len = strlen( (char*)cache_key );
 1075 
 1076         key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
 1077         key[1].len = strlen( (char*)(key[1].ptr) );
 1078 
 1079         *key_len = 2;
 1080         break;
 1081     }
 1082     default:
 1083         return false;
 1084     }
 1085 
 1086     key[*key_len].ptr = row_id;
 1087     key[*key_len].len = row_id_len;
 1088     ++(*key_len);
 1089 
 1090     return true;
 1091 }
 1092 
 1093 
 1094 /*
 1095  * Construct Query_log_Event from thd query and serialize it
 1096  * into buffer.
 1097  *
 1098  * Return 0 in case of success, 1 in case of error.
 1099  */
 1100 int wsrep_to_buf_helper(
 1101     THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
 1102 {
 1103   IO_CACHE tmp_io_cache;
 1104   if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
 1105                        65536, MYF(MY_WME)))
 1106     return 1;
 1107   int ret(0);
 1108 
 1109   Format_description_log_event *tmp_fd= new Format_description_log_event(4);
 1110   tmp_fd->checksum_alg= binlog_checksum_options;
 1111   tmp_fd->write(&tmp_io_cache);
 1112   delete tmp_fd;
 1113 
 1114 #ifdef GTID_SUPPORT
 1115   if (thd->variables.gtid_next.type == GTID_GROUP)
 1116   {
 1117       Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
 1118       if (!gtid_ev.is_valid()) ret= 0;
 1119       if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1;
 1120   }
 1121 #endif /* GTID_SUPPORT */
 1122 
 1123   /* if there is prepare query, add event for it */
 1124   if (!ret && thd->wsrep_TOI_pre_query)
 1125   {
 1126     Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
 1127                thd->wsrep_TOI_pre_query_len,
 1128                FALSE, FALSE, FALSE, 0);
 1129     if (ev.write(&tmp_io_cache)) ret= 1;
 1130   }
 1131 
 1132   /* continue to append the actual query */
 1133   Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
 1134   if (!ret && ev.write(&tmp_io_cache)) ret= 1;
 1135   if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
 1136   close_cached_file(&tmp_io_cache);
 1137   return ret;
 1138 }
 1139 
 1140 #include "sql_show.h"
 1141 static int
 1142 create_view_query(THD *thd, uchar** buf, size_t* buf_len)
 1143 {
 1144     LEX *lex= thd->lex;
 1145     SELECT_LEX *select_lex= &lex->select_lex;
 1146     TABLE_LIST *first_table= select_lex->table_list.first;
 1147     TABLE_LIST *views = first_table;
 1148 
 1149     String buff;
 1150     const LEX_STRING command[3]=
 1151       {{ C_STRING_WITH_LEN("CREATE ") },
 1152        { C_STRING_WITH_LEN("ALTER ") },
 1153        { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
 1154 
 1155     buff.append(command[thd->lex->create_view_mode].str,
 1156                 command[thd->lex->create_view_mode].length);
 1157 
 1158     LEX_USER *definer;
 1159 
 1160     if (lex->definer)
 1161     {
 1162       definer= get_current_user(thd, lex->definer);
 1163     }
 1164     else
 1165     {
 1166       /*
 1167         DEFINER-clause is missing; we have to create default definer in
 1168         persistent arena to be PS/SP friendly.
 1169         If this is an ALTER VIEW then the current user should be set as
 1170         the definer.
 1171       */
 1172       definer= create_default_definer(thd, false);
 1173     }
 1174 
 1175     if (definer)
 1176     {
 1177       views->definer.user = definer->user;
 1178       views->definer.host = definer->host;
 1179     } else {
 1180       WSREP_ERROR("Failed to get DEFINER for VIEW.");
 1181       return 1;
 1182     }
 1183 
 1184     views->algorithm    = lex->create_view_algorithm;
 1185     views->view_suid    = lex->create_view_suid;
 1186     views->with_check   = lex->create_view_check;
 1187 
 1188     view_store_options4(thd, views, &buff, true);
 1189     buff.append(STRING_WITH_LEN("VIEW "));
 1190     /* Test if user supplied a db (ie: we did not use thd->db) */
 1191     if (views->db && views->db[0] &&
 1192         (thd->db == NULL || strcmp(views->db, thd->db)))
 1193     {
 1194       append_identifier(thd, &buff, views->db,
 1195                         views->db_length);
 1196       buff.append('.');
 1197     }
 1198     append_identifier(thd, &buff, views->table_name,
 1199                       views->table_name_length);
 1200     if (lex->view_list.elements)
 1201     {
 1202       List_iterator_fast<LEX_STRING> names(lex->view_list);
 1203       LEX_STRING *name;
 1204       int i;
 1205 
 1206       for (i= 0; (name= names++); i++)
 1207       {
 1208         buff.append(i ? ", " : "(");
 1209         append_identifier(thd, &buff, name->str, name->length);
 1210       }
 1211       buff.append(')');
 1212     }
 1213     buff.append(STRING_WITH_LEN(" AS "));
 1214     //buff.append(views->source.str, views->source.length);
 1215     buff.append(thd->lex->create_view_select.str,
 1216                 thd->lex->create_view_select.length);
 1217     //int errcode= query_error_code(thd, TRUE);
 1218     //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
 1219     //                      buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
 1220     return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
 1221 }
 1222 
 1223 /*
 1224   Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
 1225   the query as they are visible only to client connection.
 1226 
 1227   TODO: See comments for sql_base.cc:drop_temporary_table() and refine
 1228   the function to deal with transactional locked tables.
 1229  */
 1230 static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
 1231 {
 1232 
 1233   LEX* lex= thd->lex;
 1234   SELECT_LEX* select_lex= &lex->select_lex;
 1235   TABLE_LIST* first_table= select_lex->table_list.first;
 1236   String buff;
 1237 
 1238   DBUG_ASSERT(!lex->drop_temporary);
 1239 
 1240   bool found_temp_table= false;
 1241   for (TABLE_LIST* table= first_table; table; table= table->next_global)
 1242   {
 1243     if (find_temporary_table(thd, table->db, table->table_name))
 1244     {
 1245       found_temp_table= true;
 1246       break;
 1247     }
 1248   }
 1249 
 1250   if (found_temp_table)
 1251   {
 1252     buff.append("DROP TABLE ");
 1253     if (lex->check_exists)
 1254       buff.append("IF EXISTS ");
 1255 
 1256     for (TABLE_LIST* table= first_table; table; table= table->next_global)
 1257     {
 1258       if (!find_temporary_table(thd, table->db, table->table_name))
 1259       {
 1260         append_identifier(thd, &buff, table->db, strlen(table->db));
 1261         buff.append(".");
 1262         append_identifier(thd, &buff, table->table_name,
 1263                           strlen(table->table_name));
 1264         buff.append(",");
 1265       }
 1266     }
 1267 
 1268     /* Chop the last comma */
 1269     buff.chop();
 1270     buff.append(" /* generated by wsrep */");
 1271 
 1272     WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
 1273 
 1274     return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
 1275   }
 1276   else
 1277   {
 1278     return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
 1279                                buf, buf_len);
 1280   }
 1281 }
 1282 
 1283 /*
 1284   Decide if statement should run in TOI.
 1285 
 1286   Look if table or table_list contain temporary tables. If the
 1287   statement affects only temporary tables,   statement should not run
 1288   in TOI. If the table list contains mix of regular and temporary tables
 1289   (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
 1290   should be rewritten at later time for replication to contain only
 1291   non-temporary tables.
 1292  */
 1293 static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
 1294                                  const TABLE_LIST *table_list)
 1295 {
 1296   DBUG_ASSERT(!table || db);
 1297   DBUG_ASSERT(table_list || db);
 1298 
 1299   LEX* lex= thd->lex;
 1300   SELECT_LEX* select_lex= &lex->select_lex;
 1301   TABLE_LIST* first_table= select_lex->table_list.first;
 1302 
 1303   switch (lex->sql_command)
 1304   {
 1305   case SQLCOM_CREATE_TABLE:
 1306     DBUG_ASSERT(!table_list);
 1307     if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
 1308     {
 1309       return false;
 1310     }
 1311     return true;
 1312 
 1313   case SQLCOM_CREATE_VIEW:
 1314 
 1315     DBUG_ASSERT(!table_list);
 1316     DBUG_ASSERT(first_table); /* First table is view name */
 1317     /*
 1318       If any of the remaining tables refer to temporary table error
 1319       is returned to client, so TOI can be skipped
 1320     */
 1321     for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
 1322     {
 1323       if (find_temporary_table(thd, it))
 1324       {
 1325         return false;
 1326       }
 1327     }
 1328     return true;
 1329 
 1330   case SQLCOM_CREATE_TRIGGER:
 1331 
 1332     DBUG_ASSERT(!table_list);
 1333     DBUG_ASSERT(first_table);
 1334 
 1335     if (find_temporary_table(thd, first_table))
 1336     {
 1337       return false;
 1338     }
 1339     return true;
 1340 
 1341   default:
 1342     if (table && !find_temporary_table(thd, db, table))
 1343     {
 1344       return true;
 1345     }
 1346 
 1347     if (table_list)
 1348     {
 1349       for (TABLE_LIST* table= first_table; table; table= table->next_global)
 1350       {
 1351         if (!find_temporary_table(thd, table->db, table->table_name))
 1352         {
 1353           return true;
 1354         }
 1355       }
 1356     }
 1357     return !(table || table_list);
 1358   }
 1359 }
 1360 
 1361 static const char* wsrep_get_query_or_msg(const THD* thd)
 1362 {
 1363   switch(thd->lex->sql_command)
 1364   {
 1365     case SQLCOM_CREATE_USER:
 1366       return "CREATE USER";
 1367     case SQLCOM_GRANT:
 1368       return "GRANT";
 1369     case SQLCOM_REVOKE:
 1370       return "REVOKE";
 1371     case SQLCOM_SET_OPTION:
 1372       if (thd->lex->definer)
 1373     return "SET PASSWORD";
 1374       /* fallthrough */
 1375     default:
 1376       return thd->query();
 1377    }
 1378 }
 1379 
 1380 /*
 1381   returns: 
 1382    0: statement was replicated as TOI
 1383    1: TOI replication was skipped
 1384   -1: TOI replication failed 
 1385  */
 1386 static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
 1387                            const TABLE_LIST* table_list,
 1388                            Alter_info* alter_info)
 1389 {
 1390   wsrep_status_t ret(WSREP_WARNING);
 1391   uchar* buf(0);
 1392   size_t buf_len(0);
 1393   int buf_err;
 1394   int rc= 0;
 1395 
 1396   if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
 1397   {
 1398     WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
 1399     return 1;
 1400   }
 1401 
 1402   WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
 1403               thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd));
 1404 
 1405   switch (thd->lex->sql_command)
 1406   {
 1407   case SQLCOM_CREATE_VIEW:
 1408     buf_err= create_view_query(thd, &buf, &buf_len);
 1409     break;
 1410   case SQLCOM_CREATE_PROCEDURE:
 1411   case SQLCOM_CREATE_SPFUNCTION:
 1412     buf_err= wsrep_create_sp(thd, &buf, &buf_len);
 1413     break;
 1414   case SQLCOM_CREATE_TRIGGER:
 1415     buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
 1416     break;
 1417   case SQLCOM_CREATE_EVENT:
 1418     buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
 1419     break;
 1420   case SQLCOM_ALTER_EVENT:
 1421     buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
 1422     break;
 1423   case SQLCOM_DROP_TABLE:
 1424     buf_err= wsrep_drop_table_query(thd, &buf, &buf_len);
 1425     break;
 1426   case SQLCOM_CREATE_ROLE:
 1427     if (sp_process_definer(thd))
 1428     {
 1429       WSREP_WARN("Failed to set CREATE ROLE definer for TOI.");
 1430     }
 1431     /* fallthrough */
 1432   default:
 1433     buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
 1434                                  &buf, &buf_len);
 1435     break;
 1436   }
 1437 
 1438   wsrep_key_arr_t key_arr= {0, 0};
 1439   struct wsrep_buf buff = { buf, buf_len };
 1440   if (!buf_err                                                                  &&
 1441       !wsrep_prepare_keys_for_isolation(thd, db_, table_,
 1442                                         table_list, alter_info, &key_arr)       &&
 1443       key_arr.keys_len > 0                                                      &&
 1444       WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
 1445                          key_arr.keys, key_arr.keys_len,
 1446                          &buff, 1,
 1447                          &thd->wsrep_trx_meta)))
 1448   {
 1449     thd->wsrep_exec_mode= TOTAL_ORDER;
 1450     wsrep_to_isolation++;
 1451     wsrep_keys_free(&key_arr);
 1452     WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
 1453         thd->wsrep_exec_mode);
 1454   }
 1455   else if (key_arr.keys_len > 0) {
 1456     /* jump to error handler in mysql_execute_command() */
 1457     WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep "
 1458                "connection state and retry the query.",
 1459                ret,
 1460                (thd->db ? thd->db : "(null)"),
 1461                (thd->query()) ? thd->query() : "void");
 1462     my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
 1463          "your wsrep connection state and retry the query.");
 1464     wsrep_keys_free(&key_arr);
 1465     rc= -1;
 1466   }
 1467   else {
 1468     /* non replicated DDL, affecting temporary tables only */
 1469     WSREP_DEBUG("TO isolation skipped for: %d, sql: %s."
 1470         "Only temporary tables affected.",
 1471         ret, (thd->query()) ? thd->query() : "void");
 1472     rc= 1;
 1473   }
 1474   if (buf) my_free(buf);
 1475   return rc;
 1476 }
 1477 
 1478 static void wsrep_TOI_end(THD *thd) {
 1479   wsrep_status_t ret;
 1480   wsrep_to_isolation--;
 1481 
 1482   WSREP_DEBUG("TO END: %lld, %d: %s", (long long)wsrep_thd_trx_seqno(thd),
 1483               thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd));
 1484 
 1485   wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
 1486                           thd->wsrep_trx_meta.gtid.seqno);
 1487   WSREP_DEBUG("TO END: %lld, update seqno",
 1488               (long long)wsrep_thd_trx_seqno(thd));
 1489   
 1490   if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
 1491     WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
 1492   }
 1493   else {
 1494     WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
 1495                ret,
 1496                (thd->db ? thd->db : "(null)"),
 1497                (thd->query()) ? thd->query() : "void");
 1498   }
 1499 }
 1500 
 1501 static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
 1502 {
 1503   wsrep_status_t ret(WSREP_WARNING);
 1504   WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
 1505                thd->wsrep_exec_mode, thd->query() );
 1506 
 1507   ret = wsrep->desync(wsrep);
 1508   if (ret != WSREP_OK)
 1509   {
 1510     WSREP_WARN("RSU desync failed %d for schema: %s, query: %s",
 1511                ret, (thd->db ? thd->db : "(null)"), thd->query());
 1512     my_error(ER_LOCK_DEADLOCK, MYF(0));
 1513     return(ret);
 1514   }
 1515 
 1516   mysql_mutex_lock(&LOCK_wsrep_replaying);
 1517   wsrep_replaying++;
 1518   mysql_mutex_unlock(&LOCK_wsrep_replaying);
 1519 
 1520   if (wsrep_wait_committing_connections_close(5000))
 1521   {
 1522     /* no can do, bail out from DDL */
 1523     WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s",
 1524                (thd->db ? thd->db : "(null)"),
 1525                thd->query());
 1526     mysql_mutex_lock(&LOCK_wsrep_replaying);
 1527     wsrep_replaying--;
 1528     mysql_mutex_unlock(&LOCK_wsrep_replaying);
 1529 
 1530     ret = wsrep->resync(wsrep);
 1531     if (ret != WSREP_OK)
 1532     {
 1533       WSREP_WARN("resync failed %d for schema: %s, query: %s",
 1534                  ret, (thd->db ? thd->db : "(null)"), thd->query());
 1535     }
 1536 
 1537     my_error(ER_LOCK_DEADLOCK, MYF(0));
 1538     return(1);
 1539   }
 1540 
 1541   wsrep_seqno_t seqno = wsrep->pause(wsrep);
 1542   if (seqno == WSREP_SEQNO_UNDEFINED)
 1543   {
 1544     WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno,
 1545                (thd->db ? thd->db : "(null)"),
 1546                thd->query());
 1547     return(1);
 1548   }
 1549   WSREP_DEBUG("paused at %lld", (long long)seqno);
 1550   thd->variables.wsrep_on = 0;
 1551   return 0;
 1552 }
 1553 
 1554 static void wsrep_RSU_end(THD *thd)
 1555 {
 1556   wsrep_status_t ret(WSREP_WARNING);
 1557   WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
 1558                thd->wsrep_exec_mode, thd->query() );
 1559 
 1560 
 1561   mysql_mutex_lock(&LOCK_wsrep_replaying);
 1562   wsrep_replaying--;
 1563   mysql_mutex_unlock(&LOCK_wsrep_replaying);
 1564 
 1565   ret = wsrep->resume(wsrep);
 1566   if (ret != WSREP_OK)
 1567   {
 1568     WSREP_WARN("resume failed %d for schema: %s, query: %s", ret,
 1569                (thd->db ? thd->db : "(null)"),
 1570                thd->query());
 1571   }
 1572 
 1573   ret = wsrep->resync(wsrep);
 1574   if (ret != WSREP_OK)
 1575   {
 1576     WSREP_WARN("resync failed %d for schema: %s, query: %s", ret,
 1577                (thd->db ? thd->db : "(null)"), thd->query());
 1578     return;
 1579   }
 1580 
 1581   thd->variables.wsrep_on = 1;
 1582 }
 1583 
 1584 int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
 1585                              const TABLE_LIST* table_list,
 1586                              Alter_info* alter_info)
 1587 {
 1588   /*
 1589     No isolation for applier or replaying threads.
 1590    */
 1591   if (thd->wsrep_exec_mode == REPL_RECV) return 0;
 1592 
 1593   int ret= 0;
 1594   mysql_mutex_lock(&thd->LOCK_wsrep_thd);
 1595 
 1596   if (thd->wsrep_conflict_state == MUST_ABORT)
 1597   {
 1598     WSREP_INFO("thread: %lu, schema: %s, query: %s has been aborted due to multi-master conflict",
 1599                thd->thread_id,
 1600                (thd->db ? thd->db : "(null)"),
 1601                thd->query());
 1602     mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
 1603     return WSREP_TRX_FAIL;
 1604   }
 1605   mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
 1606 
 1607   DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
 1608   DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
 1609 
 1610   if (thd->global_read_lock.can_acquire_protection())
 1611   {
 1612     WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lu",
 1613                 thd->query(), thd->thread_id);
 1614     return -1;
 1615   }
 1616 
 1617   if (wsrep_debug && thd->mdl_context.has_locks())
 1618   {
 1619     WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
 1620                 thd->query(), thd->thread_id);
 1621   }
 1622 
 1623   /*
 1624     It makes sense to set auto_increment_* to defaults in TOI operations.
 1625     Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
 1626     TOI statement and auto inc variables for wsrep replication is constructed
 1627     there. Variables are reset back in THD::reset_for_next_command() before
 1628     processing of next command.
 1629    */
 1630   if (wsrep_auto_increment_control)
 1631   {
 1632     thd->variables.auto_increment_offset = 1;
 1633     thd->variables.auto_increment_increment = 1;
 1634   }
 1635 
 1636   if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
 1637   {
 1638     switch (thd->variables.wsrep_OSU_method) {
 1639     case WSREP_OSU_TOI:
 1640       ret= wsrep_TOI_begin(thd, db_, table_, table_list, alter_info);
 1641       break;
 1642     case WSREP_OSU_RSU:
 1643       ret= wsrep_RSU_begin(thd, db_, table_);
 1644       break;
 1645     default:
 1646       WSREP_ERROR("Unsupported OSU method: %lu",
 1647                   thd->variables.wsrep_OSU_method);
 1648       ret= -1;
 1649       break;
 1650     }
 1651     switch (ret) {
 1652     case 0:  thd->wsrep_exec_mode= TOTAL_ORDER; break;
 1653     case 1:
 1654       /* TOI replication skipped, treat as success */
 1655       ret = 0;
 1656       break;
 1657     case -1:
 1658       /* TOI replication failed, treat as error */
 1659       break;
 1660     }
 1661   }
 1662   return ret;
 1663 }
 1664 
 1665 void wsrep_to_isolation_end(THD *thd)
 1666 {
 1667   if (thd->wsrep_exec_mode == TOTAL_ORDER)
 1668   {
 1669     switch(thd->variables.wsrep_OSU_method)
 1670     {
 1671     case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
 1672     case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
 1673     default:
 1674       WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu",
 1675                  thd->variables.wsrep_OSU_method);
 1676       break;
 1677     }
 1678     wsrep_cleanup_transaction(thd);
 1679   }
 1680 }
 1681 
 1682 #define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra)             \
 1683     WSREP_##severity(                                                          \
 1684       "%s\n"                                                                   \
 1685       "schema:  %.*s\n"                                                        \
 1686       "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n"      \
 1687       "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)",       \
 1688       msg, schema_len, schema,                                                 \
 1689       req->thread_id, (long long)wsrep_thd_trx_seqno(req),                     \
 1690       req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
 1691       req->get_command(), req->lex->sql_command, req->query(),                 \
 1692       gra->thread_id, (long long)wsrep_thd_trx_seqno(gra),                     \
 1693       gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
 1694       gra->get_command(), gra->lex->sql_command, gra->query());
 1695 
 1696 /**
 1697   Check if request for the metadata lock should be granted to the requester.
 1698 
 1699   @param  requestor_ctx        The MDL context of the requestor
 1700   @param  ticket               MDL ticket for the requested lock
 1701 
 1702   @retval TRUE   Lock request can be granted
 1703   @retval FALSE  Lock request cannot be granted
 1704 */
 1705 
 1706 bool
 1707 wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
 1708                           MDL_ticket *ticket,
 1709                           const MDL_key *key
 1710 ) {
 1711   /* Fallback to the non-wsrep behaviour */
 1712   if (!WSREP_ON) return FALSE;
 1713 
 1714   THD *request_thd  = requestor_ctx->get_thd();
 1715   THD *granted_thd  = ticket->get_ctx()->get_thd();
 1716   bool ret          = FALSE;
 1717 
 1718   const char* schema= key->db_name();
 1719   int schema_len= key->db_name_length();
 1720 
 1721   mysql_mutex_lock(&request_thd->LOCK_wsrep_thd);
 1722   if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
 1723       request_thd->wsrep_exec_mode == REPL_RECV)
 1724   {
 1725     mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
 1726     WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
 1727                   request_thd, granted_thd);
 1728     ticket->wsrep_report(wsrep_debug);
 1729 
 1730     mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd);
 1731     if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
 1732         granted_thd->wsrep_exec_mode == REPL_RECV)
 1733     {
 1734       WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
 1735                     request_thd, granted_thd);
 1736       ticket->wsrep_report(true);
 1737       mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
 1738       ret = TRUE;
 1739     }
 1740     else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
 1741              granted_thd->mdl_context.wsrep_has_explicit_locks())
 1742     {
 1743       WSREP_DEBUG("BF thread waiting for FLUSH");
 1744       ticket->wsrep_report(wsrep_debug);
 1745       mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
 1746       ret = FALSE;
 1747     }
 1748     else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
 1749     {
 1750       WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state);
 1751       ticket->wsrep_report(wsrep_debug);
 1752       mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
 1753       wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
 1754       ret = FALSE;
 1755     }
 1756     else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
 1757     {
 1758       WSREP_DEBUG("mdl granted, but commiting thd abort scheduled");
 1759       ticket->wsrep_report(wsrep_debug);
 1760       mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
 1761       wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
 1762       ret = FALSE;
 1763     }
 1764     else
 1765     {
 1766       WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
 1767                     request_thd, granted_thd);
 1768       ticket->wsrep_report(wsrep_debug);
 1769       mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
 1770       wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
 1771       ret = FALSE;
 1772     }
 1773   }
 1774   else
 1775   {
 1776     mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
 1777   }
 1778   return ret;
 1779 }
 1780 
 1781 bool wsrep_node_is_donor()
 1782 {
 1783   return (WSREP_ON) ? (local_status.get() == 2) : false;
 1784 }
 1785 bool wsrep_node_is_synced()
 1786 {
 1787   return (WSREP_ON) ? (local_status.get() == 4) : false;
 1788 }