"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "pdns/distributor.hh" between
pdns-auth-4.1.13.tar.gz and pdns-auth-4.2.0.tar.gz

About: PowerDNS Authoritative Nameserver is a versatile nameserver which supports a large number of backends (that can either be plain zone files or be more dynamic in nature).

distributor.hh  (pdns-auth-4.1.13):distributor.hh  (pdns-auth-4.2.0)
skipping to change at line 30 skipping to change at line 30
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/ */
#ifndef DISTRIBUTOR_HH #ifndef DISTRIBUTOR_HH
#define DISTRIBUTOR_HH #define DISTRIBUTOR_HH
#include <string> #include <string>
#include <deque> #include <deque>
#include <queue> #include <queue>
#include <vector> #include <vector>
#include <pthread.h> #include <pthread.h>
#include "threadname.hh"
#include <unistd.h> #include <unistd.h>
#include "logger.hh" #include "logger.hh"
#include "dns.hh" #include "dns.hh"
#include "dnsbackend.hh" #include "dnsbackend.hh"
#include "pdnsexception.hh" #include "pdnsexception.hh"
#include "arguments.hh" #include "arguments.hh"
#include <atomic> #include <atomic>
#include "statbag.hh" #include "statbag.hh"
extern StatBag S; extern StatBag S;
skipping to change at line 64 skipping to change at line 65
typedef std::function<void(Answer*)> callback_t; typedef std::function<void(Answer*)> callback_t;
virtual int question(Question *, callback_t callback) =0; //!< Submit a questi on to the Distributor virtual int question(Question *, callback_t callback) =0; //!< Submit a questi on to the Distributor
virtual int getQueueSize() =0; //!< Returns length of question queue virtual int getQueueSize() =0; //!< Returns length of question queue
virtual bool isOverloaded() =0; virtual bool isOverloaded() =0;
}; };
template<class Answer, class Question, class Backend> class SingleThreadDistribu tor template<class Answer, class Question, class Backend> class SingleThreadDistribu tor
: public Distributor<Answer, Question, Backend> : public Distributor<Answer, Question, Backend>
{ {
public: public:
SingleThreadDistributor(const SingleThreadDistributor&) = delete;
void operator=(const SingleThreadDistributor&) = delete;
SingleThreadDistributor(); SingleThreadDistributor();
typedef std::function<void(Answer*)> callback_t; typedef std::function<void(Answer*)> callback_t;
int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
int getQueueSize() override { int getQueueSize() override {
return 0; return 0;
} }
bool isOverloaded() override bool isOverloaded() override
{ {
return false; return false;
skipping to change at line 87 skipping to change at line 90
if (b) delete b; if (b) delete b;
} }
private: private:
Backend *b{0}; Backend *b{0};
}; };
template<class Answer, class Question, class Backend> class MultiThreadDistribut or template<class Answer, class Question, class Backend> class MultiThreadDistribut or
: public Distributor<Answer, Question, Backend> : public Distributor<Answer, Question, Backend>
{ {
public: public:
MultiThreadDistributor(const MultiThreadDistributor&) = delete;
void operator=(const MultiThreadDistributor&) = delete;
MultiThreadDistributor(int n); MultiThreadDistributor(int n);
typedef std::function<void(Answer*)> callback_t; typedef std::function<void(Answer*)> callback_t;
int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
static void* makeThread(void *); //!< helper function to create our n threads static void* makeThread(void *); //!< helper function to create our n threads
int getQueueSize() override { int getQueueSize() override {
return d_queued; return d_queued;
} }
struct QuestionData struct QuestionData
{ {
skipping to change at line 127 skipping to change at line 132
template<class Answer, class Question, class Backend>Distributor<Answer,Question ,Backend>* Distributor<Answer,Question,Backend>::Create(int n) template<class Answer, class Question, class Backend>Distributor<Answer,Question ,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
{ {
if( n == 1 ) if( n == 1 )
return new SingleThreadDistributor<Answer,Question,Backend>(); return new SingleThreadDistributor<Answer,Question,Backend>();
else else
return new MultiThreadDistributor<Answer,Question,Backend>( n ); return new MultiThreadDistributor<Answer,Question,Backend>( n );
} }
template<class Answer, class Question, class Backend>SingleThreadDistributor<Ans wer,Question,Backend>::SingleThreadDistributor() template<class Answer, class Question, class Backend>SingleThreadDistributor<Ans wer,Question,Backend>::SingleThreadDistributor()
{ {
L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<en dl; g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded" <<endl;
try { try {
b=new Backend; b=new Backend;
} }
catch(const PDNSException &AE) { catch(const PDNSException &AE) {
L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<end l;
_exit(1); _exit(1);
} }
catch(...) { catch(...) {
L<<Logger::Error<<"Caught an unknown exception when creating backend, probab ly"<<endl; g_log<<Logger::Error<<"Caught an unknown exception when creating backend, pr obably"<<endl;
_exit(1); _exit(1);
} }
} }
template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ er,Question,Backend>::MultiThreadDistributor(int n) template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ er,Question,Backend>::MultiThreadDistributor(int n)
{ {
d_num_threads=n; d_num_threads=n;
d_overloadQueueLength=::arg().asNum("overload-queue-length"); d_overloadQueueLength=::arg().asNum("overload-queue-length");
d_maxQueueLength=::arg().asNum("max-queue-length"); d_maxQueueLength=::arg().asNum("max-queue-length");
nextid=0; nextid=0;
skipping to change at line 159 skipping to change at line 164
pthread_t tid; pthread_t tid;
for(int i=0; i < n; ++i) { for(int i=0; i < n; ++i) {
int fds[2]; int fds[2];
if(pipe(fds) < 0) if(pipe(fds) < 0)
unixDie("Creating pipe"); unixDie("Creating pipe");
d_pipes.push_back({fds[0],fds[1]}); d_pipes.push_back({fds[0],fds[1]});
} }
if (n<1) { if (n<1) {
L<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl; g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
_exit(1); _exit(1);
} }
L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl; g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<end l;
for(int i=0;i<n;i++) { for(int i=0;i<n;i++) {
pthread_create(&tid,0,&makeThread,static_cast<void *>(this)); pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
Utility::usleep(50000); // we've overloaded mysql in the past :-) Utility::usleep(50000); // we've overloaded mysql in the past :-)
} }
L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<e ndl; g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions "<<endl;
} }
// start of a new thread // start of a new thread
template<class Answer, class Question, class Backend>void *MultiThreadDistributo r<Answer,Question,Backend>::makeThread(void *p) template<class Answer, class Question, class Backend>void *MultiThreadDistributo r<Answer,Question,Backend>::makeThread(void *p)
{ {
setThreadName("pdns/distributo");
pthread_detach(pthread_self()); pthread_detach(pthread_self());
MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p); MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
int ournum=us->d_running++; int ournum=us->d_running++;
try { try {
Backend *b=new Backend(); // this will answer our questions Backend *b=new Backend(); // this will answer our questions
int queuetimeout=::arg().asNum("queue-limit"); int queuetimeout=::arg().asNum("queue-limit");
for(;;) { for(;;) {
QuestionData* QD; QuestionData* QD;
if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD)) if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
unixDie("read"); unixDie("read");
--us->d_queued; --us->d_queued;
Answer *a; Answer *a = nullptr;
if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) { if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
delete QD->Q; delete QD->Q;
delete QD; delete QD;
S.inc("timedout-packets"); S.inc("timedout-packets");
continue; continue;
} }
bool allowRetry=true; bool allowRetry=true;
retry: retry:
skipping to change at line 212 skipping to change at line 218
allowRetry=false; allowRetry=false;
b=new Backend(); b=new Backend();
} }
a=b->question(QD->Q); a=b->question(QD->Q);
delete QD->Q; delete QD->Q;
} }
catch(const PDNSException &e) { catch(const PDNSException &e) {
delete b; delete b;
b=NULL; b=NULL;
if (!allowRetry) { if (!allowRetry) {
L<<Logger::Error<<"Backend error: "<<e.reason<<endl; g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
a=QD->Q->replyPacket(); a=QD->Q->replyPacket();
a->setRcode(RCode::ServFail); a->setRcode(RCode::ServFail);
S.inc("servfail-packets"); S.inc("servfail-packets");
S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); S.ringAccount("servfail-queries", QD->Q->qdomain, QD->Q->qtype);
delete QD->Q; delete QD->Q;
} else { } else {
L<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
goto retry; goto retry;
} }
} }
catch(...) { catch(...) {
delete b; delete b;
b=NULL; b=NULL;
if (!allowRetry) { if (!allowRetry) {
L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<( long)pthread_self()<<endl; g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
a=QD->Q->replyPacket(); a=QD->Q->replyPacket();
a->setRcode(RCode::ServFail); a->setRcode(RCode::ServFail);
S.inc("servfail-packets"); S.inc("servfail-packets");
S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); S.ringAccount("servfail-queries", QD->Q->qdomain, QD->Q->qtype);
delete QD->Q; delete QD->Q;
} else { } else {
L<<Logger::Warning<<"Caught unknown exception in Distributor thread "< <(long)pthread_self()<<" (retry once)"<<endl; g_log<<Logger::Warning<<"Caught unknown exception in Distributor threa d "<<(long)pthread_self()<<" (retry once)"<<endl;
goto retry; goto retry;
} }
} }
QD->callback(a); QD->callback(a);
delete QD; delete QD;
} }
delete b; delete b;
} }
catch(const PDNSException &AE) { catch(const PDNSException &AE) {
L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<end l;
_exit(1); _exit(1);
} }
catch(...) { catch(...) {
L<<Logger::Error<<"Caught an unknown exception when creating backend, probab ly"<<endl; g_log<<Logger::Error<<"Caught an unknown exception when creating backend, pr obably"<<endl;
_exit(1); _exit(1);
} }
return 0; return 0;
} }
template<class Answer, class Question, class Backend>int SingleThreadDistributor <Answer,Question,Backend>::question(Question* q, callback_t callback) template<class Answer, class Question, class Backend>int SingleThreadDistributor <Answer,Question,Backend>::question(Question* q, callback_t callback)
{ {
Answer *a; Answer *a = nullptr;
bool allowRetry=true; bool allowRetry=true;
retry: retry:
try { try {
if (!b) { if (!b) {
allowRetry=false; allowRetry=false;
b=new Backend; b=new Backend;
} }
a=b->question(q); // a can be NULL! a=b->question(q); // a can be NULL!
} }
catch(const PDNSException &e) { catch(const PDNSException &e) {
delete b; delete b;
b=NULL; b=NULL;
if (!allowRetry) { if (!allowRetry) {
L<<Logger::Error<<"Backend error: "<<e.reason<<endl; g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
a=q->replyPacket(); a=q->replyPacket();
a->setRcode(RCode::ServFail); a->setRcode(RCode::ServFail);
S.inc("servfail-packets"); S.inc("servfail-packets");
S.ringAccount("servfail-queries",q->qdomain.toLogString()); S.ringAccount("servfail-queries", q->qdomain, q->qtype);
} else { } else {
L<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
goto retry; goto retry;
} }
} }
catch(...) { catch(...) {
delete b; delete b;
b=NULL; b=NULL;
if (!allowRetry) { if (!allowRetry) {
L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsi gned long)pthread_self()<<endl; g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<( unsigned long)pthread_self()<<endl;
a=q->replyPacket(); a=q->replyPacket();
a->setRcode(RCode::ServFail); a->setRcode(RCode::ServFail);
S.inc("servfail-packets"); S.inc("servfail-packets");
S.ringAccount("servfail-queries",q->qdomain.toLogString()); S.ringAccount("servfail-queries", q->qdomain, q->qtype);
} else { } else {
L<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(un signed long)pthread_self()<<" (retry once)"<<endl; g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "< <(unsigned long)pthread_self()<<" (retry once)"<<endl;
goto retry; goto retry;
} }
} }
callback(a); callback(a);
return 0; return 0;
} }
struct DistributorFatal{}; struct DistributorFatal{};
template<class Answer, class Question, class Backend>int MultiThreadDistributor< Answer,Question,Backend>::question(Question* q, callback_t callback) template<class Answer, class Question, class Backend>int MultiThreadDistributor< Answer,Question,Backend>::question(Question* q, callback_t callback)
{ {
q=new Question(*q); q=new Question(*q);
// this is passed to other process over pipe and released there // this is passed to other process over pipe and released there
auto QD=new QuestionData(); auto QD=new QuestionData();
QD->Q=q; QD->Q=q;
auto ret = QD->id = nextid++; // might be deleted after write! auto ret = QD->id = nextid++; // might be deleted after write!
QD->callback=callback; QD->callback=callback;
if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(Q ++d_queued;
D)) if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(Q
D)) {
--d_queued;
unixDie("write"); unixDie("write");
}
d_queued++;
if(d_queued > d_maxQueueLength) { if(d_queued > d_maxQueueLength) {
L<<Logger::Error<< d_queued <<" questions waiting for database/backend atten tion. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl; g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend a ttention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
// this will leak the entire contents of all pipes, nothing will be freed. R espawn when this happens! // this will leak the entire contents of all pipes, nothing will be freed. R espawn when this happens!
throw DistributorFatal(); throw DistributorFatal();
} }
return ret; return ret;
} }
#endif // DISTRIBUTOR_HH #endif // DISTRIBUTOR_HH
 End of changes. 29 change blocks. 
27 lines changed or deleted 34 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)