"Fossies" - the Fresh Open Source Software Archive

Member "pdns-auth-4.2.0/pdns/signingpipe.cc" (27 Aug 2019, 9259 Bytes) of package /linux/misc/dns/pdns-auth-4.2.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "signingpipe.cc" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 4.1.13_vs_4.2.0.

    1 #ifdef HAVE_CONFIG_H
    2 #include "config.h"
    3 #endif
    4 #include "signingpipe.hh"
    5 #include "misc.hh"
    6 #include <poll.h>
    7 
    8 #include <sys/socket.h>
    9 #include <netinet/in.h>
   10 #include <netinet/tcp.h>
   11 #include <sched.h>
   12 
   13 // deal with partial reads
   14 namespace {
   15 int readn(int fd, void* buffer, unsigned int len)
   16 {
   17   unsigned int pos=0;
   18   int res;
   19   for(;;) {
   20     res = read(fd, (char*)buffer + pos, len - pos);
   21     if(res == 0) {
   22       if(pos)
   23         throw runtime_error("Signing Pipe remote shut down in the middle of a message");
   24       else {
   25         //cerr<<"Got decent EOF on "<<fd<<endl;
   26         return 0;
   27       }
   28     }
   29       
   30     if(res < 0) {
   31       if(errno == EAGAIN || errno == EINTR) {
   32         if(pos==0)
   33           return -1;
   34         waitForData(fd, -1); 
   35         continue;
   36       }
   37       unixDie("Reading from socket in Signing Pipe loop");
   38     }
   39   
   40     pos+=res;
   41     if(pos == len)
   42       break;
   43   }
   44   return len;
   45 }
   46 }
   47 
   48 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
   49 try {
   50   csp->worker(fd);
   51   return nullptr;
   52 }
   53 catch(...) {
   54   g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
   55   return nullptr;
   56 }
   57 
   58 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
   59   : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
   60     d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
   61 {
   62   d_rrsetToSign = new rrset_t;
   63   d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
   64   
   65   if(!d_mustSign)
   66     return;
   67   
   68   int fds[2];
   69   
   70   for(unsigned int n=0; n < d_numworkers; ++n) {
   71     if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) 
   72       throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
   73     setCloseOnExec(fds[0]);
   74     setCloseOnExec(fds[1]);
   75     d_threads[n] = std::thread(helperWorker, this, fds[1]);
   76     setNonBlocking(fds[0]);
   77     d_sockets.push_back(fds[0]);
   78     d_outstandings[fds[0]] = 0;
   79   }
   80 }
   81 
   82 ChunkedSigningPipe::~ChunkedSigningPipe()
   83 {
   84   delete d_rrsetToSign;
   85 
   86   if(!d_mustSign)
   87     return;
   88 
   89   for(int fd :  d_sockets) {
   90     close(fd); // this will trigger all threads to exit
   91   }
   92 
   93   for(auto& thread : d_threads) {
   94     thread.join();
   95   }
   96   //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
   97 }
   98 
   99 namespace {
  100 bool
  101 dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
  102 {
  103   return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl);  // XXX SLOW SLOW SLOW
  104 }
  105 
  106 bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
  107 {
  108   return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl);  // XXX SLOW SLOW SLOW
  109 }
  110 }
  111 
  112 void ChunkedSigningPipe::dedupRRSet()
  113 {
  114   // our set contains contains records for one type and one name, but might not be sorted otherwise
  115   sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
  116   d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
  117 }
  118 
  119 bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
  120 {
  121   ++d_submitted;
  122   // check if we have a full RRSET to sign
  123   if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type ||  d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name)) 
  124   {
  125     dedupRRSet();
  126     sendRRSetToWorker();
  127   }
  128   d_rrsetToSign->push_back(rr);
  129   return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
  130 }
  131 
  132 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
  133 {
  134   vector<pollfd> pfds;
  135 
  136   for(unsigned int n = 0; n < d_sockets.size(); ++n) {    
  137     if(d_eof.count(d_sockets[n]))  
  138       continue;
  139     struct pollfd pfd;
  140     memset(&pfd, 0, sizeof(pfd));
  141     pfd.fd = d_sockets[n];
  142     if(rd)
  143       pfd.events |= POLLIN;
  144     if(wr)
  145       pfd.events |= POLLOUT;
  146     pfds.push_back(pfd);
  147   }
  148 
  149   int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
  150   if(res < 0)
  151     unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
  152   pair<vector<int>, vector<int> > vects;
  153   for(unsigned int n = 0; n < pfds.size(); ++n) 
  154     if(pfds[n].revents & POLLIN)
  155       vects.first.push_back(pfds[n].fd);
  156     else if(pfds[n].revents & POLLOUT)
  157       vects.second.push_back(pfds[n].fd);
  158   
  159   return vects;
  160 }
  161 
  162 void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
  163 {
  164   chunk_t::const_iterator from = signedChunk->begin();
  165   
  166   while(from != signedChunk->end()) {
  167     chunk_t& fillChunk = d_chunks.back();
  168     
  169     chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
  170     
  171     unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
  172   
  173     d_chunks.back().insert(fillChunk.end(), from , from + fit);
  174     from+=fit;
  175     
  176     if(from != signedChunk->end()) // it didn't fit, so add a new chunk
  177       d_chunks.push_back(chunk_t());
  178   }
  179 }
  180 
  181 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
  182 {
  183   if(!d_mustSign) {
  184     addSignedToChunks(d_rrsetToSign);
  185     d_rrsetToSign->clear();
  186     return;
  187   }
  188   
  189   if(d_final && !d_outstanding) // nothing to do!
  190     return;
  191   
  192   bool wantRead, wantWrite;
  193   
  194   wantWrite = !d_rrsetToSign->empty();
  195   wantRead = d_outstanding || wantWrite;  // if we wrote, we want to read
  196   
  197   pair<vector<int>, vector<int> > rwVect;
  198   
  199   rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
  200   
  201   if(wantWrite && !rwVect.second.empty()) {
  202     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
  203     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
  204     d_rrsetToSign = new rrset_t;
  205     d_outstandings[*rwVect.second.begin()]++;
  206     d_outstanding++;
  207     d_queued++;
  208     wantWrite=false;
  209   } 
  210   
  211   if(wantRead) {
  212     while(d_outstanding) {
  213       chunk_t* chunk;
  214       
  215       for(int fd :  rwVect.first) {
  216         if(d_eof.count(fd))
  217           continue;
  218         
  219         while(d_outstanding) {
  220           int res = readn(fd, &chunk, sizeof(chunk));
  221           if(!res) {
  222             if (d_outstandings[fd] > 0) {
  223               throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
  224             }
  225             d_eof.insert(fd);
  226             break;
  227           }
  228           if(res < 0) {
  229             if(errno != EAGAIN && errno != EINTR)
  230               unixDie("Error reading signed chunk from thread");
  231             else
  232               break;
  233           }
  234           
  235           --d_outstanding;
  236           d_outstandings[fd]--;
  237           
  238           addSignedToChunks(chunk);
  239           
  240           delete chunk;
  241         }
  242       }
  243       if(!d_outstanding || !d_final)
  244         break;
  245       rwVect = waitForRW(true, false, -1); // wait for something to happen
  246     }
  247   }
  248   
  249   if(wantWrite) {  // our optimization above failed, we now wait synchronously
  250     rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
  251     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
  252     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
  253     d_rrsetToSign = new rrset_t;
  254     d_outstandings[*rwVect.second.begin()]++;
  255     d_outstanding++;
  256     d_queued++;
  257   }
  258   
  259 }
  260 
  261 unsigned int ChunkedSigningPipe::getReady() const
  262 {
  263    unsigned int sum=0; 
  264    for(const auto& v :  d_chunks) {
  265      sum += v.size(); 
  266    }
  267    return sum;
  268 }
  269 
  270 void ChunkedSigningPipe::worker(int fd)
  271 try
  272 {
  273   UeberBackend db("key-only");
  274   DNSSECKeeper dk(&db);
  275   
  276   chunk_t* chunk = nullptr;
  277   int res;
  278   for(;;) {
  279     res = readn(fd, &chunk, sizeof(chunk));
  280     if(!res)
  281       break;
  282     if(res < 0)
  283       unixDie("reading object pointer to sign from pdns");
  284     try {
  285       set<DNSName> authSet;
  286       authSet.insert(d_signer);
  287       addRRSigs(dk, db, authSet, *chunk);
  288       ++d_signed;
  289 
  290       writen2(fd, &chunk, sizeof(chunk));
  291       chunk = nullptr;
  292     }
  293     catch(const PDNSException& pe) {
  294       delete chunk;
  295       throw;
  296     }
  297     catch(const std::exception& e) {
  298       delete chunk;
  299       throw;
  300     }
  301   }
  302   close(fd);
  303 }
  304 catch(const PDNSException& pe)
  305 {
  306   g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
  307   close(fd);
  308 }
  309 catch(const std::exception& e)
  310 {
  311   g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
  312   close(fd);
  313 }
  314 
  315 void ChunkedSigningPipe::flushToSign()
  316 {
  317   sendRRSetToWorker();
  318   d_rrsetToSign->clear();
  319 }
  320 
  321 vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
  322 {
  323   if(final && !d_final) {
  324     // this means we should keep on reading until d_outstanding == 0
  325     d_final = true;
  326     flushToSign();
  327     
  328     for(int fd :  d_sockets) {
  329       shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
  330       //cerr<<"shutdown of "<<fd<<endl;
  331     }
  332   }
  333   if(d_final)
  334     flushToSign(); // should help us wait
  335   vector<DNSZoneRecord> front=d_chunks.front();
  336   d_chunks.pop_front();
  337   if(d_chunks.empty())
  338     d_chunks.push_back(vector<DNSZoneRecord>());
  339 /*  if(d_final && front.empty())
  340       cerr<<"getChunk returning empty in final"<<endl; */
  341   return front;
  342 }
  343 
  344