"Fossies" - the Fresh Open Source Software Archive

Member "opensaf-5.21.09/src/rde/rded/rde_main.cc" (14 Sep 2021, 17844 Bytes) of package /linux/misc/opensaf-5.21.09.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "rde_main.cc", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 5.21.06_vs_5.21.09.

    1 /*      -*- OpenSAF  -*-
    2  *
    3  * (C) Copyright 2010 The OpenSAF Foundation
    4  *
    5  * This program is distributed in the hope that it will be useful, but
    6  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
    7  * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
    8  * under the GNU Lesser General Public License Version 2.1, February 1999.
    9  * The complete license can be accessed from the following location:
   10  * http://opensource.org/licenses/lgpl-license.php
   11  * See the Copying file included with the OpenSAF distribution for full
   12  * licensing terms.
   13  *
   14  * Author(s): Ericsson AB
   15  *            Wind River Systems
   16  *
   17  */
   18 
   19 #include <limits.h>
   20 #include <saAmf.h>
   21 #include <signal.h>
   22 #include <sys/resource.h>
   23 #include <sys/time.h>
   24 #include <sys/types.h>
   25 #include <sys/wait.h>
   26 #include <syslog.h>
   27 #include <unistd.h>
   28 #include <cassert>
   29 #include <cerrno>
   30 #include <cstdlib>
   31 #include <cstring>
   32 #include "base/conf.h"
   33 #include "base/time.h"
   34 #include "base/daemon.h"
   35 #include "base/logtrace.h"
   36 #include "base/ncs_main_papi.h"
   37 #include "base/osaf_poll.h"
   38 #include "mds/mds_papi.h"
   39 #include "nid/agent/nid_api.h"
   40 #include "osaf/consensus/consensus.h"
   41 #include "rde/rded/rde_cb.h"
   42 #include "rde/rded/role.h"
   43 
   44 #define RDA_MAX_CLIENTS 32
   45 
   46 enum { FD_TERM = 0, FD_AMF = 1, FD_MBX, FD_RDA_SERVER,
   47        FD_SIGHUP, FD_CLIENT_START };
   48 
   49 static void BroadcastPeerInfoResp();
   50 static void CheckForSplitBrain(const rde_msg *msg);
   51 
   52 const char *rde_msg_name[] = {"-",
   53                               "RDE_MSG_PEER_UP(1)",
   54                               "RDE_MSG_PEER_DOWN(2)",
   55                               "RDE_MSG_PEER_INFO_REQ(3)",
   56                               "RDE_MSG_PEER_INFO_RESP(4)",
   57                               "RDE_MSG_NEW_ACTIVE_CALLBACK(5)",
   58                               "RDE_MSG_NODE_UP(6)",
   59                               "RDE_MSG_NODE_DOWN(7)",
   60                               "RDE_MSG_TAKEOVER_REQUEST_CALLBACK(8)",
   61                               "RDE_MSG_ACTIVE_PROMOTION_SUCCESS(9)",
   62                               "RDE_MSG_CONTROLLER_UP(10)",
   63                               "RDE_MSG_CONTROLLER_DOWN(11)"};
   64 
   65 static RDE_CONTROL_BLOCK _rde_cb;
   66 static RDE_CONTROL_BLOCK *rde_cb = &_rde_cb;
   67 static NCS_SEL_OBJ usr1_sel_obj;
   68 static NODE_ID own_node_id;
   69 static Role *role;
   70 
   71 RDE_CONTROL_BLOCK *rde_get_control_block() { return rde_cb; }
   72 
   73 /**
   74  * USR1 signal is used when AMF wants instantiate us as a
   75  * component. Wake up the main thread so it can register with
   76  * AMF.
   77  *
   78  * @param i_sig_num
   79  */
   80 static void sigusr1_handler(int sig) {
   81   (void)sig;
   82   signal(SIGUSR1, SIG_IGN);
   83   ncs_sel_obj_ind(&usr1_sel_obj);
   84 }
   85 
   86 static int fd_to_client_ixd(int fd) {
   87   int i;
   88   RDE_RDA_CB *rde_rda_cb = &rde_cb->rde_rda_cb;
   89 
   90   for (i = 0; i < rde_rda_cb->client_count; i++)
   91     if (fd == rde_rda_cb->clients[i].fd) break;
   92 
   93   assert(i < MAX_RDA_CLIENTS);
   94 
   95   return i;
   96 }
   97 
   98 static void handle_mbx_event() {
   99   rde_msg *msg;
  100 
  101   TRACE_ENTER();
  102 
  103   msg = reinterpret_cast<rde_msg *>(ncs_ipc_non_blk_recv(&rde_cb->mbx));
  104 
  105   switch (msg->type) {
  106     case RDE_MSG_PEER_INFO_REQ:
  107     case RDE_MSG_PEER_INFO_RESP: {
  108       if (msg->fr_node_id != own_node_id) {
  109         LOG_NO("Got peer info %s from node 0x%x with role %s",
  110             msg->type == RDE_MSG_PEER_INFO_RESP ? "response" : "request",
  111                 msg->fr_node_id, Role::to_string(msg->info.peer_info.ha_role));
  112         CheckForSplitBrain(msg);
  113         role->SetPeerState(msg->info.peer_info.ha_role, msg->fr_node_id,
  114             msg->info.peer_info.promote_pending);
  115       }
  116       break;
  117     }
  118     case RDE_MSG_PEER_UP: {
  119       if (msg->fr_node_id != own_node_id) {
  120         LOG_NO("Peer up on node 0x%x", msg->fr_node_id);
  121         BroadcastPeerInfoResp();
  122         role->AddPeer(msg->fr_node_id);
  123       }
  124       break;
  125     }
  126     case RDE_MSG_PEER_DOWN:
  127       LOG_NO("Peer down on node 0x%x", msg->fr_node_id);
  128       role->RemovePeer(msg->fr_node_id);
  129       break;
  130     case RDE_MSG_NEW_ACTIVE_CALLBACK: {
  131       const std::string my_node = base::Conf::NodeName();
  132       rde_cb->monitor_lock_thread_running = false;
  133 
  134       // get current active controller
  135       Consensus consensus_service;
  136       if (consensus_service.IsEnabled() == false) {
  137         // disabled during runtime
  138         break;
  139       }
  140       std::string active_controller = consensus_service.CurrentActive();
  141 
  142       LOG_NO("New active controller notification from consensus service");
  143 
  144       if (role->role() == PCS_RDA_ACTIVE) {
  145         if (my_node.compare(active_controller) != 0 &&
  146             active_controller.empty() == false) {
  147           // we are meant to be active, but consensus service doesn't think so
  148           LOG_WA("Role does not match consensus service. New controller: %s",
  149                  active_controller.c_str());
  150           if (consensus_service.IsRemoteFencingEnabled() == false) {
  151             LOG_ER("Probable split-brain. Rebooting this node");
  152             opensaf_quick_reboot("Split-brain detected by consensus service");
  153           }
  154         }
  155 
  156         // register for callback
  157         rde_cb->monitor_lock_thread_running = true;
  158         consensus_service.MonitorLock(Role::MonitorCallback, rde_cb->mbx);
  159       }
  160       break;
  161     }
  162     case RDE_MSG_NODE_UP:
  163       rde_cb->cluster_members.insert(msg->fr_node_id);
  164       TRACE("cluster_size %zu", rde_cb->cluster_members.size());
  165       break;
  166     case RDE_MSG_NODE_DOWN:
  167       rde_cb->cluster_members.erase(msg->fr_node_id);
  168       TRACE("cluster_size %zu", rde_cb->cluster_members.size());
  169       break;
  170     case RDE_MSG_CONTROLLER_UP:
  171       if (msg->fr_node_id != own_node_id) {
  172         rde_cb->peer_controllers.insert(msg->fr_node_id);
  173         TRACE("peer_controllers: size %zu", rde_cb->peer_controllers.size());
  174         if (rde_cb->state == State::kNotActive) {
  175           TRACE("Set state to kNotActiveSeenPeer");
  176           rde_cb->state = State::kNotActiveSeenPeer;
  177         } else if (rde_cb->state == State::kActiveElected) {
  178           TRACE("Set state to kActiveElectedSeenPeer");
  179           rde_cb->state = State::kActiveElectedSeenPeer;
  180         }
  181       }
  182       break;
  183     case RDE_MSG_CONTROLLER_DOWN:
  184       rde_cb->peer_controllers.erase(msg->fr_node_id);
  185       TRACE("peer_controllers: size %zu", rde_cb->peer_controllers.size());
  186       if (role->role() == PCS_RDA_ACTIVE) {
  187         Consensus consensus_service;
  188         if (consensus_service.IsEnabled() == true &&
  189             rde_cb->consensus_service_state == ConsensusState::kDisconnected &&
  190             consensus_service.IsRelaxedNodePromotionEnabled() == true &&
  191             role->IsPeerPresent() == false) {
  192             LOG_NO("Lost connectivity to consensus service. No peer present");
  193             if (consensus_service.IsRemoteFencingEnabled() == false) {
  194                 opensaf_quick_reboot("Lost connectivity to consensus service. "
  195                                      "Rebooting this node");
  196             }
  197         }
  198       }
  199       break;
  200     case RDE_MSG_TAKEOVER_REQUEST_CALLBACK: {
  201       rde_cb->monitor_takeover_req_thread_running = false;
  202       const std::string takeover_request(msg->info.takeover_request);
  203       delete[] msg->info.takeover_request;
  204       msg->info.takeover_request = nullptr;
  205 
  206       if (role->role() == PCS_RDA_ACTIVE) {
  207         TRACE("Received takeover request '%s'. Our network size is %zu",
  208                takeover_request.c_str(),
  209                rde_cb->cluster_members.size());
  210 
  211         Consensus consensus_service;
  212         if (consensus_service.IsEnabled() == false) {
  213           // disabled during runtime
  214           break;
  215         }
  216         Consensus::TakeoverState state =
  217             consensus_service.HandleTakeoverRequest(
  218                 rde_cb->cluster_members.size(),
  219                 takeover_request);
  220 
  221         if (state == Consensus::TakeoverState::ACCEPTED) {
  222           LOG_NO("Accepted takeover request");
  223           if (consensus_service.IsRemoteFencingEnabled() == false) {
  224             opensaf_quick_reboot("Another controller is taking over "
  225                 "the active role. Rebooting this node");
  226           }
  227         } else if (state == Consensus::TakeoverState::UNDEFINED) {
  228           bool fencing_required = true;
  229 
  230           // differentiate when this occurs after election or
  231           // rde has been set active due to failover
  232           if (consensus_service.IsRelaxedNodePromotionEnabled() == true) {
  233               if (rde_cb->state == State::kActiveElected) {
  234                 TRACE("Relaxed mode is enabled");
  235                 TRACE("No peer SC yet seen, ignore consensus service failure");
  236                 // if relaxed node promotion is enabled, and we have yet to see
  237                 // a peer SC after being promoted, tolerate consensus service
  238                 // not working
  239                 fencing_required = false;
  240               } else if ((rde_cb->state == State::kActiveElectedSeenPeer ||
  241                          rde_cb->state == State::kActiveFailover) &&
  242                          role->IsPeerPresent() == true) {
  243                 TRACE("Relaxed mode is enabled");
  244                 TRACE("Peer SC can be seen, ignore consensus service failure");
  245                 // we have seen the peer, and peer is still connected, tolerate
  246                 // consensus service not working
  247                 fencing_required = false;
  248                 rde_cb->consensus_service_state = ConsensusState::kDisconnected;
  249               }
  250           }
  251           if (fencing_required == true) {
  252             LOG_NO("Lost connectivity to consensus service");
  253             if (consensus_service.IsRemoteFencingEnabled() == false) {
  254                 opensaf_quick_reboot("Lost connectivity to consensus service. "
  255                                      "Rebooting this node");
  256             }
  257           }
  258         }
  259 
  260         TRACE("Rejected takeover request");
  261 
  262         rde_cb->monitor_takeover_req_thread_running = true;
  263         consensus_service.MonitorTakeoverRequest(Role::MonitorCallback,
  264                                                  rde_cb->mbx);
  265       } else {
  266         LOG_WA("Received takeover request when not active");
  267       }
  268     } break;
  269     case RDE_MSG_ACTIVE_PROMOTION_SUCCESS:
  270       role->NodePromoted();
  271       break;
  272     default:
  273       LOG_ER("%s: discarding unknown message type %u", __FUNCTION__, msg->type);
  274       break;
  275   }
  276 
  277   free(msg);
  278 
  279   TRACE_LEAVE();
  280 }
  281 
  282 static void CheckForSplitBrain(const rde_msg *msg) {
  283   PCS_RDA_ROLE own_role = role->role();
  284   PCS_RDA_ROLE other_role = msg->info.peer_info.ha_role;
  285   if (own_role == PCS_RDA_ACTIVE && other_role == PCS_RDA_ACTIVE) {
  286     opensaf_quick_reboot("Split-brain detected");
  287   }
  288 }
  289 
  290 static void BroadcastPeerInfoResp() {
  291   RDE_CONTROL_BLOCK *cb = rde_get_control_block();
  292   rde_msg peer_info_req;
  293   peer_info_req.type = RDE_MSG_PEER_INFO_RESP;
  294   peer_info_req.info.peer_info.ha_role = role->role();
  295   if (role->role() == PCS_RDA_UNDEFINED && cb->promote_pending == 0) {
  296     struct timespec now = base::ReadMonotonicClock();
  297     cb->promote_pending = base::TimespecToMillis(now - cb->promote_start);
  298   }
  299   peer_info_req.info.peer_info.promote_pending = cb->promote_pending;
  300   rde_mds_broadcast(&peer_info_req);
  301 }
  302 
  303 /**
  304  * Initialize the RDE server.
  305  *
  306  * @return int, 0=OK
  307  */
  308 static int initialize_rde() {
  309   RDE_RDA_CB *rde_rda_cb = &rde_cb->rde_rda_cb;
  310   int rc = NCSCC_RC_FAILURE;
  311 
  312   if ((rc = rde_rda_open(RDE_RDA_SOCK_NAME, rde_rda_cb)) != NCSCC_RC_SUCCESS) {
  313     goto init_failed;
  314   }
  315 
  316   /* Determine how this process was started, by NID or AMF */
  317   if (getenv("SA_AMF_COMPONENT_NAME") == nullptr)
  318     rde_cb->rde_amf_cb.nid_started = true;
  319 
  320   rde_rda_cb->fmd_conf_file = base::GetEnv("FMS_CONF_FILE", "");
  321 
  322   if ((rc = ncs_core_agents_startup()) != NCSCC_RC_SUCCESS) {
  323     LOG_ER("ncs_core_agents_startup FAILED");
  324     goto init_failed;
  325   }
  326 
  327   own_node_id = ncs_get_node_id();
  328   role = new Role(own_node_id);
  329   rde_rda_cb->role = role;
  330   rde_cb->rde_amf_cb.role = role;
  331 
  332   if (rde_cb->rde_amf_cb.nid_started &&
  333       (rc = ncs_sel_obj_create(&usr1_sel_obj)) != NCSCC_RC_SUCCESS) {
  334     LOG_ER("ncs_sel_obj_create FAILED");
  335     goto init_failed;
  336   }
  337 
  338   if ((rc = ncs_ipc_create(&rde_cb->mbx)) != NCSCC_RC_SUCCESS) {
  339     LOG_ER("ncs_ipc_create FAILED");
  340     goto init_failed;
  341   }
  342 
  343   if ((rc = ncs_ipc_attach(&rde_cb->mbx)) != NCSCC_RC_SUCCESS) {
  344     LOG_ER("ncs_ipc_attach FAILED");
  345     goto init_failed;
  346   }
  347 
  348   if (rde_cb->rde_amf_cb.nid_started &&
  349       signal(SIGUSR1, sigusr1_handler) == SIG_ERR) {
  350     LOG_ER("signal USR1 FAILED: %s", strerror(errno));
  351     goto init_failed;
  352   }
  353 
  354   if (rde_discovery_mds_register() != NCSCC_RC_SUCCESS) {
  355     LOG_ER("rde_discovery_mds_register() failed");
  356     rc = NCSCC_RC_FAILURE;
  357   }
  358 
  359   rc = NCSCC_RC_SUCCESS;
  360 
  361 init_failed:
  362   return rc;
  363 }
  364 
  365 int main(int argc, char *argv[]) {
  366   nfds_t nfds = FD_CLIENT_START;
  367   pollfd fds[FD_CLIENT_START + RDA_MAX_CLIENTS];
  368   int ret;
  369   NCS_SEL_OBJ mbx_sel_obj;
  370   RDE_RDA_CB *rde_rda_cb = &rde_cb->rde_rda_cb;
  371   int term_fd;
  372   int hangup_fd;
  373   NCS_SEL_OBJ *hangup_sel_obj = nullptr;
  374   opensaf_reboot_prepare();
  375 
  376   daemonize(argc, argv);
  377 
  378   base::Conf::InitNodeName();
  379 
  380   if (initialize_rde() != NCSCC_RC_SUCCESS) goto init_failed;
  381 
  382   mbx_sel_obj = ncs_ipc_get_sel_obj(&rde_cb->mbx);
  383 
  384   /* If AMF started register immediately */
  385   if (!rde_cb->rde_amf_cb.nid_started &&
  386       rde_amf_init(&rde_cb->rde_amf_cb) != NCSCC_RC_SUCCESS) {
  387     goto init_failed;
  388   }
  389 
  390   daemon_sigterm_install(&term_fd);
  391   hangup_sel_obj = daemon_sighup_install(&hangup_fd);
  392 
  393   fds[FD_TERM].fd = term_fd;
  394   fds[FD_TERM].events = POLLIN;
  395 
  396   /* USR1/AMF fd */
  397   fds[FD_AMF].fd = rde_cb->rde_amf_cb.nid_started ? usr1_sel_obj.rmv_obj
  398                                                   : rde_cb->rde_amf_cb.amf_fd;
  399   fds[FD_AMF].events = POLLIN;
  400 
  401   fds[FD_SIGHUP].fd = hangup_fd;
  402   fds[FD_SIGHUP].events = POLLIN;
  403 
  404   /* Mailbox */
  405   fds[FD_MBX].fd = mbx_sel_obj.rmv_obj;
  406   fds[FD_MBX].events = POLLIN;
  407 
  408   /* RDA server socket */
  409   fds[FD_RDA_SERVER].fd = rde_cb->rde_rda_cb.fd;
  410   fds[FD_RDA_SERVER].events = POLLIN;
  411 
  412   if (rde_cb->rde_amf_cb.nid_started) {
  413     TRACE("NID started");
  414     if (nid_notify("RDE", NCSCC_RC_SUCCESS, nullptr) != NCSCC_RC_SUCCESS) {
  415       LOG_ER("nid_notify failed");
  416       goto init_failed;
  417     }
  418   } else {
  419     TRACE("Not NID started");
  420   }
  421 
  422   while (1) {
  423     nfds_t fds_to_poll =
  424         role->role() != PCS_RDA_UNDEFINED ? nfds : FD_CLIENT_START;
  425     ret = osaf_poll(fds, fds_to_poll, 0);
  426     if (ret == 0) {
  427       timespec ts;
  428       timespec *timeout = role->Poll(&ts);
  429       fds_to_poll = role->role() != PCS_RDA_UNDEFINED ? nfds : FD_CLIENT_START;
  430       ret = osaf_ppoll(fds, fds_to_poll, timeout, nullptr);
  431     }
  432 
  433     if (ret == -1) {
  434       if (errno == EINTR) continue;
  435 
  436       LOG_ER("poll failed - %s", strerror(errno));
  437       break;
  438     }
  439 
  440     if (fds[FD_TERM].revents & POLLIN) {
  441       rde_discovery_mds_unregister();
  442       daemon_exit();
  443     }
  444 
  445     if (fds[FD_AMF].revents & POLLIN) {
  446       if (rde_cb->rde_amf_cb.amf_hdl != 0) {
  447         SaAisErrorT error;
  448         TRACE("AMF event rec");
  449         if ((error = saAmfDispatch(rde_cb->rde_amf_cb.amf_hdl,
  450                                    SA_DISPATCH_ALL)) != SA_AIS_OK) {
  451           LOG_ER("saAmfDispatch failed: %u", error);
  452           goto done;
  453         }
  454       } else {
  455         TRACE("SIGUSR1 event rec");
  456         ncs_sel_obj_destroy(&usr1_sel_obj);
  457 
  458         if (rde_amf_init(&rde_cb->rde_amf_cb) != NCSCC_RC_SUCCESS) goto done;
  459 
  460         fds[FD_AMF].fd = rde_cb->rde_amf_cb.amf_fd;
  461       }
  462     }
  463 
  464     if (fds[FD_SIGHUP].revents & POLLIN) {
  465       ncs_sel_obj_rmv_ind(hangup_sel_obj, true, true);
  466       Consensus consensus_service;
  467       bool old_setting = consensus_service.IsEnabled();
  468       consensus_service.ReloadConfiguration();
  469       bool new_setting = consensus_service.IsEnabled();
  470       if (role->role() == PCS_RDA_ACTIVE) {
  471         if (old_setting == false && new_setting == true) {
  472           // if active and switched on, obtain lock
  473           role->PromoteNodeLate();
  474         } else if (old_setting == true && new_setting == false) {
  475           // if active and switched off
  476           // @todo remove lock in a new thread
  477         }
  478       }
  479     }
  480 
  481     if (fds[FD_MBX].revents & POLLIN) handle_mbx_event();
  482 
  483     if (fds[FD_RDA_SERVER].revents & POLLIN) {
  484       int newsockfd;
  485 
  486       newsockfd = accept(rde_rda_cb->fd, nullptr, nullptr);
  487       if (newsockfd < 0) {
  488         LOG_ER("accept FAILED %s", strerror(errno));
  489         goto done;
  490       }
  491 
  492       /* Add the new client fd to client-list */
  493       rde_rda_cb->clients[rde_rda_cb->client_count].is_async = false;
  494       rde_rda_cb->clients[rde_rda_cb->client_count].fd = newsockfd;
  495       rde_rda_cb->client_count++;
  496 
  497       /* Update poll fd selection */
  498       fds[nfds].fd = newsockfd;
  499       fds[nfds].events = POLLIN;
  500       nfds++;
  501 
  502       TRACE("accepted new client, fd=%d, idx=%d, nfds=%lu", newsockfd,
  503             rde_rda_cb->client_count, nfds);
  504     }
  505 
  506     for (nfds_t i = FD_CLIENT_START;
  507          role->role() != PCS_RDA_UNDEFINED && i < fds_to_poll; i++) {
  508       if (fds[i].revents & POLLIN) {
  509         int client_disconnected = 0;
  510         TRACE("received msg on fd %u", fds[i].fd);
  511         rde_rda_client_process_msg(rde_rda_cb, fd_to_client_ixd(fds[i].fd),
  512                                    &client_disconnected);
  513         if (client_disconnected) {
  514           /* reinitialize the fd array & nfds */
  515           nfds = FD_CLIENT_START;
  516           for (int j = 0; j < rde_rda_cb->client_count; j++, nfds++) {
  517             fds[j + FD_CLIENT_START].fd = rde_rda_cb->clients[j].fd;
  518             fds[j + FD_CLIENT_START].events = POLLIN;
  519           }
  520           TRACE("client disconnected, fd array reinitialized, nfds=%lu", nfds);
  521           break;
  522         }
  523       }
  524     }
  525   }
  526 
  527 init_failed:
  528   if (rde_cb->rde_amf_cb.nid_started &&
  529       nid_notify("RDE", NCSCC_RC_FAILURE, nullptr) != NCSCC_RC_SUCCESS) {
  530     LOG_ER("nid_notify failed");
  531   }
  532 
  533 done:
  534   syslog(LOG_ERR, "Exiting...");
  535   exit(1);
  536 }