"Fossies" - the Fresh Open Source Software Archive

Member "opensaf-5.21.09/src/rde/rded/role.cc" (14 Sep 2021, 15660 Bytes) of package /linux/misc/opensaf-5.21.09.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "role.cc", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 5.21.06_vs_5.21.09.

    1 /*      -*- OpenSAF  -*-
    2  *
    3  * (C) Copyright 2016 The OpenSAF Foundation
    4  *
    5  * This program is distributed in the hope that it will be useful, but
    6  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
    7  * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
    8  * under the GNU Lesser General Public License Version 2.1, February 1999.
    9  * The complete license can be accessed from the following location:
   10  * http://opensource.org/licenses/lgpl-license.php
   11  * See the Copying file included with the OpenSAF distribution for full
   12  * licensing terms.
   13  *
   14  * Author(s): Ericsson AB
   15  *
   16  */
   17 
   18 #ifndef _GNU_SOURCE
   19 #define _GNU_SOURCE
   20 #endif
   21 
   22 #include "rde/rded/role.h"
   23 #include <cinttypes>
   24 #include <cstdint>
   25 #include <thread>
   26 #include "base/getenv.h"
   27 #include "base/logtrace.h"
   28 #include "base/ncs_main_papi.h"
   29 #include "base/ncssysf_def.h"
   30 #include "base/process.h"
   31 #include "base/time.h"
   32 #include "osaf/consensus/consensus.h"
   33 #include "rde/rded/rde_cb.h"
   34 
   35 const char* const Role::role_names_[] = {"Undefined", "ACTIVE",    "STANDBY",
   36                                          "QUIESCED",  "QUIESCING", "Invalid"};
   37 
   38 const char* const Role::pre_active_script_ = PKGLIBDIR "/opensaf_sc_active";
   39 
   40 PCS_RDA_ROLE Role::role() const { return role_; }
   41 
   42 const char* Role::to_string(PCS_RDA_ROLE role) {
   43   return role >= 0 && role < sizeof(role_names_) / sizeof(role_names_[0])
   44              ? role_names_[role]
   45              : role_names_[0];
   46 }
   47 
   48 void Role::MonitorCallback(const std::string& key, const std::string& new_value,
   49                            SYSF_MBX mbx) {
   50   TRACE_ENTER();
   51 
   52   rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg)));
   53   if (key == Consensus::kTakeoverRequestKeyname) {
   54     std::string request;
   55     Consensus consensus_service;
   56 
   57     if (new_value.empty() == true) {
   58       // sometimes the KV store plugin doesn't return the new value,
   59       // let's try to read it in this thread to avoid stalling
   60       // the main thread
   61       TRACE("Empty takeover request from callback. Try reading it");
   62 
   63       SaAisErrorT rc = SA_AIS_ERR_TRY_AGAIN;
   64       constexpr uint8_t max_retry = 5;
   65       uint8_t retries = 0;
   66 
   67       while (retries < max_retry && rc != SA_AIS_OK) {
   68         rc = consensus_service.ReadTakeoverRequest(request);
   69         ++retries;
   70       }
   71     } else {
   72       // use the value received in callback
   73       request = new_value;
   74     }
   75 
   76     msg->type = RDE_MSG_TAKEOVER_REQUEST_CALLBACK;
   77     size_t len = request.length() + 1;
   78     msg->info.takeover_request = new char[len];
   79     strncpy(msg->info.takeover_request, request.c_str(), len);
   80     TRACE("Sending takeover request '%s' to main thread",
   81            msg->info.takeover_request);
   82     if (consensus_service.SelfFence(request) == false &&
   83         consensus_service.PrioritisePartitionSize() == true) {
   84       // don't send this to the main thread straight away, as it will
   85       // need some time to process topology changes.
   86       std::this_thread::sleep_for(
   87         std::chrono::seconds(
   88           consensus_service.PrioritisePartitionSizeWaitTime()));
   89     }
   90   } else {
   91     msg->type = RDE_MSG_NEW_ACTIVE_CALLBACK;
   92   }
   93 
   94   uint32_t status;
   95   status = m_NCS_IPC_SEND(&mbx, msg, NCS_IPC_PRIORITY_NORMAL);
   96   osafassert(status == NCSCC_RC_SUCCESS);
   97 }
   98 
   99 void Role::PromoteNode(const uint64_t cluster_size,
  100                        const bool relaxed_mode) {
  101   TRACE_ENTER();
  102   SaAisErrorT rc;
  103 
  104   Consensus consensus_service;
  105   bool promotion_pending = false;
  106 
  107   rc = consensus_service.PromoteThisNode(true, cluster_size);
  108   if (rc == SA_AIS_ERR_EXIST) {
  109     LOG_WA("Another controller is already active");
  110     if (role() == PCS_RDA_UNDEFINED) SetRole(PCS_RDA_QUIESCED);
  111     return;
  112   } else if (rc != SA_AIS_OK && relaxed_mode == true) {
  113     LOG_WA("Unable to set active controller in consensus service");
  114     if (role() == PCS_RDA_QUIESCED) {
  115       LOG_WA("Another controller is already promoted");
  116       return;
  117     }
  118     LOG_WA("Will become active anyway");
  119     promotion_pending = true;
  120   } else if (rc != SA_AIS_OK) {
  121     LOG_ER("Unable to set active controller in consensus service");
  122     opensaf_quick_reboot("Unable to set active controller "
  123         "in consensus service");
  124     return;
  125   }
  126 
  127   RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  128 
  129   // send msg to main thread
  130   rde_msg* msg = static_cast<rde_msg*>(malloc(sizeof(rde_msg)));
  131   msg->type = RDE_MSG_ACTIVE_PROMOTION_SUCCESS;
  132   uint32_t status;
  133   status = m_NCS_IPC_SEND(&cb->mbx, msg, NCS_IPC_PRIORITY_HIGH);
  134   osafassert(status == NCSCC_RC_SUCCESS);
  135 
  136   if (promotion_pending) {
  137     osafassert(consensus_service.IsRelaxedNodePromotionEnabled() == true);
  138     // the node has been promoted, even though the lock has not been obtained
  139     // keep trying the consensus service
  140     while (rc != SA_AIS_OK) {
  141       rc = consensus_service.PromoteThisNode(true, cluster_size);
  142       if (rc == SA_AIS_ERR_EXIST) {
  143         LOG_ER("Unable to set active controller in consensus service");
  144         opensaf_quick_reboot("Unable to set active controller in "
  145             "consensus service");
  146         return;
  147       }
  148       std::this_thread::sleep_for(std::chrono::seconds(1));
  149     }
  150     LOG_NO("Successfully set active controller in consensus service");
  151   }
  152 }
  153 
  154 void Role::NodePromoted() {
  155   // promoted to active from election
  156   ExecutePreActiveScript();
  157   LOG_NO("Switched to ACTIVE from %s", to_string(role()));
  158   PCS_RDA_ROLE old_role = role_;
  159   role_ = PCS_RDA_ACTIVE;
  160   rde_rda_send_role(role_);
  161   if (UpdateMdsRegistration(role_, old_role) != NCSCC_RC_SUCCESS) {
  162     LOG_ER("Failed to update MDS Registration");
  163     abort();
  164   }
  165 
  166   Consensus consensus_service;
  167   RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  168   if (cb->peer_controllers.empty() == false) {
  169     TRACE("Set state to kActiveElectedSeenPeer");
  170     cb->state = State::kActiveElectedSeenPeer;
  171   } else {
  172     TRACE("Set state to kActiveElected");
  173     cb->state = State::kActiveElected;
  174   }
  175 
  176   // register for callback if active controller is changed
  177   // in consensus service
  178   if (consensus_service.IsEnabled() == true &&
  179       cb->monitor_lock_thread_running == false) {
  180     cb->monitor_lock_thread_running = true;
  181     consensus_service.MonitorLock(MonitorCallback, cb->mbx);
  182   }
  183   if (consensus_service.IsEnabled() == true &&
  184       cb->monitor_takeover_req_thread_running == false) {
  185     cb->monitor_takeover_req_thread_running = true;
  186     consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
  187   }
  188 }
  189 
  190 Role::Role(NODE_ID own_node_id)
  191     : known_nodes_{},
  192       role_{PCS_RDA_QUIESCED},
  193       own_node_id_{own_node_id},
  194       proc_{new base::Process()},
  195       election_end_time_{},
  196       discover_peer_timeout_{base::GetEnv("RDE_DISCOVER_PEER_TIMEOUT",
  197                                           kDefaultDiscoverPeerTimeout)},
  198       pre_active_script_timeout_{base::GetEnv(
  199           "RDE_PRE_ACTIVE_SCRIPT_TIMEOUT", kDefaultPreActiveScriptTimeout)},
  200       received_peer_info_{true},
  201       peer_info_wait_time_{},
  202       peer_info_wait_timeout_ {kDefaultWaitPeerInfoTimeout} {}
  203 
  204 timespec* Role::Poll(timespec* ts) {
  205   TRACE_ENTER();
  206   timespec* timeout = nullptr;
  207   if (role_ == PCS_RDA_UNDEFINED) {
  208     timespec now = base::ReadMonotonicClock();
  209     if (election_end_time_ >= now) {
  210       *ts = election_end_time_ - now;
  211       timeout = ts;
  212     } else {
  213       election_end_time_ = base::kTimespecMax;
  214       RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  215 
  216       bool is_candidate = IsCandidate();
  217       Consensus consensus_service;
  218       if (consensus_service.IsEnabled() == true &&
  219         is_candidate == false &&
  220         consensus_service.IsWritable() == false) {
  221         // node promotion will fail resulting in node reboot,
  222         // reset timer and try later
  223         TRACE("reset timer and try later");
  224         ResetElectionTimer();
  225         now = base::ReadMonotonicClock();
  226         *ts = election_end_time_ - now;
  227         timeout = ts;
  228       } else {
  229         std::thread(&Role::PromoteNode,
  230                     this, cb->cluster_members.size(),
  231                     is_candidate).detach();
  232       }
  233     }
  234   } else if (role_ == PCS_RDA_ACTIVE) {
  235     RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  236     if (cb->consensus_service_state == ConsensusState::kUnknown ||
  237         cb->consensus_service_state == ConsensusState::kDisconnected) {
  238       // consensus service was previously disconnected, refresh state
  239       Consensus consensus_service;
  240       if (consensus_service.IsEnabled() == true &&
  241         cb->state_refresh_thread_started == false) {
  242         cb->state_refresh_thread_started = true;
  243         std::thread(&Role::RefreshConsensusState, this, cb).detach();
  244       }
  245       if (consensus_service.IsEnabled() == false) {
  246         // We are already ACTIVE, and has just discovered a new node
  247         // which makes the election_end_time_ reset
  248         if (received_peer_info_ == false) {
  249           timespec now = base::ReadMonotonicClock();
  250           if (peer_info_wait_time_ >= now) {
  251             *ts = peer_info_wait_time_ - now;
  252             timeout = ts;
  253           } else {
  254             // Timeout but haven't received peer info
  255             // The peer RDE could be in ACTIVE
  256             // thus self-fence to avoid split-brain risk
  257             LOG_ER("Discovery peer up without peer info. Risk in split-brain,"
  258                 "rebooting this node");
  259             opensaf_quick_reboot("Probable split-brain due to "
  260                 "unknown RDE peer info");
  261           }
  262         }
  263       }
  264     }
  265   }
  266   return timeout;
  267 }
  268 
  269 void Role::ExecutePreActiveScript() {
  270   int argc = 1;
  271   char* argv[] = {const_cast<char*>(pre_active_script_), nullptr};
  272   proc_->Execute(argc, argv,
  273                  std::chrono::milliseconds(pre_active_script_timeout_));
  274 }
  275 
  276 void Role::AddPeer(NODE_ID node_id) {
  277   TRACE_ENTER();
  278   auto result = known_nodes_.insert(node_id);
  279   if (result.second) {
  280     ResetElectionTimer();
  281     if (role_ == PCS_RDA_ACTIVE) {
  282       ResetPeerInfoWaitTimer();
  283       received_peer_info_ = false;
  284     }
  285   }
  286 }
  287 
  288 void Role::RemovePeer(NODE_ID node_id) {
  289   TRACE_ENTER();
  290   if (received_peer_info_ == false && role_ != PCS_RDA_ACTIVE) {
  291     StopPeerInfoWaitTimer();
  292   }
  293   known_nodes_.erase(node_id);
  294 }
  295 
  296 // call from main thread only
  297 bool Role::IsCandidate() {
  298   TRACE_ENTER();
  299   bool result = false;
  300   Consensus consensus_service;
  301   RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  302 
  303   // if relaxed node promotion is enabled, allow this node to be promoted
  304   // active if it can see a peer SC and this node has the lowest node ID
  305   if (consensus_service.IsRelaxedNodePromotionEnabled() == true &&
  306       cb->state == State::kNotActiveSeenPeer) {
  307     LOG_NO("Relaxed node promotion enabled. This node is a candidate.");
  308     result = true;
  309   }
  310 
  311   return result;
  312 }
  313 
  314 bool Role::IsPeerPresent() {
  315   bool result = false;
  316   RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  317 
  318   if (cb->peer_controllers.empty() == false) {
  319     result = true;
  320   }
  321 
  322   return result;
  323 }
  324 
  325 uint32_t Role::SetRole(PCS_RDA_ROLE new_role) {
  326   TRACE_ENTER();
  327   PCS_RDA_ROLE old_role = role_;
  328   if (new_role == PCS_RDA_ACTIVE &&
  329       (old_role == PCS_RDA_UNDEFINED || old_role == PCS_RDA_QUIESCED)) {
  330     LOG_NO("Requesting ACTIVE role");
  331     new_role = PCS_RDA_UNDEFINED;
  332     RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  333     cb->promote_start = base::ReadMonotonicClock();
  334     cb->promote_pending = 0;
  335   }
  336   if (new_role != old_role) {
  337     LOG_NO("RDE role set to %s", to_string(new_role));
  338     if (new_role == PCS_RDA_ACTIVE) {
  339       ExecutePreActiveScript();
  340 
  341       // register for callback if active controller is changed
  342       // in consensus service
  343       Consensus consensus_service;
  344       RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  345       cb->state = State::kActiveFailover;
  346       if (consensus_service.IsEnabled() == true &&
  347           cb->monitor_lock_thread_running == false) {
  348         cb->monitor_lock_thread_running = true;
  349         consensus_service.MonitorLock(MonitorCallback, cb->mbx);
  350       }
  351       if (consensus_service.IsEnabled() == true &&
  352           cb->monitor_takeover_req_thread_running == false) {
  353         cb->monitor_takeover_req_thread_running = true;
  354         consensus_service.MonitorTakeoverRequest(MonitorCallback, cb->mbx);
  355       }
  356     }
  357     role_ = new_role;
  358     if (new_role == PCS_RDA_UNDEFINED) {
  359       known_nodes_.clear();
  360       ResetElectionTimer();
  361     } else {
  362       rde_rda_send_role(new_role);
  363     }
  364   }
  365   return UpdateMdsRegistration(new_role, old_role);
  366 }
  367 
  368 void Role::ResetElectionTimer() {
  369   TRACE_ENTER();
  370   election_end_time_ = base::ReadMonotonicClock() +
  371                        base::MillisToTimespec(discover_peer_timeout_);
  372 }
  373 
  374 void Role::ResetPeerInfoWaitTimer() {
  375   TRACE_ENTER();
  376   LOG_NO("Start/restart waiting peer info timer");
  377   peer_info_wait_time_ = base::ReadMonotonicClock() +
  378                        base::MillisToTimespec(peer_info_wait_timeout_);
  379 }
  380 
  381 void Role::StopPeerInfoWaitTimer() {
  382   TRACE_ENTER();
  383   // Turn off peer_info_timer
  384   received_peer_info_ = true;
  385 }
  386 
  387 uint32_t Role::UpdateMdsRegistration(PCS_RDA_ROLE new_role,
  388                                      PCS_RDA_ROLE old_role) {
  389   uint32_t rc = NCSCC_RC_SUCCESS;
  390   bool mds_registered_before = old_role != PCS_RDA_QUIESCED;
  391   bool mds_registered_after = new_role != PCS_RDA_QUIESCED;
  392   if (mds_registered_after != mds_registered_before) {
  393     if (mds_registered_after) {
  394       if (rde_mds_register() != NCSCC_RC_SUCCESS) {
  395         LOG_ER("rde_mds_register() failed");
  396         rc = NCSCC_RC_FAILURE;
  397       }
  398     } else {
  399       if (rde_mds_unregister() != NCSCC_RC_SUCCESS) {
  400         LOG_ER("rde_mds_unregister() failed");
  401         rc = NCSCC_RC_FAILURE;
  402       }
  403     }
  404   }
  405   return rc;
  406 }
  407 
  408 void Role::SetPeerState(PCS_RDA_ROLE node_role, NODE_ID node_id,
  409                         uint64_t peer_promote_pending) {
  410   TRACE_ENTER();
  411   if (role() == PCS_RDA_UNDEFINED) {
  412     bool give_up = false;
  413     RDE_CONTROL_BLOCK *cb = rde_get_control_block();
  414     if (node_role == PCS_RDA_UNDEFINED) {
  415       if (cb->promote_pending == 0) {
  416         struct timespec now = base::ReadMonotonicClock();
  417         cb->promote_pending = base::TimespecToMillis(now - cb->promote_start);
  418       }
  419       if ((cb->promote_pending < peer_promote_pending) ||
  420           (cb->promote_pending == peer_promote_pending &&
  421            node_id < own_node_id_))
  422         give_up = true;
  423     }
  424     if (node_role == PCS_RDA_ACTIVE || node_role == PCS_RDA_STANDBY ||
  425         give_up) {
  426       // broadcast QUIESCED role to all peers to stop their waiting peer
  427       // info timer
  428       rde_msg peer_info_req;
  429       peer_info_req.type = RDE_MSG_PEER_INFO_RESP;
  430       peer_info_req.info.peer_info.ha_role = PCS_RDA_QUIESCED;
  431       peer_info_req.info.peer_info.promote_pending = 0;
  432       rde_mds_broadcast(&peer_info_req);
  433 
  434       SetRole(PCS_RDA_QUIESCED);
  435       LOG_NO("Giving up election against 0x%" PRIx32
  436              " with role %s. "
  437              "My role is now %s",
  438              node_id, to_string(node_role), to_string(role()));
  439     }
  440   }
  441   known_nodes_.insert(node_id);
  442   StopPeerInfoWaitTimer();
  443 }
  444 
  445 void Role::PromoteNodeLate() {
  446   TRACE_ENTER();
  447 
  448   // we are already active and split brain prevention has been
  449   // enabled during runtime, we need to obtain lock
  450   RDE_CONTROL_BLOCK* cb = rde_get_control_block();
  451   std::thread(&Role::PromoteNode,
  452               this, cb->cluster_members.size(),
  453               true).detach();
  454 }
  455 
  456 void Role::RefreshConsensusState(RDE_CONTROL_BLOCK* cb) {
  457   TRACE_ENTER();
  458 
  459   Consensus consensus_service;
  460   if (consensus_service.IsWritable() == true) {
  461     LOG_NO("Connectivity to consensus service established");
  462     cb->consensus_service_state = ConsensusState::kConnected;
  463   }
  464 
  465   cb->state_refresh_thread_started = false;
  466 }