"Fossies" - the Fresh Open Source Software Archive

Member "hylafax-7.0.2/faxd/faxQueueApp.c++" (17 Nov 2019, 143448 Bytes) of package /linux/misc/hylafax-7.0.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 /*  $Id: faxQueueApp.c++ 1177 2013-07-28 01:07:28Z faxguy $ */
    2 /*
    3  * Copyright (c) 1990-1996 Sam Leffler
    4  * Copyright (c) 1991-1996 Silicon Graphics, Inc.
    5  * HylaFAX is a trademark of Silicon Graphics
    6  *
    7  * Permission to use, copy, modify, distribute, and sell this software and 
    8  * its documentation for any purpose is hereby granted without fee, provided
    9  * that (i) the above copyright notices and this permission notice appear in
   10  * all copies of the software and related documentation, and (ii) the names of
   11  * Sam Leffler and Silicon Graphics may not be used in any advertising or
   12  * publicity relating to the software without the specific, prior written
   13  * permission of Sam Leffler and Silicon Graphics.
   14  * 
   15  * THE SOFTWARE IS PROVIDED "AS-IS" AND WITHOUT WARRANTY OF ANY KIND, 
   16  * EXPRESS, IMPLIED OR OTHERWISE, INCLUDING WITHOUT LIMITATION, ANY 
   17  * WARRANTY OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE.  
   18  * 
   19  * IN NO EVENT SHALL SAM LEFFLER OR SILICON GRAPHICS BE LIABLE FOR
   20  * ANY SPECIAL, INCIDENTAL, INDIRECT OR CONSEQUENTIAL DAMAGES OF ANY KIND,
   21  * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
   22  * WHETHER OR NOT ADVISED OF THE POSSIBILITY OF DAMAGE, AND ON ANY THEORY OF 
   23  * LIABILITY, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE 
   24  * OF THIS SOFTWARE.
   25  */
   26 #include "Sys.h"
   27 
   28 #include <ctype.h>
   29 #include <errno.h>
   30 #include <math.h>
   31 #include <limits.h>
   32 #include <sys/file.h>
   33 #include <tiffio.h>
   34 
   35 #include "Dispatcher.h"
   36 
   37 #include "FaxMachineInfo.h"
   38 #include "FaxAcctInfo.h"
   39 #include "FaxRequest.h"
   40 #include "FaxTrace.h"
   41 #include "FaxRecvInfo.h"
   42 #include "Timeout.h"
   43 #include "UUCPLock.h"
   44 #include "DialRules.h"
   45 #include "RE.h"
   46 #include "Modem.h"
   47 #include "Trigger.h"
   48 #include "faxQueueApp.h"
   49 #include "HylaClient.h"
   50 #include "MemoryDecoder.h"
   51 #include "FaxSendInfo.h"
   52 #include "PageSize.h"
   53 #include "SendFaxClient.h"
   54 #include "config.h"
   55 
   56 /*
   57  * HylaFAX Spooling and Command Agent.
   58  */
   59 
   60 const fxStr faxQueueApp::sendDir    = FAX_SENDDIR;
   61 const fxStr faxQueueApp::docDir     = FAX_DOCDIR;
   62 const fxStr faxQueueApp::clientDir  = FAX_CLIENTDIR;
   63 
   64 fxStr strTime(time_t t) { return fxStr(fmtTime(t)); }
   65 
   66 #define JOBHASH(pri)    (((pri) >> 4) & (NQHASH-1))
   67 
   68 faxQueueApp::SchedTimeout::SchedTimeout()
   69 {
   70     started = false;
   71     pending = false;
   72     lastRun = Sys::now() - 1;
   73 }
   74 
   75 faxQueueApp::SchedTimeout::~SchedTimeout() {}
   76 
   77 void
   78 faxQueueApp::SchedTimeout::timerExpired(long, long)
   79 {
   80     if (pending && lastRun <= Sys::now()) pending = false;
   81     if (faxQueueApp::instance().scheduling() ) {
   82     start(0);
   83     return;
   84     }
   85     faxQueueApp::instance().runScheduler();
   86     started = false;
   87 }
   88 
   89 void
   90 faxQueueApp::SchedTimeout::start(u_short s)
   91 {
   92     /*
   93      * If we don't throttle the scheduler then large
   94      * queues can halt the system with CPU consumption.
   95      * So we keep the scheduler from running more than
   96      * once per second.
   97      */
   98     if (!started && Sys::now() > lastRun) {
   99     started = true;
  100     pending = false;
  101     Dispatcher::instance().startTimer(s, 1, this);
  102     lastRun = Sys::now() + s;
  103     } else {
  104     if (!pending && lastRun <= Sys::now()) {
  105         /*
  106          * The scheduler is either running now or has been run
  107          * within the last second and there are no timers set
  108          * to trigger another scheduler run.  So we set a
  109          * timer to go off in one second to avoid a stalled
  110          * run queue.
  111          */
  112         Dispatcher::instance().startTimer(s + 1, 0, this);
  113         lastRun = Sys::now() + 1 + s;
  114         pending = true;
  115     }
  116     }
  117 }
  118 
  119 faxQueueApp* faxQueueApp::_instance = NULL;
  120 
  121 faxQueueApp::faxQueueApp()
  122     : configFile(FAX_CONFIG)
  123 {
  124     fifo = -1;
  125     quit = false;
  126     dialRules = NULL;
  127     inSchedule = false;
  128     setupConfig();
  129 
  130     fxAssert(_instance == NULL, "Cannot create multiple faxQueueApp instances");
  131     _instance = this;
  132 }
  133 
  134 faxQueueApp::~faxQueueApp()
  135 {
  136     HylaClient::purge();
  137     delete dialRules;
  138 }
  139 
  140 faxQueueApp& faxQueueApp::instance() { return *_instance; }
  141 
  142 void
  143 faxQueueApp::initialize(int argc, char** argv)
  144 {
  145     updateConfig(configFile);       // read config file
  146     faxApp::initialize(argc, argv);
  147 
  148     for (GetoptIter iter(argc, argv, getOpts()); iter.notDone(); iter++)
  149     switch (iter.option()) {
  150     case 'c':           // set configuration parameter
  151         readConfigItem(iter.optArg());
  152         break;
  153     }
  154 
  155     logInfo("%s", HYLAFAX_VERSION);
  156     logInfo("%s", "Copyright (c) 1990-1996 Sam Leffler");
  157     logInfo("%s", "Copyright (c) 1991-1996 Silicon Graphics, Inc.");
  158 
  159     scanForModems();
  160 }
  161 
  162 void
  163 faxQueueApp::open()
  164 {
  165     faxApp::open();
  166     scanQueueDirectory();
  167     Modem::broadcast("HELLO");      // announce queuer presence
  168     scanClientDirectory();      // announce queuer presence
  169     pokeScheduler();
  170 }
  171 
  172 void
  173 faxQueueApp::blockSignals()
  174 {
  175     sigset_t block;
  176     sigemptyset(&block);
  177     sigaddset(&block, SIGCHLD);
  178     sigprocmask(SIG_BLOCK, &block, NULL);
  179 }
  180 
  181 void
  182 faxQueueApp::releaseSignals()
  183 {
  184     sigset_t release;
  185     sigemptyset(&release);
  186     sigaddset(&release, SIGCHLD);
  187     sigprocmask (SIG_UNBLOCK, &release, NULL);
  188 }
  189 
  190 /*
  191  * Scan the spool area for modems.  We can't be certain the
  192  * modems are actively working without probing them; this
  193  * work is done simply to buildup the internal database for
  194  * broadcasting a ``HELLO'' message.  Later on, individual
  195  * modems are enabled for use based on messages received
  196  * through the FIFO.
  197  */
  198 void
  199 faxQueueApp::scanForModems()
  200 {
  201     DIR* dir = Sys::opendir(".");
  202     if (dir == NULL) {
  203     logError("Could not scan directory for modems");
  204     return;
  205     }
  206     fxStr fifoMatch(fifoName | ".");
  207     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  208     if (dp->d_name[0] != fifoName[0])
  209         continue;
  210     if (!strneq(dp->d_name, fifoMatch, fifoMatch.length()))
  211         continue;
  212     if (Sys::isFIFOFile(dp->d_name)) {
  213         fxStr devid(dp->d_name);
  214         devid.remove(0, fifoMatch.length()-1);  // NB: leave "."
  215         if (Sys::isRegularFile(FAX_CONFIG | devid)) {
  216         devid.remove(0);            // strip "."
  217         (void) Modem::getModemByID(devid);  // adds to list
  218         }
  219     }
  220     }
  221     closedir(dir);
  222 }
  223 
  224 /*
  225  * Scan the spool directory for queue files and
  226  * enter them in the queues of outgoing jobs.
  227  */
  228 void
  229 faxQueueApp::scanQueueDirectory()
  230 {
  231     DIR* dir = Sys::opendir(sendDir);
  232     if (dir == NULL) {
  233     logError("Could not scan %s directory for outbound jobs",
  234         (const char*)sendDir);
  235     return;
  236     }
  237     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  238     if (dp->d_name[0] == 'q')
  239         submitJob(&dp->d_name[1], true);
  240     }
  241     closedir(dir);
  242 }
  243 
  244 /*
  245  * Scan the client area for active client processes
  246  * and send a ``HELLO message'' to notify them the
  247  * queuer process has restarted.  If no process is
  248  * listening on the FIFO, remove it; the associated
  249  * client state will be purged later.
  250  */
  251 void
  252 faxQueueApp::scanClientDirectory()
  253 {
  254     DIR* dir = Sys::opendir(clientDir);
  255     if (dir == NULL) {
  256     logError("Could not scan %s directory for clients",
  257         (const char*) clientDir);
  258     return;
  259     }
  260     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  261     if (!isdigit(dp->d_name[0]))
  262         continue;
  263     fxStr fifo(clientDir | "/" | dp->d_name);
  264     if (Sys::isFIFOFile((const char*) fifo))
  265         if (!HylaClient::getClient(fifo).send("HELLO", 6))
  266         Sys::unlink(fifo);
  267     }
  268     closedir(dir);
  269 }
  270 
  271 /*
  272  * Process a job.  Prepare it for transmission and
  273  * pass it on to the thread that does the actual
  274  * transmission work.  The job is marked ``active to
  275  * this destination'' prior to preparing it because
  276  * preparation may involve asynchronous activities.
  277  * The job is placed on the active list so that it
  278  * can be located by filename if necessary.
  279  */
  280 void
  281 faxQueueApp::processJob(Job& job, FaxRequest* req, DestInfo& di)
  282 {
  283     JobStatus status;
  284     FaxMachineInfo& info = di.getInfo(job.dest);
  285 
  286     Job* bjob = job.bfirst();   // first job in batch
  287     Job* cjob = &job;       // current job
  288     FaxRequest* creq = req; // current request
  289     Job* njob = NULL;       // next job
  290     
  291     for (; cjob != NULL; cjob = njob) {
  292     creq = cjob->breq;
  293     njob = cjob->bnext;
  294     cjob->commid = "";          // set on return
  295     req->notice = "";           // Clear for new procssing
  296     di.active(*cjob);
  297     setActive(*cjob);           // place job on active list
  298     updateRequest(*creq, *cjob);
  299     if (!prepareJobNeeded(*cjob, *creq, status)) {
  300         if (status != Job::done) {
  301         if (cjob->bprev == NULL)
  302             bjob = njob;
  303         cjob->state = FaxRequest::state_failed;
  304         deleteRequest(*cjob, creq, status, true);
  305         setDead(*cjob);
  306         }
  307     } else {
  308         numPrepares++;
  309         if (prepareJobStart(*cjob, creq, info))
  310         return;
  311         else if (cjob->bprev == NULL)
  312         bjob = njob;
  313     }
  314     }
  315     if (bjob != NULL)
  316     sendJobStart(*bjob, bjob->breq);
  317 }
  318 
  319 /*
  320  * Check if the job requires preparation that should
  321  * done in a fork'd copy of the server.  A sub-fork is
  322  * used if documents must be converted or a continuation
  323  * cover page must be crafted (i.e. the work may take
  324  * a while).
  325  */
  326 bool
  327 faxQueueApp::prepareJobNeeded(Job& job, FaxRequest& req, JobStatus& status)
  328 {
  329     if (!req.items.length()) {
  330     req.notice = "Job contains no documents {E323}";
  331     status = Job::rejected;
  332     jobError(job, "SEND REJECT: %s", (const char*) req.notice);
  333     return (false);
  334     }
  335     for (u_int i = 0, n = req.items.length(); i < n; i++)
  336     switch (req.items[i].op) {
  337     case FaxRequest::send_postscript:   // convert PostScript
  338     case FaxRequest::send_pcl:      // convert PCL
  339     case FaxRequest::send_tiff:     // verify&possibly convert TIFF
  340     case FaxRequest::send_pdf:      // convert PDF
  341         return (true);
  342     case FaxRequest::send_poll:     // verify modem is capable
  343         if (!job.modem->supportsPolling()) {
  344         req.notice = "Modem does not support polling {E324}";
  345         status = Job::rejected;
  346         jobError(job, "SEND REJECT: %s", (const char*) req.notice);
  347         return (false);
  348         }
  349         break;
  350     }
  351     status = Job::done;
  352     return (req.cover != "");           // need continuation cover page
  353 }
  354 
  355 /*
  356  * Handler used by job preparation subprocess
  357  * to pass signal from parent queuer process.
  358  * We mark state so job preparation will be aborted
  359  * at the next safe point in the procedure.
  360  */
  361 void
  362 faxQueueApp::prepareCleanup(int s)
  363 {
  364     int old_errno = errno;
  365     signal(s, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  366     logError("CAUGHT SIGNAL %d, ABORT JOB PREPARATION", s);
  367     faxQueueApp::instance().abortPrepare = true;
  368     errno = old_errno;
  369 }
  370 
  371 /*
  372  * Start job preparation in a sub-fork.  The server process
  373  * forks and sets up a Dispatcher handler to reap the child
  374  * process.  The exit status from the child is actually the
  375  * return value from the prepareJob method; this and a
  376  * reference to the original Job are passed back into the
  377  * server thread at which point the transmit work is actually
  378  * initiated.
  379  */
  380 bool
  381 faxQueueApp::prepareJobStart(Job& job, FaxRequest* req,
  382     FaxMachineInfo& info)
  383 {
  384     traceQueue(job, "PREPARE START");
  385     abortPrepare = false;
  386     pid_t pid = fork();
  387     switch (pid) {
  388     case 0:             // child, do work
  389     /*
  390      * NB: There is a window here where the subprocess
  391      * doing the job preparation can have the old signal
  392      * handlers installed when a signal comes in.  This
  393      * could be fixed by using the appropriate POSIX calls
  394      * to block and unblock signals, but signal usage is
  395      * quite tenuous (i.e. what is and is not supported
  396      * on a system), so rather than depend on this
  397      * functionality being supported, we'll just leave
  398      * the (small) window in until it shows itself to
  399      * be a problem.
  400      */
  401     signal(SIGTERM, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  402     signal(SIGINT, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  403     _exit(prepareJob(job, *req, info));
  404     /*NOTREACHED*/
  405     case -1:                // fork failed, sleep and retry
  406     if (job.isOnList()) job.remove(); // Remove from active queue
  407     delayJob(job, *req, "Could not fork to prepare job for transmission {E340}",
  408         Sys::now() + random() % requeueInterval);
  409     delete req;
  410     return false;
  411     default:                // parent, setup handler to wait
  412     job.startPrepare(pid);
  413     delete req;         // must reread after preparation
  414     job.breq = NULL;
  415     Trigger::post(Trigger::JOB_PREP_BEGIN, job);
  416     return true;
  417     }
  418 }
  419 
  420 /*
  421  * Handle notification from the sub-fork that job preparation
  422  * is done.  The exit status is checked and interpreted as the
  423  * return value from prepareJob if it was passed via _exit.
  424  */
  425 void
  426 faxQueueApp::prepareJobDone(Job& job, int status)
  427 {
  428     traceQueue(job, "PREPARE DONE");
  429     Trigger::post(Trigger::JOB_PREP_END, job);
  430     if (status&0xff) {
  431     logError("JOB %s: bad exit status %#x from sub-fork",
  432         (const char*) job.jobid, status);
  433     status = Job::failed;
  434     } else
  435     status >>= 8;
  436     if (job.suspendPending) {       // co-thread waiting
  437     job.suspendPending = false;
  438     releaseModem(job);
  439     } else {
  440     FaxRequest* req = readRequest(job);
  441     if (!req) {
  442         // NB: no way to notify the user (XXX)
  443         logError("JOB %s: qfile vanished during preparation",
  444         (const char*) job.jobid);
  445         setDead(job);
  446     } else {
  447         bool processnext = false;
  448         bool startsendjob = false;
  449         Job* targetjob = &job;
  450         if (status == Job::done) {      // preparation completed successfully
  451         job.breq = req;
  452         startsendjob = (job.bnext == NULL);
  453         processnext = !startsendjob;
  454         if (processnext) {
  455             targetjob = job.bnext;
  456         }
  457         } else {
  458         /*
  459          * Job preparation did not complete successfully.
  460          *
  461          * If there is more than one job in this batch, then remove this job
  462          * and adjust the batch accordingly.
  463          */
  464         if (job.bnext == NULL) {    // no jobs left in batch
  465             targetjob = job.bprev;
  466             startsendjob = (targetjob != NULL);
  467         } else {            // more jobs in batch
  468             targetjob = job.bnext;
  469             processnext = true;
  470         }
  471         /*
  472          * If there are other jobs in the batch, we have to be
  473          * careful to *not* release the modem, otherwise faxq will
  474          * schedule new jobs on this modem while the rest of the jobs
  475          * in the batch are still using it.
  476          */
  477         if (startsendjob || processnext)
  478             job.modem = NULL;
  479         if (status == Job::requeued) {
  480             if (job.isOnList()) job.remove();
  481             delayJob(job, *req, "Could not fork to prepare job for transmission {E340}",
  482             Sys::now() + random() % requeueInterval);
  483             delete req;
  484         } else {
  485             deleteRequest(job, req, status, true);
  486             setDead(job);
  487         }
  488         }
  489         if (processnext) {
  490         processJob(*targetjob, targetjob->breq, destJobs[targetjob->dest]);
  491         } else if (startsendjob)
  492         sendJobStart(*targetjob->bfirst(), targetjob->bfirst()->breq);
  493         else {
  494         /*
  495          * This job may be blocking others.
  496          */
  497         DestInfo& di = destJobs[job.dest];
  498         unblockDestJobs(di);        // release any blocked jobs
  499         pokeScheduler();
  500         }
  501     }
  502     }
  503     if (numPrepares) numPrepares--;
  504     if (numPrepares < maxConcurrentPreps) {
  505     /*
  506      * The number of simultaneous job preparations is
  507      * low enough to allow another job preparation.
  508      */
  509     pokeScheduler();
  510     }
  511 }
  512 
  513 /*
  514  * Document Use Database.
  515  *
  516  * The server minimizes imaging operations by checking for the
  517  * existence of compatible, previously imaged, versions of documents.
  518  * This is done by using a file naming convention that includes the
  519  * source document name and the remote machine capabilities that are
  520  * used for imaging.  The work done here (and in other HylaFAX apps)
  521  * also assumes certain naming convention used by hfaxd when creating
  522  * document files.  Source documents are named:
  523  *
  524  *     doc<docnum>.<type>
  525  *
  526  * where <docnum> is a unique document number that is assigned by
  527  * hfaxd at the time the document is stored on the server.  Document
  528  * references by a job are then done using filenames (i.e. hard
  529  * links) of the form:
  530  *
  531  *  doc<docnum>.<type>.<jobid>
  532  *
  533  * where <jobid> is the unique identifier assigned to each outbound
  534  * job.  Then, each imaged document is named:
  535  *
  536  *  doc<docnum>.<type>;<encoded-capabilities>
  537  *
  538  * where <encoded-capabilities> is a string that encodes the remote
  539  * machine's capabilities.
  540  *
  541  * Before imaging a document the scheduler checks to see if there is
  542  * an existing file with the appropriate name.  If so then the file
  543  * is used and no preparation work is needed for sending the document.
  544  * Otherwise the document must be converted for transmission; this
  545  * result is written to a file with the appropriate name.  After an
  546  * imaged document has been transmitted it is not immediately removed,
  547  * but rather the scheduler is informed that the job no longer holds
  548  * (needs) a reference to the document and the scheduler records this
  549  * information so that when no jobs reference the original source
  550  * document, all imaged forms may be expunged.  As documents are
  551  * transmitted the job references to the original source documents are
  552  * converted to references to the ``base document name'' (the form
  553  * without the <jobid>) so that the link count on the inode for this
  554  * file reflects the number of references from jobs that are still
  555  * pending transmission.  This means that the scheduler can use the
  556  * link count to decide when to expunge imaged versions of a document.
  557  *
  558  * Note that the reference counts used here do not necessarily
  559  * *guarantee* that a pre-imaged version of a document will be available.
  560  * There are race conditions under which a document may be re-imaged
  561  * because a previously imaged version was removed.
  562  *
  563  * A separate document scavenger program should be run periodically
  564  * to expunge any document files that might be left in the docq for
  565  * unexpected reasons.  This program should read the set of jobs in
  566  * the sendq to build a onetime table of uses and then remove any
  567  * files found in the docq that are not referenced by a job.
  568  */
  569 
  570 /*
  571  * Remove a reference to an imaged document and if no
  572  * references exist for the corresponding source document,
  573  * expunge all imaged versions of the document.
  574  */
  575 void
  576 faxQueueApp::unrefDoc(const fxStr& file)
  577 {
  578     /*
  579      * Convert imaged document name to the base
  580      * (source) document name by removing the
  581      * encoded session parameters used for imaging.
  582      */
  583     u_int l = file.nextR(file.length(), ';');
  584     if (l == 0) {
  585     logError("Bogus document handed to unrefDoc: %s", (const char*) file);
  586     return;
  587     }
  588     fxStr doc = file.head(l-1);
  589     /*
  590      * Add file to the list of pending removals.  We
  591      * do this before checking below so that the list
  592      * of files will always have something on it.
  593      */
  594     fxStr& files = pendingDocs[doc];
  595     if (files.find(0, file) == files.length())      // suppress duplicates
  596     files.append(file | " ");
  597     if (tracingLevel & FAXTRACE_DOCREFS)
  598     logInfo("DOC UNREF: %s files %s",
  599         (const char*) file, (const char*) files);
  600     /*
  601      * The following assumes that any source document has
  602      * been renamed to the base document name *before* this
  603      * routine is invoked (either directly or via a msg
  604      * received on a FIFO).  Specifically, if the stat
  605      * call fails we assume the file does not exist and
  606      * that it is safe to remove the imaged documents.
  607      * This is conservative and if wrong will not break
  608      * anything; just potentially cause extra imaging
  609      * work to be done.
  610      */
  611     struct stat sb;
  612     if (Sys::stat(doc, sb) < 0 || sb.st_nlink == 1) {
  613     if (tracingLevel & FAXTRACE_DOCREFS)
  614         logInfo("DOC UNREF: expunge imaged files");
  615     /*
  616      * There are no additional references to the
  617      * original source document (all references
  618      * should be from completed jobs that reference
  619      * the original source document by its basename).
  620      * Expunge imaged documents that were waiting for
  621      * all potential uses to complete.
  622      */
  623     l = 0;
  624     do {
  625         fxStr filename = files.token(l, ' ');
  626         (void) Sys::unlink(filename);
  627         (void) Sys::unlink(filename|".color");
  628     } while (l < files.length());
  629     pendingDocs.remove(doc);
  630     }
  631 }
  632 
  633 #include "class2.h"
  634 
  635 /*
  636  * Prepare a job by converting any user-submitted documents
  637  * to a format suitable for transmission.
  638  */
  639 JobStatus
  640 faxQueueApp::prepareJob(Job& job, FaxRequest& req,
  641     const FaxMachineInfo& info)
  642 {
  643     /*
  644      * Select imaging parameters according to requested
  645      * values, client capabilities, and modem capabilities.
  646      * Note that by this time we believe the modem is capable
  647      * of certain requirements needed to transmit the document
  648      * (based on the capabilities passed to us by faxgetty).
  649      */
  650     Class2Params params;
  651     
  652     /*
  653      * User requested vres (98 or 196) and usexvres (1=true or 0=false)
  654      */
  655     int vres = req.resolution;
  656     int usexvres = req.usexvres;
  657     /*
  658      * System overrides in JobControl:
  659      * VRes: we check for vres = 98 or vres = 196 in JobControl;
  660      *       if vres is not set getVRes returns 0.
  661      * UseXVres: we check for usexvres = 0 or usexvres = 1 in JobControl;
  662      *           if usexvres is not set getUseXVRes retuns -1.
  663      */
  664     if (job.getJCI().getVRes() == 98)
  665     vres = 98;
  666     else if (job.getJCI().getVRes() == 196)
  667     vres = 196;
  668     if (job.getJCI().getUseXVRes() == 0)
  669     usexvres = 0;
  670     else if (job.getJCI().getUseXVRes() == 1)
  671     usexvres = 1;
  672 
  673     // use the highest resolution the client supports
  674     params.vr = VR_NORMAL;
  675 
  676     if (usexvres) {
  677     if (info.getSupportsVRes() & VR_200X100 && job.modem->supportsVR(VR_200X100))
  678         params.vr = VR_200X100;
  679     if (info.getSupportsVRes() & VR_FINE && job.modem->supportsVR(VR_FINE))
  680         params.vr = VR_FINE;
  681     if (info.getSupportsVRes() & VR_200X200 && job.modem->supportsVR(VR_200X200))
  682         params.vr = VR_200X200;
  683     if (info.getSupportsVRes() & VR_R8 && job.modem->supportsVR(VR_R8))
  684         params.vr = VR_R8;
  685     if (info.getSupportsVRes() & VR_200X400 && job.modem->supportsVR(VR_200X400))
  686         params.vr = VR_200X400;
  687     if (info.getSupportsVRes() & VR_300X300 && job.modem->supportsVR(VR_300X300))
  688         params.vr = VR_300X300;
  689     if (info.getSupportsVRes() & VR_R16 && job.modem->supportsVR(VR_R16))
  690         params.vr = VR_R16;
  691     } else {                // limit ourselves to normal and fine
  692     if (vres > 150) {
  693         if (info.getSupportsVRes() & VR_FINE && job.modem->supportsVR(VR_FINE))
  694         params.vr = VR_FINE;
  695     }
  696     }
  697     /*
  698      * Follow faxsend and use unlimited page length whenever possible.
  699      */
  700     useUnlimitedLN = (info.getMaxPageLengthInMM() == (u_short) -1);
  701 
  702     if (job.getJCI().getPageSize().length()) {
  703     PageSizeInfo* psi = PageSizeInfo::getPageSizeByName(job.getJCI().getPageSize());
  704     params.setPageWidthInMM(
  705         fxmin((u_int) psi->width(), (u_int) info.getMaxPageWidthInMM()));
  706     params.setPageLengthInMM(
  707         fxmin((u_int) psi->height(), (u_int) info.getMaxPageLengthInMM()));
  708     } else {
  709     params.setPageWidthInMM(
  710         fxmin((u_int) req.pagewidth, (u_int) info.getMaxPageWidthInMM()));
  711     params.setPageLengthInMM(
  712         fxmin((u_int) req.pagelength, (u_int) info.getMaxPageLengthInMM()));
  713     }
  714 
  715     /*
  716      * Generate MMR or 2D-encoded facsimile if:
  717      * o the server is permitted to generate it,
  718      * o the modem is capable of sending it,
  719      * o the remote side is known to be capable of it, and
  720      * o the user hasn't specified a desire to send 1D data.
  721      */
  722     int jcdf = job.getJCI().getDesiredDF();
  723     if (jcdf != -1) req.desireddf = jcdf;
  724     if (req.usecolor) params.jp = JP_COLOR;
  725     if (req.usecolor && req.desireddf > DF_JBIG) {
  726     // code for JPEG-only fax...
  727     params.df = (u_int) -1;
  728     } else if (req.desireddf == DF_2DMMR && (req.desiredec != EC_DISABLE) && 
  729     use2D && job.modem->supportsMMR() &&
  730      (! info.getCalledBefore() || info.getSupportsMMR()) )
  731         params.df = DF_2DMMR;
  732     else if (req.desireddf > DF_1DMH) {
  733     params.df = (use2D && job.modem->supports2D() &&
  734         (! info.getCalledBefore() || info.getSupports2DEncoding()) ) ?
  735         DF_2DMR : DF_1DMH;
  736     } else
  737     params.df = DF_1DMH;
  738 
  739     /*
  740      * Restrict parameter selection for destinations with poor audio quality.
  741      */
  742     u_int dataSent = info.getDataSent() + info.getDataSent1() + info.getDataSent2();
  743     u_int dataMissed = info.getDataMissed() + info.getDataMissed1() + info.getDataMissed2();
  744     if (class1RestrictPoorDestinations && dataSent && dataMissed * 100 / dataSent > class1RestrictPoorDestinations) {
  745     params.jp = JP_NONE;
  746     params.vr = VR_NORMAL;
  747     traceJob(job, "This destination exhibits poor call audio quality.  Restricting resolution and color support.");
  748     }
  749 
  750     /*
  751      * Check and process the documents to be sent
  752      * using the parameter selected above.
  753      */
  754     JobStatus status = Job::done;
  755     bool updateQFile = false;
  756     fxStr tmp;      // NB: here to avoid compiler complaint
  757     u_int i = 0;
  758     while (i < req.items.length() && status == Job::done && !abortPrepare) {
  759     FaxItem& fitem = req.items[i];
  760     switch (fitem.op) {
  761     case FaxRequest::send_postscript:   // convert PostScript
  762     case FaxRequest::send_pcl:      // convert PCL
  763     case FaxRequest::send_tiff:     // verify&possibly convert TIFF
  764         case FaxRequest::send_pdf:      // convert PDF
  765         tmp = FaxRequest::mkbasedoc(fitem.item) | ";" | params.encodePage();
  766         status = convertDocument(job, fitem, tmp, params, req.notice);
  767         if (status == Job::done) {
  768         /*
  769          * Insert converted file into list and mark the
  770          * original document so that it's saved, but
  771          * not processed again.  The converted file
  772          * is sent, while the saved file is kept around
  773          * in case it needs to be returned to the sender.
  774          */
  775         fitem.op++;         // NB: assumes order of enum
  776         req.insertFax(i+1, tmp);
  777         } else {
  778         Sys::unlink(tmp);       // bail out
  779         Sys::unlink(tmp|".color");
  780         }
  781         updateQFile = true;
  782         break;
  783     }
  784     i++;
  785     }
  786     if (status == Job::done && !abortPrepare) {
  787     if (req.pagehandling == "" && !abortPrepare) {
  788         /*
  789          * Calculate/recalculate the per-page session parameters
  790          * and check the page count against the max pages.  We
  791          * do this before generating continuation any cover page
  792          * to prevent any skippages setting from trying to skip
  793          * the continuation cover page.
  794          */
  795         if (!preparePageHandling(job, req, info, req.notice)) {
  796         status = Job::rejected;     // XXX
  797         req.notice.insert("Document preparation failed: ");
  798         }
  799         updateQFile = true;
  800     }
  801     if (req.cover != "" && !abortPrepare) {
  802         /*
  803          * Generate a continuation cover page if necessary.
  804          * Note that a failure in doing this is not considered
  805          * fatal; perhaps this should be configurable?
  806          */
  807         if (updateQFile) 
  808         updateRequest(req, job);    // cover-page generation may look at the file
  809         if (makeCoverPage(job, req, params))
  810         req.nocountcover++;
  811         updateQFile = true;
  812         /*
  813          * Recalculate the per-page session parameters.
  814          */
  815         if (!preparePageHandling(job, req, info, req.notice)) {
  816         status = Job::rejected;     // XXX
  817         req.notice.insert("Document preparation failed: ");
  818         }
  819     }    
  820     }
  821     if (updateQFile)
  822     updateRequest(req, job);
  823     return (status);
  824 }
  825 
  826 /*
  827  * Prepare the job for transmission by analysing
  828  * the page characteristics and determining whether
  829  * or not the page transfer parameters will have
  830  * to be renegotiated after the page is sent.  This
  831  * is done before the call is placed because it can
  832  * be slow and there can be timing problems if this
  833  * is done during transmission.
  834  */
  835 bool
  836 faxQueueApp::preparePageHandling(Job& job, FaxRequest& req,
  837     const FaxMachineInfo& info, fxStr& emsg)
  838 {
  839     /*
  840      * Figure out whether to try chopping off white space
  841      * from the bottom of pages.  This can only be done
  842      * if the remote device is thought to be capable of
  843      * accepting variable-length pages.
  844      */
  845     u_int pagechop;
  846     if (info.getMaxPageLengthInMM() == (u_short)-1) {
  847     pagechop = req.pagechop;
  848     if (pagechop == FaxRequest::chop_default)
  849         pagechop = pageChop;
  850     } else
  851     pagechop = FaxRequest::chop_none;
  852     u_int maxPages = job.getJCI().getMaxSendPages();
  853     /*
  854      * Scan the pages and figure out where session parameters
  855      * will need to be renegotiated.  Construct a string of
  856      * indicators to use when doing the actual transmission.
  857      *
  858      * NB: all files are coalesced into a single fax document
  859      *     if possible
  860      */
  861     Class2Params params;        // current parameters
  862     Class2Params next;          // parameters for ``next'' page
  863     TIFF* tif = NULL;           // current open TIFF image
  864     req.totpages = req.npages;      // count pages previously transmitted
  865     u_int docpages;
  866     for (u_int i = 0;;) {
  867     if (!tif || TIFFLastDirectory(tif)) {
  868         /*
  869          * Locate the next file to be sent.
  870          */
  871         if (tif)            // close previous file
  872         TIFFClose(tif), tif = NULL;
  873         if (i >= req.items.length()) {
  874         req.pagehandling.append('P');       // EOP
  875         req.skippages = 0;
  876         return (true);
  877         }
  878         i = req.findItem(FaxRequest::send_fax, i);
  879         if (i == fx_invalidArrayIndex) {
  880         req.pagehandling.append('P');       // EOP
  881         req.skippages = 0;
  882         return (true);
  883         }
  884         const FaxItem& fitem = req.items[i];
  885         tif = TIFFOpen(fitem.item, "r");
  886         if (tif == NULL) {
  887         emsg = "Can not open document file " | fitem.item | " {E314}";
  888         if (tif)
  889             TIFFClose(tif);
  890         return (false);
  891         }
  892         if (fitem.dirnum != 0 && !TIFFSetDirectory(tif, fitem.dirnum)) {
  893         emsg = fxStr::format(
  894             "Can not set directory %u in document file %s {E315}"
  895             , fitem.dirnum
  896             , (const char*) fitem.item
  897         );
  898         if (tif)
  899             TIFFClose(tif);
  900         return (false);
  901         }
  902         docpages = fitem.dirnum;
  903         i++;            // advance for next find
  904     } else {
  905         /*
  906          * Read the next TIFF directory.
  907          */
  908         if (!TIFFReadDirectory(tif)) {
  909         emsg = fxStr::format(
  910             "Error reading directory %u in document file %s {E316}"
  911             , TIFFCurrentDirectory(tif)
  912             , TIFFFileName(tif)
  913         );
  914         if (tif)
  915             TIFFClose(tif);
  916         return (false);
  917         }
  918         docpages++;
  919     }
  920     if (++req.totpages > maxPages) {
  921         emsg = fxStr::format("Too many pages in submission; max %u {E317}",
  922         maxPages);
  923         if (tif)
  924         TIFFClose(tif);
  925         return (false);
  926     }
  927     if (req.skippages) {
  928         req.totpages--;
  929         req.skippages--;
  930         req.skippedpages++;
  931         if (req.skippages == 0) {
  932         req.items[i-1].dirnum = docpages + 1;       // mark it
  933         }
  934         if (TIFFLastDirectory(tif)) {
  935         req.renameSaved(i-1);
  936         req.items.remove(i-1);
  937         }
  938     } else {
  939         next = params;
  940         setupParams(tif, next, info);
  941         if (params.df != (u_int) -1 || params.jp != (u_int) -1) {
  942         /*
  943          * The pagehandling string has:
  944          * 'M' = EOM, for when parameters must be renegotiated
  945          * 'S' = MPS, for when next page uses the same parameters
  946          * 'P' = EOP, for the last page to be transmitted
  947          */
  948         if (next != params) {
  949             /*
  950              * There is no reason to switch from VR_NORMAL to VR_200X100 or from VR_FINE
  951              * to VR_200X200 or from VR_R8 to VR_200X400 or vice-versa because they are
  952              * essentially the same thing.
  953              */
  954             Class2Params save;
  955             save = next;
  956             if (((next.vr == VR_NORMAL)  && (params.vr == VR_200X100)) ||
  957                 ((next.vr == VR_200X100) && (params.vr == VR_NORMAL))  ||
  958                 ((next.vr == VR_FINE)    && (params.vr == VR_200X200)) ||
  959                 ((next.vr == VR_200X200) && (params.vr == VR_FINE))    ||
  960                 ((next.vr == VR_R8)      && (params.vr == VR_200X400)) ||
  961                 ((next.vr == VR_200X400) && (params.vr == VR_R8))) {
  962             next.vr = params.vr;
  963             if (next != params) next = save;    // only ignore VR difference if there are no other differences
  964             }
  965         }
  966         if (next != params) {
  967             fxStr thismsg = "Document format change between pages requires EOM";
  968             if (next.vr != params.vr) thismsg = fxStr::format("%s; VR differs - this page: %d, next page: %d", (const char*) thismsg, params.vr, next.vr);
  969             if (next.br != params.br) thismsg = fxStr::format("%s; BR differs - this page: %d, next page: %d", (const char*) thismsg, params.br, next.br);
  970             if (next.wd != params.wd) thismsg = fxStr::format("%s; WD differs - this page: %d, next page: %d", (const char*) thismsg, params.wd, next.wd);
  971             if (next.ln != params.ln) thismsg = fxStr::format("%s; LN differs - this page: %d, next page: %d", (const char*) thismsg, params.ln, next.ln);
  972             if (next.df != params.df) thismsg = fxStr::format("%s; DF differs - this page: %d, next page: %d", (const char*) thismsg, params.df, next.df);
  973             if (next.ec != params.ec) thismsg = fxStr::format("%s; EC differs - this page: %d, next page: %d", (const char*) thismsg, params.ec, next.ec);
  974             if (next.bf != params.bf) thismsg = fxStr::format("%s; BF differs - this page: %d, next page: %d", (const char*) thismsg, params.bf, next.bf);
  975             if (next.st != params.st) thismsg = fxStr::format("%s; ST differs - this page: %d, next page: %d", (const char*) thismsg, params.st, next.st);
  976             if (next.jp != params.jp) thismsg = fxStr::format("%s; JP differs - this page: %d, next page: %d", (const char*) thismsg, params.jp, next.jp);
  977             traceJob(job, (const char*) thismsg);
  978         }
  979         req.pagehandling.append(next == params ? 'S' : 'M');
  980         }
  981         /*
  982          * Record the session parameters needed by each page
  983          * so that we can set the initial session parameters
  984          * as needed *before* dialing the telephone.  This is
  985          * to cope with Class 2 modems that do not properly
  986          * implement the +FDIS command.
  987          */
  988         req.pagehandling.append(next.encodePage());
  989         /*
  990          * If page is to be chopped (i.e. sent with trailing white
  991          * space removed so the received page uses minimal paper),
  992          * scan the data and, if possible, record the amount of data
  993          * that should not be sent.  The modem drivers will use this
  994          * information during transmission if it's actually possible
  995          * to do the chop (based on the negotiated session parameters).
  996          */
  997         if (pagechop == FaxRequest::chop_all ||
  998            (pagechop == FaxRequest::chop_last && TIFFLastDirectory(tif)))
  999             preparePageChop(req, tif, next, req.pagehandling);
 1000         params = next;
 1001     }
 1002     }
 1003 }
 1004 
 1005 /*
 1006  * Select session parameters according to the info
 1007  * in the TIFF file.  We setup the encoding scheme,
 1008  * page width & length, and vertical-resolution
 1009  * parameters.
 1010  */
 1011 void
 1012 faxQueueApp::setupParams(TIFF* tif, Class2Params& params, const FaxMachineInfo& info)
 1013 {
 1014     params.jp = 0;
 1015     uint16 compression = 0;
 1016     (void) TIFFGetField(tif, TIFFTAG_COMPRESSION, &compression);
 1017     if (compression == COMPRESSION_NONE) {
 1018     params.jp = JP_COLOR;
 1019     params.df = (u_int) -1;
 1020     } else if (compression == COMPRESSION_CCITTFAX4) {
 1021     params.df = DF_2DMMR;
 1022     } else {
 1023     uint32 g3opts = 0;
 1024     TIFFGetField(tif, TIFFTAG_GROUP3OPTIONS, &g3opts);
 1025     params.df = (g3opts&GROUP3OPT_2DENCODING ? DF_2DMR : DF_1DMH);
 1026     }
 1027 
 1028     uint32 w;
 1029     TIFFGetField(tif, TIFFTAG_IMAGEWIDTH, &w);
 1030     params.setPageWidthInPixels((u_int) w);
 1031 
 1032     /*
 1033      * Try to deduce the vertical resolution of the image
 1034      * image.  This can be problematical for arbitrary TIFF
 1035      * images 'cuz vendors sometimes don't give the units.
 1036      * We, however, can depend on the info in images that
 1037      * we generate 'cuz we're careful to include valid info.
 1038      */
 1039     float yres, xres;
 1040     if (TIFFGetField(tif, TIFFTAG_YRESOLUTION, &yres) && TIFFGetField(tif, TIFFTAG_XRESOLUTION, &xres)) {
 1041     uint16 resunit;
 1042     TIFFGetFieldDefaulted(tif, TIFFTAG_RESOLUTIONUNIT, &resunit);
 1043     if (resunit == RESUNIT_CENTIMETER) {
 1044         yres *= 25.4;
 1045         xres *= 25.4;
 1046     }
 1047     params.setRes((u_int) xres, (u_int) yres);
 1048     } else {
 1049     /*
 1050      * No resolution is specified, try
 1051      * to deduce one from the image length.
 1052      */
 1053     uint32 l;
 1054     TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &l);
 1055     // B4 at 98 lpi is ~1400 lines
 1056     params.setRes(204, (l < 1450 ? 98 : 196));
 1057     }
 1058 
 1059     /*
 1060      * Select page length according to the image size and
 1061      * vertical resolution.  Note that if the resolution
 1062      * info is bogus, we may select the wrong page size.
 1063      */
 1064     if (info.getMaxPageLengthInMM() != (u_short)-1) {
 1065     uint32 h;
 1066     TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &h);
 1067     params.setPageLengthInMM((u_int)(h / yres));
 1068     } else
 1069     params.ln = LN_INF;
 1070 }
 1071 
 1072 void
 1073 faxQueueApp::preparePageChop(const FaxRequest& req,
 1074     TIFF* tif, const Class2Params& params, fxStr& pagehandling)
 1075 {
 1076     tstrip_t s = TIFFNumberOfStrips(tif)-1;
 1077     TIFFSTRIPBYTECOUNTS* stripbytecount;
 1078     (void) TIFFGetField(tif, TIFFTAG_STRIPBYTECOUNTS, &stripbytecount);
 1079     u_int stripSize = (u_int) stripbytecount[s];
 1080     if (stripSize == 0)
 1081     return;
 1082     u_char* data = new u_char[stripSize];
 1083     if (TIFFReadRawStrip(tif, s, data, stripSize) >= 0) {
 1084     uint16 fillorder;
 1085     TIFFGetFieldDefaulted(tif, TIFFTAG_FILLORDER, &fillorder);
 1086 
 1087     MemoryDecoder dec(data, stripSize);
 1088     dec.scanPageForBlanks(fillorder, params);
 1089 
 1090     float threshold = req.chopthreshold;
 1091     if (threshold == -1)
 1092         threshold = pageChopThreshold;
 1093     u_int minRows = 0;
 1094     switch(params.vr) {
 1095         case VR_NORMAL:
 1096         case VR_200X100:
 1097         minRows = (u_int) (98. * threshold);
 1098         break;
 1099         case VR_FINE:
 1100         case VR_200X200:
 1101         minRows = (u_int) (196. * threshold);
 1102         break;
 1103         case VR_300X300:
 1104         minRows = (u_int) (300. * threshold);
 1105         break;
 1106         case VR_R8:
 1107         case VR_R16:
 1108         case VR_200X400:
 1109         minRows = (u_int) (391. * threshold);
 1110         break;
 1111     }
 1112     if (dec.getLastBlanks() > minRows)
 1113     {
 1114         pagehandling.append(fxStr::format("Z%04x",
 1115         fxmin((unsigned)0xFFFF, stripSize - (u_int)(dec.getEndOfPage() - data))));
 1116     }
 1117     }
 1118     delete [] data;
 1119 }
 1120 
 1121 /*
 1122  * Convert a document into a form suitable
 1123  * for transmission to the remote fax machine.
 1124  */
 1125 JobStatus
 1126 faxQueueApp::convertDocument(Job& job,
 1127     const FaxItem& req,
 1128     const fxStr& outFile,
 1129     const Class2Params& params,
 1130     fxStr& emsg)
 1131 {
 1132     JobStatus status;
 1133     /*
 1134      * Open/create the target file and lock it to guard against
 1135      * concurrent jobs imaging the same document with the same
 1136      * parameters.  The parent will hold the open file descriptor
 1137      * for the duration of the imaging job.  Concurrent jobs will
 1138      * block on flock and wait for the imaging to be completed.
 1139      * Previously imaged documents will be flock'd immediately
 1140      * and reused without delays after verifying that they were
 1141      * last modified *after* the source image.
 1142      *
 1143      * NB: There is a race condition here.  One process may create
 1144      * the file but another may get the shared lock above before
 1145      * the exclusive lock below is captured.  If this happens
 1146      * then the exclusive lock will block temporarily, but the
 1147      * process with the shared lock may attempt to send a document
 1148      * before it's preparation is completed.  We could add a delay
 1149      * before the shared lock but that would slow down the normal
 1150      * case and the window is small--so let's leave it there for now.
 1151      */
 1152     struct stat sin;
 1153     struct stat sout;
 1154     if (Sys::stat(outFile, sout) == 0 && Sys::stat(req.item, sin) == 0) {
 1155     if (sout.st_mtime < sin.st_mtime) {
 1156         /*
 1157          * It appears that the target file exists and is 
 1158          * older than the source image.  (Thus the old target is an image 
 1159          * file from a previous job.)  This can happen, for example,
 1160          * if faxqclean isn't being run frequently-enough and faxq
 1161          * for some reason did not delete the old target file after its job 
 1162          * completion.  Thus, we delete the old file before continuing.
 1163          */
 1164          jobError(job, "Removing old image file: %s (run faxqclean more often)", (const char*) outFile);
 1165          (void) Sys::unlink(outFile);
 1166          (void) Sys::unlink(outFile|".color");
 1167     }
 1168     }
 1169     int fd = Sys::open(outFile, O_RDWR|O_CREAT|O_EXCL, 0600);
 1170     int colorfd = -1;
 1171     if (params.jp)
 1172     colorfd = Sys::open(outFile|".color", O_RDWR|O_CREAT|O_EXCL, 0600);
 1173     if (fd == -1) {
 1174     if (errno == EEXIST) {
 1175         /*
 1176          * The file already exist, flock it in case it's
 1177          * being created (we'll block until the imaging
 1178          * is completed).  Otherwise, the document imaging
 1179          * has already been completed and we can just use it.
 1180          */
 1181         fd = Sys::open(outFile, O_RDWR);    // NB: RDWR for flock emulations
 1182         if (fd != -1) {
 1183         if (flock(fd, LOCK_SH) == -1) {
 1184             status = Job::format_failed;
 1185             emsg = "Unable to lock shared document file {E318}";
 1186         } else
 1187             status = Job::done;
 1188         (void) Sys::close(fd);      // NB: implicit unlock
 1189         } else {
 1190         /*
 1191          * This *can* happen if document preparation done
 1192          * by another job fails (e.g. because of a time
 1193          * limit or a malformed PostScript submission).
 1194          */
 1195         status = Job::format_failed;
 1196         emsg = "Unable to open shared document file {E319}";
 1197         }
 1198     } else {
 1199         status = Job::format_failed;
 1200         emsg = "Unable to create document file {E320}";
 1201     }
 1202     /*
 1203      * We were unable to open, create, or flock
 1204      * the file.  This should not happen.
 1205      */
 1206     if (status != Job::done)
 1207         jobError(job, "CONVERT DOCUMENT: %s: %m", (const char*) emsg);
 1208     } else {
 1209     (void) flock(fd, LOCK_EX);      // XXX check for errors?
 1210     if (params.jp) (void) flock(colorfd, LOCK_EX);
 1211     /*
 1212      * Imaged document does not exist, run the document converter
 1213      * to generate it.  The converter is invoked according to:
 1214      *   -i jobid       jobid number
 1215      *   -o file        output (temp) file
 1216      *   -r <res>       output resolution (dpi)
 1217      *   -w <pagewidth> output page width (pixels)
 1218      *   -l <pagelength>    output page length (mm)
 1219      *   -m <maxpages>  max pages to generate
 1220      *   -1|-2|-3       1d, 2d, or 2d-mmr encoding
 1221      */
 1222     fxStr rbuf = fxStr::format("%u", params.verticalRes());
 1223     fxStr wbuf = fxStr::format("%u", params.pageWidth());
 1224     fxStr lbuf = fxStr::format("%d", params.pageLength());
 1225     fxStr mbuf = fxStr::format("%u", job.getJCI().getMaxSendPages());
 1226     const char* argv[30];
 1227     int ac = 0;
 1228     switch (req.op) {
 1229     case FaxRequest::send_postscript: argv[ac++] = ps2faxCmd; break;
 1230     case FaxRequest::send_pdf:    argv[ac++] = pdf2faxCmd; break;
 1231     case FaxRequest::send_pcl:    argv[ac++] = pcl2faxCmd; break;
 1232     case FaxRequest::send_tiff:   argv[ac++] = tiff2faxCmd; break;
 1233     }
 1234     argv[ac++] = "-i"; argv[ac++] = (const char*)job.jobid;
 1235     argv[ac++] = "-o"; argv[ac++] = outFile;
 1236     argv[ac++] = "-r"; argv[ac++] = (const char*)rbuf;
 1237     argv[ac++] = "-w"; argv[ac++] = (const char*)wbuf;
 1238     argv[ac++] = "-l"; argv[ac++] = (const char*)lbuf;
 1239     argv[ac++] = "-m"; argv[ac++] = (const char*)mbuf;
 1240     if (useUnlimitedLN) argv[ac++] = "-U";
 1241     if (params.jp == JP_COLOR)
 1242         argv[ac++] = "-color";
 1243     // When df is -1, then it's a color-only job.
 1244     if (params.df != (u_int) -1) {
 1245         if (params.df == DF_2DMMR)
 1246         argv[ac++] = "-3";
 1247         else
 1248         argv[ac++] = params.df == DF_1DMH ? "-1" : "-2";
 1249     }
 1250     argv[ac++] = req.item;
 1251     argv[ac] = NULL;
 1252     // XXX the (char* const*) is a hack to force type compatibility
 1253     status = runConverter(job, argv[0], (char* const*) argv, emsg);
 1254     if (status == Job::done) {
 1255         /*
 1256          * Many converters exit with zero status even when
 1257          * there are problems so scan the the generated TIFF
 1258          * to verify the integrity of the converted data.
 1259          *
 1260          * NB: We must reopen the file instead of using the
 1261          *     previously opened file descriptor in case the
 1262          *     converter creates a new file with the given
 1263          *     output filename instead of just overwriting the
 1264          *     file created above.  This can easily happen if,
 1265          *     for example, the converter creates a link from
 1266          *     the input file to the target (e.g. tiff2fax
 1267          *     does this when no conversion is required).
 1268          */
 1269         TIFF* tif = TIFFOpen(outFile, "r");
 1270         if (tif) {
 1271         while (!TIFFLastDirectory(tif))
 1272             if (!TIFFReadDirectory(tif)) {
 1273             status = Job::format_failed;
 1274             emsg = "Converted document is not valid TIFF {E321}";
 1275             break;
 1276             }
 1277         TIFFClose(tif);
 1278         } else {
 1279         status = Job::format_failed;
 1280         emsg = "Could not reopen converted document to verify format {E322}";
 1281         }
 1282         if (status == Job::done)    // discard any debugging output
 1283         emsg = "";
 1284         else
 1285         jobError(job, "CONVERT DOCUMENT: %s", (const char*) emsg);
 1286     } else if (status == Job::rejected)
 1287         jobError(job, "SEND REJECT: %s", (const char*) emsg);
 1288     (void) Sys::close(fd);      // NB: implicit unlock
 1289     if (params.jp) (void) Sys::close(colorfd);  // NB: implicit unlock
 1290     }
 1291     return (status);
 1292 }
 1293 
 1294 static void
 1295 closeAllBut(int fd)
 1296 {
 1297     for (int f = Sys::getOpenMax()-1; f >= 0; f--)
 1298     if (f != fd)
 1299         Sys::close(f);
 1300 }
 1301 
 1302 /*
 1303  * Startup a document converter program in a subprocess
 1304  * with the output returned through a pipe.  We could just use
 1305  * popen or similar here, but we want to detect fork failure
 1306  * separately from others so that jobs can be requeued instead
 1307  * of rejected.
 1308  */
 1309 JobStatus
 1310 faxQueueApp::runConverter(Job& job, const char* app, char* const* argv, fxStr& emsg)
 1311 {
 1312     fxStr cmdline(argv[0]);
 1313     for (u_int i = 1; argv[i] != NULL; i++)
 1314     cmdline.append(fxStr::format(" %s", argv[i]));
 1315     traceQueue(job, "CONVERT DOCUMENT: %s", (const char*)cmdline);
 1316     JobStatus status;
 1317     int pfd[2];
 1318     if (pipe(pfd) >= 0) {
 1319     int fd;
 1320     pid_t pid = fork();
 1321     switch (pid) {
 1322     case -1:            // error
 1323         jobError(job, "CONVERT DOCUMENT: fork: %m");
 1324         status = Job::requeued; // job should be retried
 1325         Sys::close(pfd[1]);
 1326         break;
 1327     case 0:             // child, exec command
 1328         if (pfd[1] != STDOUT_FILENO)
 1329         dup2(pfd[1], STDOUT_FILENO);
 1330         /*
 1331          * We're redirecting application stdout and stderr back through
 1332          * the pipe to the faxq parent, but we don't want faxq stdin
 1333          * to be available to the application, so we close all except
 1334          * for stdout, redirect stderr to stdout, and redirect stdin
 1335          * to devnull.  We could, perhaps, just close stdin outright,
 1336          * but some applications and libc versions require stdin to not
 1337          * be closed.
 1338          */
 1339         closeAllBut(STDOUT_FILENO);
 1340         dup2(STDOUT_FILENO, STDERR_FILENO);
 1341         fd = Sys::open(_PATH_DEVNULL, O_RDWR);
 1342         if (fd != STDIN_FILENO)
 1343         {
 1344             dup2(fd, STDIN_FILENO);
 1345             Sys::close(fd);
 1346         }
 1347         Sys::execv(app, argv);
 1348         sleep(3);           // XXX give parent time to catch signal
 1349         _exit(255);
 1350         /*NOTREACHED*/
 1351     default:            // parent, read from pipe and wait
 1352         Sys::close(pfd[1]);
 1353         if (runConverter1(job, pfd[0], emsg)) {
 1354         int estat = -1;
 1355         (void) Sys::waitpid(pid, estat);
 1356         if (estat)
 1357             jobError(job, "CONVERT DOCUMENT: exit status %#x", estat);
 1358         switch (estat) {
 1359         case 0:          status = Job::done; break;
 1360             case (254<<8):       status = Job::rejected; break;
 1361         case (255<<8): case 255: status = Job::no_formatter; break;
 1362         default:         status = Job::format_failed; break;
 1363         }
 1364         } else {
 1365         kill(pid, SIGTERM);
 1366         (void) Sys::waitpid(pid);
 1367         status = Job::format_failed;
 1368         }
 1369         break;
 1370     }
 1371     Sys::close(pfd[0]);
 1372     } else {
 1373     jobError(job, "CONVERT DOCUMENT: pipe: %m");
 1374     status = Job::format_failed;
 1375     }
 1376     return (status);
 1377 }
 1378 
 1379 /*
 1380  * Replace unprintable characters with ``?''s.
 1381  */
 1382 static void
 1383 cleanse(char buf[], int n)
 1384 {
 1385     while (--n >= 0)
 1386     if (!isprint(buf[n]) && !isspace(buf[n]))
 1387         buf[n] = '?';
 1388 }
 1389 
 1390 /*
 1391  * Run the interpreter with the configured timeout and
 1392  * collect the output from the interpreter in case there
 1393  * is an error -- this is sent back to the user that
 1394  * submitted the job.
 1395  */
 1396 bool
 1397 faxQueueApp::runConverter1(Job& job, int fd, fxStr& output)
 1398 {
 1399     int n;
 1400     Timeout timer;
 1401     timer.startTimeout(postscriptTimeout*1000);
 1402     char buf[1024];
 1403     while ((n = Sys::read(fd, buf, sizeof (buf))) > 0 && !timer.wasTimeout()) {
 1404     cleanse(buf, n);
 1405     output.append(buf, n);
 1406     }
 1407     timer.stopTimeout();
 1408     if (timer.wasTimeout()) {
 1409     jobError(job, "CONVERT DOCUMENT: job time limit exceeded");
 1410     if (output.length()) output.append("\n");
 1411     output.append("[Job time limit exceeded]");
 1412     return (false);
 1413     } else
 1414     return (true);
 1415 }
 1416 
 1417 /*
 1418  * Generate a continuation cover page and insert it in
 1419  * the array of files to be sent.  Note that we assume
 1420  * the cover page command generates PostScript which we
 1421  * immediately image, discarding the PostScript.  We
 1422  * could have the cover page command script do this, but
 1423  * then it would need to know how to invoke the PostScript
 1424  * imager per the job characteristics.  Note that we could
 1425  * optimize things here by updating the pagehandling and
 1426  * page counts for the job instead of resetting pagehandling
 1427  * so that everything just gets recalculated from scratch.
 1428  */
 1429 bool
 1430 faxQueueApp::makeCoverPage(Job& job, FaxRequest& req, const Class2Params& params)
 1431 {
 1432     bool ok = true;
 1433     FaxItem fitem(FaxRequest::send_postscript, 0, fxStr::null, req.cover);
 1434     fxStr cmd(coverCmd
 1435     | quote | quoted(req.qfile)             | enquote
 1436     | quote | quoted(contCoverPageTemplate) | enquote
 1437     | quote | fitem.item                    | enquote
 1438     );
 1439     traceQueue(job, "COVER PAGE: %s", (const char*)cmd);
 1440     if (runCmd(cmd, true)) {
 1441     fxStr emsg;
 1442     fxStr tmp = fitem.item | ";" | params.encodePage();
 1443     if (convertDocument(job, fitem, tmp, params, emsg)) {
 1444         req.insertFax(0, tmp);
 1445         req.cover = tmp;            // needed in sendJobDone
 1446         req.pagehandling = "";      // XXX force recalculation
 1447     } else {
 1448         jobError(job, "SEND: No continuation cover page, "
 1449         " document conversion failed: %s", (const char*) emsg);
 1450         ok = false;
 1451     }
 1452     Sys::unlink(fitem.item);
 1453     } else {
 1454     jobError(job,
 1455         "SEND: No continuation cover page, generation cmd failed");
 1456     ok = false;
 1457     }
 1458     return (ok);
 1459 }
 1460 
 1461 const fxStr&
 1462 faxQueueApp::pickCmd(const FaxRequest& req)
 1463 {
 1464     if (req.jobtype == "pager")
 1465     return (sendPageCmd);
 1466     if (req.jobtype == "uucp")
 1467     return (sendUUCPCmd);
 1468     return (sendFaxCmd);            // XXX gotta return something
 1469 }
 1470 
 1471 /*
 1472  * Setup the argument vector and exec a subprocess.
 1473  * This code assumes the command and dargs strings have
 1474  * previously been processed to insert \0 characters
 1475  * between each argument string (see crackArgv below).
 1476  */
 1477 static void
 1478 doexec(const char* cmd, const fxStr& dargs, const char* devid, const char* files, int nfiles)
 1479 {
 1480 #define MAXARGS 128
 1481     const char* av[MAXARGS];
 1482     int ac = 0;
 1483     const char* cp = strrchr(cmd, '/');
 1484     // NB: can't use ?: 'cuz of AIX compiler (XXX)
 1485     if (cp)
 1486     av[ac++] = cp+1;            // program name
 1487     else
 1488     av[ac++] = cmd;
 1489     cp = strchr(cmd,'\0');
 1490     const char* ep = strchr(cmd, '\0');
 1491     while (cp < ep && ac < MAXARGS-4) {     // additional pre-split args
 1492     av[ac++] = ++cp;
 1493     cp = strchr(cp,'\0');
 1494     }
 1495     cp = dargs;
 1496     ep = cp + dargs.length();
 1497     while (cp < ep && ac < MAXARGS-4) {     // pre-split dargs
 1498     av[ac++] = cp;
 1499     cp = strchr(cp,'\0')+1;
 1500     }
 1501     av[ac++] = "-m"; av[ac++] = devid;
 1502 
 1503     if (! (MAXARGS > ac + nfiles))
 1504     {
 1505     sleep(1);
 1506         logError("%d files requires %d arguments, max %d", nfiles, ac+nfiles+1, MAXARGS);
 1507     return;
 1508     }
 1509     while (files)
 1510     {
 1511     av[ac++] = files;
 1512     files = strchr(files, ' ');
 1513     /*
 1514      * We can be naster with memory here - we're exec()ing right way
 1515      */
 1516     if (files)
 1517         *(char*)files++ = '\0';
 1518     }
 1519 
 1520     av[ac] = NULL;
 1521     Sys::execv(cmd, (char* const*) av);
 1522 }
 1523 #undef MAXARGS
 1524 
 1525 static void
 1526 join(fxStr& s, const fxStr& a)
 1527 {
 1528     const char* cp = a;
 1529     const char* ep = cp + a.length();
 1530     while (cp < ep) {
 1531     s.append(' ');
 1532     s.append(cp);
 1533     cp = strchr(cp,'\0')+1;
 1534     }
 1535 }
 1536 
 1537 static fxStr
 1538 joinargs(const fxStr& cmd, const fxStr& dargs)
 1539 {
 1540     fxStr s;
 1541     join(s, cmd);
 1542     join(s, dargs);
 1543     return s;
 1544 }
 1545 
 1546 void
 1547 faxQueueApp::sendJobStart(Job& job, FaxRequest* req)
 1548 {
 1549     Job* cjob;
 1550     int nfiles = 1;
 1551 
 1552     job.start = Sys::now();         // start of transmission
 1553     fxStr files = job.file;
 1554     for (cjob = job.bnext; cjob != NULL; cjob = cjob->bnext) {
 1555     files = files | " " | cjob->file;
 1556     cjob->start = job.start;
 1557     // XXX start deadman timeout on active jobs
 1558     nfiles++;
 1559     }
 1560     
 1561     const fxStr& cmd = pickCmd(*req);
 1562     fxStr dargs(job.getJCI().getArgs());
 1563 
 1564     DestInfo& di = destJobs[job.dest];
 1565     if (di.getCalls() > 1 && job.getJCI().getMaxConcurrentCalls() == 0) {
 1566     /*
 1567      * Tell faxsend/pagesend to not increment dials counters if busy signal.
 1568      */
 1569     if(dargs != "") dargs.append('\0');
 1570     dargs.append("-B");
 1571     }
 1572 
 1573     int fd;
 1574     pid_t pid = fork();
 1575     switch (pid) {
 1576     case 0:             // child, startup command
 1577     /*
 1578      * faxq doesn't pay attention to faxsend through stdout/stderr,
 1579      * and we don't want faxq stdin to be available to the application,
 1580      * either.  So we close all and redirect stdin to devnull.  We 
 1581      * could, perhaps, just close stdin outright, but some applications
 1582      * and libc versions require stdin to not be closed.
 1583      */
 1584     closeAllBut(-1);        // NB: close 'em all
 1585     fd = Sys::open(_PATH_DEVNULL, O_RDWR);
 1586     if (fd != STDIN_FILENO)
 1587     {
 1588         dup2(fd, STDIN_FILENO);
 1589         Sys::close(fd);
 1590     }
 1591     doexec(cmd, dargs, job.modem->getDeviceID(), files, nfiles);
 1592     sleep(10);          // XXX give parent time to catch signal
 1593     _exit(127);
 1594     /*NOTREACHED*/
 1595     case -1:                // fork failed, sleep and retry
 1596     /*
 1597      * We were unable to start the command because the
 1598      * system is out of processes.  Take the jobs off the
 1599      * active list and requeue them for a future time. 
 1600      * If it appears that the we're doing this a lot,
 1601      * then lengthen the backoff.
 1602      */
 1603     Job* njob;
 1604     for (cjob = &job; cjob != NULL; cjob = njob) {
 1605         njob = cjob->bnext;
 1606         req = cjob->breq;
 1607         cjob->remove();         // Remove from active queue
 1608         delayJob(*cjob, *req, "Could not fork to start job transmission {E341}",
 1609         cjob->start + random() % requeueInterval);
 1610         delete req;
 1611     }
 1612     break;
 1613     default:                // parent, setup handler to wait
 1614     // joinargs puts a leading space so this looks funny here
 1615     traceQueue(job, "CMD START%s -m %s %s (PID %d)"
 1616         , (const char*) joinargs(cmd, dargs)
 1617         , (const char*) job.modem->getDeviceID()
 1618         , (const char*) files
 1619         , pid
 1620     );
 1621     job.startSend(pid);
 1622     for (cjob = &job; cjob != NULL; cjob = njob) {
 1623         cjob->pid = pid;
 1624         njob = cjob->bnext;
 1625         Trigger::post(Trigger::SEND_BEGIN, *cjob);
 1626         delete cjob->breq;      // discard handle (NB: releases lock)
 1627         cjob->breq = NULL;
 1628     }
 1629     break;
 1630     }
 1631 }
 1632 
 1633 void
 1634 faxQueueApp::sendJobDone(Job& job, int status)
 1635 {
 1636     traceQueue(job, "CMD DONE: exit status %#x", status);
 1637     if (status&0xff)
 1638     logError("Send program terminated abnormally with exit status %#x", status);
 1639 
 1640     if (numProxyJobs && status&0x8000) numProxyJobs--;  // 0x8000 indicates proxied job
 1641 
 1642     Job* cjob;
 1643     Job* njob;
 1644     DestInfo& di = destJobs[job.dest];
 1645     if (status&0x8000) {
 1646     di.proxyHangup();
 1647     } else {
 1648     di.hangup();                // do before unblockDestJobs
 1649     }
 1650     if (job.modem) releaseModem(job);       // done with modem
 1651     FaxRequest* req = readRequest(job);
 1652     if (req && (req->status != send_done && req->status != send_reformat)) {
 1653     // prevent turnaround-redialing, delay any blocked jobs
 1654     time_t newtts = req->tts;
 1655     while ((cjob = di.nextBlocked())) {
 1656         FaxRequest* blockedreq = readRequest(*cjob);
 1657         if (blockedreq) {
 1658         /*
 1659          * If the error was call-related such as the errors "Busy", "No answer",
 1660          * or "No carrier" then that error could legitimately be applied to 
 1661          * all of the blocked jobs, too, as there would have been no difference 
 1662          * between the jobs in this respect.  So we make use of the 
 1663          * "ShareCallFailures" configuration here to control this.  This helps
 1664          * large sets of jobs going to the same destination fail sooner than 
 1665          * they would individually and consume less modem attention.
 1666          */
 1667         if ((req->errorcode == "E001" && (shareCallFailures.find(0, "busy") < shareCallFailures.length() || shareCallFailures == "always")) ||
 1668             (req->errorcode == "E002" && (shareCallFailures.find(0, "nocarrier") < shareCallFailures.length() || shareCallFailures == "always")) ||
 1669             (req->errorcode == "E003" && (shareCallFailures.find(0, "noanswer") < shareCallFailures.length() || shareCallFailures == "always")) ||
 1670             (req->errorcode == "E004" && (shareCallFailures.find(0, "nodialtone") < shareCallFailures.length() || shareCallFailures == "always"))) {
 1671             blockedreq->status = send_retry;
 1672             blockedreq->totdials++;
 1673             delayJob(*cjob, *blockedreq, (const char*) req->notice, newtts);
 1674             delete blockedreq;
 1675         } else {
 1676             delayJob(*cjob, *blockedreq, "Delayed by prior call {E342}", newtts);
 1677             delete blockedreq;
 1678         }
 1679         }
 1680     }
 1681     } else {
 1682     unblockDestJobs(di, 1);     // force one to unblock
 1683     }
 1684     for (cjob = &job; cjob != NULL; cjob = njob) {
 1685     njob = cjob->bnext;
 1686     if (cjob != &job) req = readRequest(*cjob); // the first was already read
 1687     if (!req) {
 1688         time_t now = Sys::now();
 1689         time_t duration = now - job.start;
 1690         logError("JOB %s: SEND FINISHED: %s; but job file vanished",
 1691         (const char*) cjob->jobid, fmtTime(duration));
 1692         setDead(*cjob);
 1693         continue;
 1694     }
 1695     sendJobDone(*cjob, req);
 1696     }
 1697     pokeScheduler();
 1698 }
 1699 
 1700 void
 1701 faxQueueApp::sendJobDone(Job& job, FaxRequest* req)
 1702 {
 1703     time_t now = Sys::now();
 1704     time_t duration = now - job.start;
 1705     job.bnext = NULL; job.bprev = NULL;     // clear any batching
 1706     job.commid = req->commid;           // passed from subprocess
 1707 
 1708     Trigger::post(Trigger::SEND_END, job);
 1709 
 1710     if (req->status == 127) {
 1711     req->notice = "Send program terminated abnormally; unable to exec " |
 1712         pickCmd(*req) | "{E343}";
 1713     req->status = send_failed;
 1714     logError("JOB %s: %s",
 1715         (const char*)job.jobid, (const char*)req->notice);
 1716     }
 1717     if (req->status == send_reformat) {
 1718     /*
 1719      * Job requires reformatting to deal with the discovery
 1720      * of unexpected remote capabilities (either because
 1721      * the capabilities changed or because the remote side
 1722      * has never been called before and the default setup
 1723      * created a capabilities mismatch).  Purge the job of
 1724      * any formatted information and reset the state so that
 1725      * when the job is retried it will be reformatted according
 1726      * to the updated remote capabilities.
 1727      */
 1728     Trigger::post(Trigger::SEND_REFORMAT, job);
 1729     u_int i = 0;
 1730     while (i < req->items.length()) {
 1731         FaxItem& fitem = req->items[i];
 1732         if (fitem.op == FaxRequest::send_fax) {
 1733         req->items.remove(i);
 1734         continue;
 1735         }
 1736         if (fitem.isSavedOp())
 1737         fitem.op--;         // assumes order of enum
 1738         i++;
 1739     }
 1740     req->pagehandling = "";         // force recalculation
 1741     req->status = send_retry;       // ... force retry
 1742     }
 1743     /*
 1744      * If the job did not finish and it is due to be
 1745      * suspended (possibly followed by termination),
 1746      * then treat it as if it is to be retried in case
 1747      * it does get rescheduled.
 1748      */
 1749     if (req->status != send_done && job.suspendPending) {
 1750     req->notice = "Job interrupted by user {E344}";
 1751     req->status = send_retry;
 1752     }
 1753     if (job.killtime == 0 && !job.suspendPending && req->status == send_retry) {
 1754     /*
 1755      * The job timed out during the send attempt.  We
 1756      * couldn't do anything then, but now the job can
 1757      * be cleaned up.  Not sure if the user should be
 1758      * notified of the requeue as well as the timeout?
 1759      */
 1760     req->notice = "Kill time expired {E325}";
 1761     updateRequest(*req, job);
 1762     job.state = FaxRequest::state_failed;
 1763     deleteRequest(job, req, Job::timedout, true);
 1764     setDead(job);
 1765     } else if (req->status == send_retry) {
 1766     /*
 1767      * If a continuation cover page is required for
 1768      * the retransmission, fixup the job state so
 1769      * that it'll get one when it's next processed.
 1770      */
 1771     if (req->cover != "") {
 1772         /*
 1773          * Job was previously setup to get a continuation
 1774          * cover page.  If the generated cover page was not
 1775          * sent, then delete it so that it'll get recreated.
 1776          */
 1777         if (req->items[0].item == req->cover) {
 1778         Sys::unlink(req->cover);
 1779         req->items.remove(0);
 1780         }
 1781     } else if (req->useccover &&
 1782       req->npages > 0 && contCoverPageTemplate != "") {
 1783         /*
 1784          * Setup to generate a cover page when the job is
 1785          * retried.  Note that we assume the continuation
 1786          * cover page will be PostScript (though the
 1787          * type is not used anywhere just now).
 1788          */
 1789         req->cover = docDir | "/cover" | req->jobid | ".ps";
 1790     }
 1791     if (req->tts < now) {
 1792         /*
 1793          * Send failed and send app didn't specify a new
 1794          * tts, bump the ``time to send'' by the requeue
 1795          * interval, then rewrite the queue file.  This causes
 1796          * the job to be rescheduled for transmission
 1797          * at a future time.
 1798          */
 1799         req->tts = now + (req->retrytime != 0
 1800         ? req->retrytime
 1801         : (requeueInterval>>1) + (random()%requeueInterval));
 1802         job.tts = req->tts;
 1803     }
 1804     /*
 1805      * Bump the job priority if is not bulk-style in which case
 1806      * we dip the job job priority.  This is intended to prevent
 1807      * non-bulk faxes from becoming buried by new jobs which
 1808      * could prevent a timely retry.  However, it is also intended
 1809      * to allow all bulk faxes to be attempted before retrying
 1810      * any that could not complete on the first attempt.  This 
 1811      * aids in timely delivery of bulk faxes as a group rather than
 1812      * preoccupation with individual jobs as is the case with 
 1813      * non-bulk style jobs.
 1814      */
 1815     if (job.pri != 255 && job.pri > 190) job.pri++;
 1816     else job.pri--; 
 1817     job.state = (req->tts > now) ?
 1818         FaxRequest::state_sleeping : FaxRequest::state_ready;
 1819     updateRequest(*req, job);       // update on-disk status
 1820     if (!job.suspendPending) {
 1821         if (job.isOnList()) job.remove();   // remove from active list
 1822         if (req->tts > now) {
 1823         traceQueue(job, "SEND INCOMPLETE: requeue for %s; %s",
 1824             (const char*)strTime(req->tts - now), (const char*)req->notice);
 1825         setSleep(job, req->tts);
 1826         Trigger::post(Trigger::SEND_REQUEUE, job);
 1827         if (job.getJCI().getNotify() != -1) {
 1828             if (job.getJCI().isNotify(FaxRequest::when_requeued))
 1829             notifySender(job, Job::requeued);
 1830         } else
 1831             if (req->isNotify(FaxRequest::when_requeued))
 1832             notifySender(job, Job::requeued);
 1833         } else {
 1834         traceQueue(job, "SEND INCOMPLETE: retry immediately; %s",
 1835             (const char*)req->notice);
 1836         setReadyToRun(job, false);  // NB: job.tts will be <= now
 1837         }
 1838     } else                  // signal waiting co-thread
 1839         job.suspendPending = false;
 1840     delete req;             // implicit unlock of q file
 1841     } else {
 1842     // NB: always notify client if job failed
 1843     if (req->status == send_failed) {
 1844         job.state = FaxRequest::state_failed;
 1845         deleteRequest(job, req, Job::failed, true, fmtTime(duration));
 1846     } else {
 1847         job.state = FaxRequest::state_done;
 1848         deleteRequest(job, req, Job::done, false, fmtTime(duration));
 1849     }
 1850     traceQueue(job, "SEND DONE: %s", (const char*)strTime(duration));
 1851     Trigger::post(Trigger::SEND_DONE, job);
 1852     setDead(job);
 1853     }
 1854 }
 1855 
 1856 /*
 1857  * Job Queue Management Routines.
 1858  */
 1859 
 1860 /*
 1861  * Begin the process to insert a job in the queue
 1862  * of ready-to-run jobs.  We run JobControl, and when it's done
 1863  * the job is placed on the ready-to-run queue.
 1864  */
 1865 void
 1866 faxQueueApp::setReadyToRun(Job& job, bool wait)
 1867 {
 1868     if (jobCtrlCmd.length()) {
 1869     const char *app[3];
 1870     app[0] = jobCtrlCmd;
 1871     app[1] = job.jobid;
 1872     app[2] = NULL;
 1873     traceJob(job, "CONTROL");
 1874     int pfd[2], fd;
 1875     if (pipe(pfd) >= 0) {
 1876         pid_t pid = fork();
 1877         switch (pid) {
 1878         case -1:            // error - continue with no JCI
 1879         jobError(job, "JOB CONTROL: fork: %m");
 1880         Sys::close(pfd[1]);
 1881         // When fork fails we need to run ctrlJobDone, since there
 1882         // will be no child signal to start it.
 1883         ctrlJobDone(job, -1);
 1884         break;
 1885         case 0:             // child, exec command
 1886         if (pfd[1] != STDOUT_FILENO)
 1887             dup2(pfd[1], STDOUT_FILENO);
 1888         /*
 1889          * We're redirecting application stdout back through the pipe
 1890          * to the faxq parent, but application stderr is not meaningful
 1891          * to faxq, and we don't want faxq stdin to be available to the 
 1892          * application, either.  However, some libc versions require 
 1893          * stdin to not be close, and some applications depend on stdin
 1894          * and stderr to be valid.  So we close all except for stdout, 
 1895          * and then redirect stdin and stderr to devnull.
 1896          */
 1897         closeAllBut(STDOUT_FILENO);
 1898         fd = Sys::open(_PATH_DEVNULL, O_RDWR);
 1899         if (fd != STDIN_FILENO)
 1900         {
 1901             dup2(fd, STDIN_FILENO);
 1902             Sys::close(fd);
 1903         }
 1904         fd = Sys::open(_PATH_DEVNULL, O_RDWR);
 1905         if (fd != STDERR_FILENO)
 1906         {
 1907             dup2(fd, STDERR_FILENO);
 1908             Sys::close(fd);
 1909         }
 1910         traceQueue(job, "JOB CONTROL: %s %s", app[0], app[1]);
 1911         Sys::execv(app[0], (char * const*)app);
 1912         sleep(1);           // XXX give parent time to catch signal
 1913         traceQueue(job, "JOB CONTROL: failed to exec: %m");
 1914         _exit(255);
 1915         /*NOTREACHED*/
 1916         default:            // parent, read from pipe and wait
 1917         {
 1918             Sys::close(pfd[1]);
 1919             int estat = -1;
 1920             char data[1024];
 1921             int n;
 1922             fxStr buf;
 1923             while ((n = Sys::read(pfd[0], data, sizeof(data))) > 0) {
 1924             buf.append(data, n);
 1925             }
 1926             Sys::close(pfd[0]);
 1927             job.jci = new JobControlInfo(buf);
 1928                     (void) Sys::waitpid(pid, estat);
 1929 
 1930             /*
 1931              * JobControl modification of job priority must be
 1932              * handled before ctrlJobDone, as that's where the
 1933              * job is placed into runq based on the priority.
 1934              */
 1935             if (job.getJCI().getPriority() != -1) {
 1936             job.pri = job.getJCI().getPriority();
 1937             }
 1938 
 1939             ctrlJobDone(job, estat);
 1940         }
 1941         break;
 1942         }
 1943     } else {
 1944         // If our pipe fails, we can't run the child, but we still
 1945         // Need ctrlJobDone to be called to proceed this job
 1946         ctrlJobDone(job, -1);
 1947     }
 1948     } else {
 1949         ctrlJobDone(job, 0);
 1950     }
 1951 }
 1952 
 1953 /*
 1954  * Insert the job into the runq.  We have finished
 1955  * all the JobControl execution
 1956  */
 1957 void
 1958 faxQueueApp::ctrlJobDone(Job& job, int status)
 1959 {
 1960     if (status) {
 1961     logError("JOB %s: bad exit status %#x from sub-fork",
 1962         (const char*) job.jobid, status);
 1963     }
 1964     blockSignals();
 1965     JobIter iter(runqs[JOBHASH(job.pri)]);
 1966     for (; iter.notDone() && (iter.job().pri < job.pri || 
 1967       (iter.job().pri == job.pri && iter.job().tts <= job.tts)); iter++)
 1968     ;
 1969     job.state = FaxRequest::state_ready;
 1970     job.insert(iter.job());
 1971     job.pid = 0;
 1972     releaseSignals();
 1973     traceJob(job, "READY");
 1974     Trigger::post(Trigger::JOB_READY, job);
 1975 }
 1976 
 1977 /*
 1978  * Place a job on the queue of jobs waiting to run
 1979  * and start the associated timer.
 1980  */
 1981 void
 1982 faxQueueApp::setSleep(Job& job, time_t tts)
 1983 {
 1984     blockSignals();
 1985     JobIter iter(sleepq);
 1986     for (; iter.notDone() && iter.job().tts <= tts; iter++)
 1987     ;
 1988     job.insert(iter.job());
 1989     job.startTTSTimer(tts);
 1990     releaseSignals();
 1991     traceJob(job, "SLEEP FOR %s", (const char*)strTime(tts - Sys::now()));
 1992     Trigger::post(Trigger::JOB_SLEEP, job);
 1993 }
 1994 
 1995 /*
 1996  * Process a job that's finished.  The corpse gets placed
 1997  * on the deadq and is reaped the next time the scheduler
 1998  * runs.  If any jobs are blocked waiting for this job to
 1999  * complete, one is made ready to run.
 2000  */
 2001 void
 2002 faxQueueApp::setDead(Job& job)
 2003 {
 2004     job.stopTTSTimer();
 2005     if (job.state != FaxRequest::state_done 
 2006       && job.state != FaxRequest::state_failed)
 2007     job.state = FaxRequest::state_failed;
 2008     job.suspendPending = false;
 2009     traceJob(job, "DEAD");
 2010     Trigger::post(Trigger::JOB_DEAD, job);
 2011     removeDestInfoJob(job);
 2012     if (job.isOnList())         // lazy remove from active list
 2013     job.remove();
 2014     job.insert(*deadq.next);        // setup job corpus for reaping
 2015     if (job.modem)          // called from many places
 2016     releaseModem(job);
 2017     pokeScheduler();
 2018 }
 2019 
 2020 /*
 2021  * Place a job on the list of jobs actively being processed.
 2022  */
 2023 void
 2024 faxQueueApp::setActive(Job& job)
 2025 {
 2026     job.state = FaxRequest::state_active;
 2027     traceJob(job, "ACTIVE");
 2028     Trigger::post(Trigger::JOB_ACTIVE, job);
 2029     job.insert(*activeq.next);
 2030 }
 2031 
 2032 /*
 2033  * Place a job on the list of jobs not being scheduled.
 2034  */
 2035 void
 2036 faxQueueApp::setSuspend(Job& job)
 2037 {
 2038     job.state = FaxRequest::state_suspended;
 2039     traceJob(job, "SUSPEND");
 2040     Trigger::post(Trigger::JOB_SUSPEND, job);
 2041     job.insert(*suspendq.next);
 2042 }
 2043 
 2044 /*
 2045  * Create a new job entry and place them on the
 2046  * appropriate queue.  A kill timer is also setup
 2047  * for the job.
 2048  */
 2049 bool
 2050 faxQueueApp::submitJob(FaxRequest& req, bool checkState)
 2051 {
 2052     Job* job = new Job(req);
 2053     traceJob(*job, "CREATE");
 2054     Trigger::post(Trigger::JOB_CREATE, *job);
 2055     return (submitJob(*job, req, checkState));
 2056 }
 2057 
 2058 bool
 2059 faxQueueApp::submitJob(Job& job, FaxRequest& req, bool checkState)
 2060 {
 2061     /*
 2062      * Check various submission parameters.  We setup the
 2063      * canonical version of the destination phone number
 2064      * first so that any rejections that cause the notification
 2065      * script to be run will return a proper value for the
 2066      * destination phone number.
 2067      */
 2068     job.dest = canonicalizePhoneNumber(req.number);
 2069     if (job.dest == "") {
 2070     if (req.external == "")         // NB: for notification logic
 2071         req.external = req.number;
 2072     rejectSubmission(job, req,
 2073         "REJECT: Unable to convert dial string to canonical format {E327}");
 2074     return (false);
 2075     }
 2076     req.canonical = job.dest;
 2077     time_t now = Sys::now();
 2078     if (req.killtime <= now) {
 2079     timeoutJob(job, req);
 2080     return (false);
 2081     }
 2082     if (!Modem::modemExists(req.modem, true) && !ModemGroup::find(req.modem)) {
 2083     rejectSubmission(job, req,
 2084         "REJECT: Requested modem " | req.modem | " is not registered {E328}");
 2085     return (false);
 2086     }
 2087     if (req.items.length() == 0) {
 2088     rejectSubmission(job, req, "REJECT: No work found in job file {E329}");
 2089     return (false);
 2090     }
 2091     if (req.pagewidth > 303) {
 2092     rejectSubmission(job, req,
 2093         fxStr::format("REJECT: Page width (%u) appears invalid {E330}",
 2094         req.pagewidth));
 2095     return (false);
 2096     }
 2097     /*
 2098      * Verify the killtime is ``reasonable''; otherwise
 2099      * select (through the Dispatcher) may be given a
 2100      * crazy time value, potentially causing problems.
 2101      */
 2102     if (req.killtime-now > 365*24*60*60) {  // XXX should be based on tts
 2103     rejectSubmission(job, req,
 2104         fxStr::format("REJECT: Job expiration time (%u) appears invalid {E331}",
 2105         req.killtime));
 2106     return (false);
 2107     }
 2108     if (checkState) {
 2109     /*
 2110      * Check the state from queue file and if
 2111      * it indicates the job was not being
 2112      * scheduled before then don't schedule it
 2113      * now.  This is used when the scheduler
 2114      * is restarted and reading the queue for
 2115      * the first time.
 2116      *
 2117      * NB: We reschedule blocked jobs in case
 2118      *     the job that was previously blocking
 2119      *     it was removed somehow.
 2120      */
 2121     switch (req.state) {
 2122     case FaxRequest::state_suspended:
 2123         setSuspend(job);
 2124         return (true);
 2125     case FaxRequest::state_done:
 2126     case FaxRequest::state_failed:
 2127         setDead(job);
 2128         return (true);
 2129     }
 2130     }
 2131     if (req.useccover && (req.serverdocover || req.skippedpages || req.skippages) && contCoverPageTemplate != "") {
 2132     /*
 2133      * The user submitted a job with "skipped" pages.  This equates
 2134      * to a user-initiated resubmission.  Add a continuation coverpage
 2135      * if appropriate.
 2136      */
 2137     req.cover = docDir | "/cover" | req.jobid | ".ps";
 2138     }
 2139     /*
 2140      * Put the job on the appropriate queue
 2141      * and start the job kill timer.
 2142      */
 2143     if (req.tts > now) {            // scheduled for future
 2144     /*
 2145      * Check time-to-send as for killtime above.
 2146      */
 2147     if (req.tts - now > 365*24*60*60) {
 2148         rejectSubmission(job, req,
 2149         fxStr::format("REJECT: Time-to-send (%u) appears invalid {E332}",
 2150             req.tts));
 2151         return (false);
 2152     }
 2153     job.startKillTimer(req.killtime);
 2154     job.state = FaxRequest::state_pending;
 2155     setSleep(job, job.tts);
 2156     } else {                    // ready to go now
 2157     job.startKillTimer(req.killtime);
 2158     setReadyToRun(job, true);
 2159     pokeScheduler();
 2160     }
 2161     updateRequest(req, job);
 2162     return (true);
 2163 }
 2164 
 2165 /*
 2166  * Reject a job submission.
 2167  */
 2168 void
 2169 faxQueueApp::rejectSubmission(Job& job, FaxRequest& req, const fxStr& reason)
 2170 {
 2171     Trigger::post(Trigger::JOB_REJECT, job);
 2172     req.status = send_failed;
 2173     req.notice = reason;
 2174     traceServer("JOB %s: ", (const char*)job.jobid, (const char*)reason);
 2175     deleteRequest(job, req, Job::rejected, true);
 2176     setDead(job);               // dispose of job
 2177 }
 2178 
 2179 /*
 2180  * Suspend a job by removing it from whatever
 2181  * queue it's currently on and/or stopping any
 2182  * timers.  If the job has an active subprocess
 2183  * then the process is optionally sent a signal
 2184  * and we wait for the process to stop before
 2185  * returning to the caller.
 2186  */
 2187 bool
 2188 faxQueueApp::suspendJob(Job& job, bool abortActive)
 2189 {
 2190     if (job.suspendPending)         // already being suspended
 2191     return (false);
 2192     switch (job.state) {
 2193     case FaxRequest::state_active:
 2194     /*
 2195      * Job is being handled by a subprocess; optionally
 2196      * signal the process and wait for it to terminate
 2197      * before returning.  We disable the kill timer so
 2198      * that if it goes off while we wait for the process
 2199      * to terminate the process completion work will not
 2200      * mistakenly terminate the job (see sendJobDone).
 2201      */
 2202     job.suspendPending = true;      // mark thread waiting
 2203     if (abortActive)
 2204         (void) kill(job.pid, SIGTERM);  // signal subprocess
 2205     job.stopKillTimer();
 2206     while (job.suspendPending)      // wait for subprocess to exit
 2207         Dispatcher::instance().dispatch();
 2208     /*
 2209      * Recheck the job state; it may have changed while
 2210      * we were waiting for the subprocess to terminate.
 2211      */
 2212     if (job.state != FaxRequest::state_done &&
 2213       job.state != FaxRequest::state_failed)
 2214         break;
 2215     /* fall thru... */
 2216     case FaxRequest::state_done:
 2217     case FaxRequest::state_failed:
 2218     return (false);
 2219     case FaxRequest::state_sleeping:
 2220     case FaxRequest::state_pending:
 2221     job.stopTTSTimer();         // cancel timeout
 2222     /* fall thru... */
 2223     case FaxRequest::state_suspended:
 2224     case FaxRequest::state_ready:
 2225     break;
 2226     case FaxRequest::state_blocked:
 2227     /*
 2228      * Decrement the count of job blocked to
 2229      * to the same destination.
 2230      */
 2231     destJobs[job.dest].unblock(job);
 2232     break;
 2233     }
 2234 
 2235     /*
 2236      * We must remove any DestInfo stuff this is recorded in
 2237      * When the job is resubmitted (or killed), we don't know
 2238      * when (could be hours/never), or even if the dest number
 2239      * will be the same
 2240      */
 2241     removeDestInfoJob(job);
 2242     if (job.isOnList()) job.remove();       // remove from old queue
 2243     job.stopKillTimer();            // clear kill timer
 2244     return (true);
 2245 }
 2246 
 2247 /*
 2248  * Suspend a job and place it on the suspend queue.
 2249  * If the job is currently active then we wait for
 2250  * it to reach a state where it can be safely suspended.
 2251  * This control is used by clients that want to modify
 2252  * the state of a job (i.e. suspend, modify, submit).
 2253  */
 2254 bool
 2255 faxQueueApp::suspendJob(const fxStr& jobid, bool abortActive)
 2256 {
 2257     Job* job = Job::getJobByID(jobid);
 2258     if (job && suspendJob(*job, abortActive)) {
 2259     setSuspend(*job);
 2260     FaxRequest* req = readRequest(*job);
 2261     if (req) {
 2262         updateRequest(*req, *job);
 2263         delete req;
 2264     }
 2265     return (true);
 2266     } else
 2267     return (false);
 2268 }
 2269 
 2270 /*
 2271  * Terminate a job in response to a command message.
 2272  * If the job is currently running the subprocess is
 2273  * sent a signal to request that it abort whatever
 2274  * it's doing and we wait for the process to terminate.
 2275  * Otherwise, the job is immediately removed from
 2276  * the appropriate queue and any associated resources
 2277  * are purged.
 2278  */
 2279 bool
 2280 faxQueueApp::terminateJob(const fxStr& jobid, JobStatus why)
 2281 {
 2282     Job* job = Job::getJobByID(jobid);
 2283     if (job && suspendJob(*job, true)) {
 2284     job->state = FaxRequest::state_failed;
 2285     Trigger::post(Trigger::JOB_KILL, *job);
 2286     FaxRequest* req = readRequest(*job);
 2287     if (req) {
 2288         req->notice = "Job aborted by request {E345}";
 2289         deleteRequest(*job, req, why, why != Job::removed);
 2290     }
 2291     setDead(*job);
 2292     return (true);
 2293     } else
 2294     return (false);
 2295 }
 2296 
 2297 /*
 2298  * Reject a job at some time before it's handed off to the server thread.
 2299  */
 2300 void
 2301 faxQueueApp::rejectJob(Job& job, FaxRequest& req, const fxStr& reason)
 2302 {
 2303     req.status = send_failed;
 2304     req.notice = reason;
 2305     traceServer("JOB %s: %s",
 2306         (const char*)job.jobid, (const char*)reason);
 2307     job.state = FaxRequest::state_failed;
 2308     Trigger::post(Trigger::JOB_REJECT, job);
 2309     setDead(job);               // dispose of job
 2310 }
 2311 
 2312 /*
 2313  * Deal with a job that's blocked by a concurrent call.
 2314  */
 2315 void
 2316 faxQueueApp::blockJob(Job& job, FaxRequest& req, const char* mesg)
 2317 {
 2318     int old_state = job.state;
 2319     job.state = FaxRequest::state_blocked;
 2320     req.notice = mesg;
 2321     updateRequest(req, job);
 2322     traceQueue(job, "%s", mesg);
 2323     if (old_state != FaxRequest::state_blocked) {
 2324     if (job.getJCI().getNotify() != -1) {
 2325         if (job.getJCI().isNotify(FaxRequest::when_requeued))
 2326         notifySender(job, Job::blocked);
 2327     } else
 2328         if (req.isNotify(FaxRequest::when_requeued))
 2329         notifySender(job, Job::blocked);
 2330     }
 2331     Trigger::post(Trigger::JOB_BLOCKED, job);
 2332 }
 2333 
 2334 /*
 2335  * Requeue a job that's delayed for some reason.
 2336  */
 2337 void
 2338 faxQueueApp::delayJob(Job& job, FaxRequest& req, const char* mesg, time_t tts)
 2339 {
 2340     job.state = FaxRequest::state_sleeping;
 2341     fxStr reason(mesg);
 2342     job.tts = tts;
 2343     req.tts = tts;
 2344     time_t delay = tts - Sys::now();
 2345     req.notice = reason;
 2346     updateRequest(req, job);
 2347     traceQueue(job, "%s: requeue for %s",
 2348         (const char*)mesg, (const char*)strTime(delay));
 2349     setSleep(job, tts);
 2350     Trigger::post(Trigger::JOB_DELAYED, job);
 2351     if (job.getJCI().getNotify() != -1) {
 2352     if (job.getJCI().isNotify(FaxRequest::when_requeued))
 2353         notifySender(job, Job::requeued); 
 2354     } else
 2355     if (req.isNotify(FaxRequest::when_requeued))
 2356         notifySender(job, Job::requeued); 
 2357     if (job.modem != NULL)
 2358     releaseModem(job);
 2359 }
 2360 
 2361 void
 2362 faxQueueApp::queueAccounting(Job& job, FaxRequest& req, const char* type)
 2363 {
 2364     FaxAcctInfo ai;
 2365     ai.jobid = (const char*) req.jobid;
 2366     ai.jobtag = (const char*) req.jobtag;
 2367     ai.user = (const char*) req.mailaddr;
 2368     ai.start = Sys::now();
 2369     ai.duration = 0;
 2370     ai.conntime = 0;
 2371     if (strstr(type, "PROXY")) {
 2372     ai.duration = req.duration;
 2373     ai.conntime = req.conntime;
 2374     ai.commid = (const char*) req.commid;
 2375     ai.device = (const char*) req.modemused;
 2376     } else {
 2377     ai.duration = 0;
 2378     ai.conntime = 0;
 2379     ai.commid = "";
 2380     ai.device = "";
 2381     }
 2382     ai.dest = (const char*) req.external;
 2383     ai.csi = "";
 2384     ai.npages = req.npages;
 2385     ai.params = 0;
 2386     if (req.status == send_done)
 2387     ai.status = "";
 2388     else {
 2389     ai.status = req.notice;
 2390     }
 2391     if (strstr(type, "SUBMIT"))
 2392     ai.status = "Submitted";
 2393     CallID empty_callid;
 2394     ai.callid = empty_callid;
 2395     ai.owner = (const char*) req.owner;
 2396     ai.faxdcs = "";
 2397     ai.jobinfo = fxStr::format("%u/%u/%u/%u/%u/%u/%u", 
 2398     req.totpages, req.ntries, req.ndials, req.totdials, req.maxdials, req.tottries, req.maxtries);
 2399     pid_t pid = fork();
 2400     switch (pid) {
 2401     case -1:            // error
 2402         if (!ai.record(type))
 2403         logError("Error writing %s accounting record, dest=%s",
 2404             type, (const char*) ai.dest);
 2405         break;
 2406     case 0:             // child
 2407         if (!ai.record(type))
 2408         logError("Error writing %s accounting record, dest=%s",
 2409             type, (const char*) ai.dest);
 2410         _exit(255);
 2411         /*NOTREACHED*/
 2412     default:            // parent
 2413         Dispatcher::instance().startChild(pid, this);
 2414         break;
 2415     }
 2416 }
 2417 
 2418 /*
 2419  * Process the job whose kill time expires.  The job is
 2420  * terminated unless it is currently being tried, in which
 2421  * case it's marked for termination after the attempt is
 2422  * completed.
 2423  */
 2424 void
 2425 faxQueueApp::timeoutJob(Job& job)
 2426 {
 2427     traceQueue(job, "KILL TIME EXPIRED");
 2428     Trigger::post(Trigger::JOB_TIMEDOUT, job);
 2429     if (job.state != FaxRequest::state_active) {
 2430     if (job.isOnList())
 2431         job.remove();           // i.e. remove from sleep queue
 2432     job.state = FaxRequest::state_failed;
 2433     FaxRequest* req = readRequest(job);
 2434     if (req) {
 2435         req->notice = "Kill time expired {E325}";
 2436         deleteRequest(job, req, Job::timedout, true);
 2437     }
 2438     setDead(job);
 2439     } else
 2440     job.killtime = 0;           // mark job to be removed
 2441 }
 2442 
 2443 /*
 2444  * Like above, but called for a job that times
 2445  * out at the point at which it is submitted (e.g.
 2446  * after the server is restarted).  The work here
 2447  * is subtley different; the q file must not be
 2448  * re-read because it may result in recursive flock
 2449  * calls which on some systems may cause deadlock
 2450  * (systems that emulate flock with lockf do not
 2451  * properly emulate flock).
 2452  */
 2453 void
 2454 faxQueueApp::timeoutJob(Job& job, FaxRequest& req)
 2455 {
 2456     job.state = FaxRequest::state_failed;
 2457     traceQueue(job, "KILL TIME EXPIRED");
 2458     Trigger::post(Trigger::JOB_TIMEDOUT, job);
 2459     req.notice = "Kill time expired {E325}";
 2460     deleteRequest(job, req, Job::timedout, true);
 2461     setDead(job);
 2462 }
 2463 
 2464 /*
 2465  * Resubmit an existing job or create a new job
 2466  * using the specified job description file.
 2467  */
 2468 bool
 2469 faxQueueApp::submitJob(const fxStr& jobid, bool checkState, bool nascent)
 2470 {
 2471     Job* job = Job::getJobByID(jobid);
 2472     if (job) {
 2473     bool ok = false;
 2474     if (job->state == FaxRequest::state_suspended) {
 2475         job->remove();          // remove from suspend queue
 2476         FaxRequest* req = readRequest(*job);// XXX need better mechanism
 2477         if (req) {
 2478         job->update(*req);      // update job state from file
 2479         ok = submitJob(*job, *req); // resubmit to scheduler
 2480         delete req;         // NB: unlock qfile
 2481         } else
 2482         setDead(*job);          // XXX???
 2483     } else if (job->state == FaxRequest::state_done ||
 2484       job->state == FaxRequest::state_failed)
 2485         jobError(*job, "Cannot resubmit a completed job");
 2486     else
 2487         ok = true;              // other, nothing to do
 2488     return (ok);
 2489     }
 2490     /*
 2491      * Create a job from a queue file and add it
 2492      * to the scheduling queues.
 2493      */
 2494     fxStr filename(FAX_SENDDIR "/" FAX_QFILEPREF | jobid);
 2495     if (!Sys::isRegularFile(filename)) {
 2496     logError("JOB %s: qfile %s is not a regular file: %s",
 2497         (const char*) jobid, (const char*) filename, strerror(errno));
 2498     return (false);
 2499     }
 2500     bool status = false;
 2501     int fd = Sys::open(filename, O_RDWR);
 2502     if (fd >= 0) {
 2503     if (flock(fd, LOCK_SH) >= 0) {
 2504         FaxRequest req(filename, fd);
 2505         /*
 2506          * There are four possibilities:
 2507          *
 2508          * 1. The queue file was read properly and the job
 2509          *    can be submitted.
 2510          * 2. There were problems reading the file, but
 2511          *    enough information was obtained to purge the
 2512          *    job from the queue.
 2513          * 3. The job was previously submitted and completed
 2514          *    (either with success or failure).
 2515          * 4. Insufficient information was obtained to purge
 2516          *    the job; just skip it.
 2517          */
 2518         bool reject;
 2519         if (req.readQFile(reject) && !reject &&
 2520           req.state != FaxRequest::state_done &&
 2521           req.state != FaxRequest::state_failed) {
 2522         status = submitJob(req, checkState);
 2523         if (nascent) queueAccounting(*job, req, "SUBMIT");
 2524         } else if (reject) {
 2525         Job job(req);
 2526         job.state = FaxRequest::state_failed;
 2527         req.status = send_failed;
 2528         req.notice = "Invalid or corrupted job description file {E326}";
 2529         traceServer("JOB %s : %s", (const char*)jobid, (const char*) req.notice);
 2530         // NB: this may not work, but we try...
 2531         deleteRequest(job, req, Job::rejected, true);
 2532         } else if (req.state == FaxRequest::state_done ||
 2533           req.state == FaxRequest::state_failed) {
 2534         logError("JOB %s: Cannot resubmit a completed job",
 2535             (const char*) jobid);
 2536         } else
 2537         traceServer("%s: Unable to purge job, ignoring it",
 2538             (const char*)filename);
 2539     } else
 2540         logError("JOB %s: Could not lock job file; %m.",
 2541         (const char*) jobid);
 2542     Sys::close(fd);
 2543     } else
 2544     logError("JOB %s: Could not open job file; %m.", (const char*) jobid);
 2545     return (status);
 2546 }
 2547 
 2548 /*
 2549  * Process the expiration of a job's time-to-send timer.
 2550  * The job is moved to the ready-to-run queues and the
 2551  * scheduler is poked.
 2552  */
 2553 void
 2554 faxQueueApp::runJob(Job& job)
 2555 {
 2556     if (job.state != FaxRequest::state_failed) {    // don't run a dead job corpus
 2557     if (job.isOnList()) job.remove();
 2558     setReadyToRun(job, true);
 2559     FaxRequest* req = readRequest(job);
 2560     if (req) {
 2561         updateRequest(*req, job);
 2562         delete req;
 2563     }
 2564     }
 2565     /*
 2566      * In order to deliberately batch jobs by using a common
 2567      * time-to-send we need to give time for the other jobs'
 2568      * timers to expire and to enter the run queue before
 2569      * running the scheduler.  Thus the scheduler is poked
 2570      * with a delay.
 2571      */
 2572     pokeScheduler(1);
 2573 }
 2574 
 2575 /*
 2576  * Process the DestInfo job-block list
 2577  * for this job.  If the job is active and blocking other
 2578  * jobs, we need to unblock...
 2579  */
 2580 #define isOKToCall(di, dci, n) \
 2581     (di.getCalls()+n <= dci.getMaxConcurrentCalls())
 2582 
 2583 void
 2584 faxQueueApp::unblockDestJobs(DestInfo& di, u_int force)
 2585 {
 2586     /*
 2587      * Check if there are blocked jobs waiting to run
 2588      * and that there is now room to run one.  If so,
 2589      * take jobs off the blocked queue and make them
 2590      * ready for processing.
 2591      */
 2592     Job* jb;
 2593     u_int n = 1, b = 1;
 2594     while ((jb = di.nextBlocked())) {
 2595     if (force || jb->getJCI().getMaxConcurrentCalls() == 0 || isOKToCall(di, jb->getJCI(), n)) {
 2596         FaxRequest* req = readRequest(*jb);
 2597         if (!req) {
 2598         setDead(*jb);
 2599         continue;
 2600         }
 2601         setReadyToRun(*jb, false);
 2602         if (!di.supportsBatching()) n++;
 2603         else if (++b > maxBatchJobs) {
 2604         n++;
 2605         b -= maxBatchJobs;
 2606         }
 2607         req->notice = "";
 2608         updateRequest(*req, *jb);
 2609         delete req;
 2610         if (force) force--;
 2611         /*
 2612          * We check isOKToCall again here now to avoid di.nextBlocked
 2613          * which would pull jb from the blocked list and then possibly
 2614          * require us to re-block it.
 2615          */
 2616         if (di.getBlocked() && !force && jb->getJCI().getMaxConcurrentCalls() != 0 && !isOKToCall(di, jb->getJCI(), n)) {
 2617         traceQueue("Continue BLOCK on %d job(s) to %s, current calls: %d, max concurrent calls: %d", 
 2618             di.getBlocked(), (const char*) jb->dest, di.getCalls()+n-1, jb->getJCI().getMaxConcurrentCalls());
 2619         break;
 2620         }
 2621     } else {
 2622         /*
 2623          * unblockDestJobs was called, but a new
 2624          * call cannot be placed.  This would be
 2625          * unusual, but because di.nextBlocked
 2626          * removed jb from the di list, we need
 2627          * to put it back.
 2628          */
 2629         di.block(*jb);
 2630         traceQueue("Continue BLOCK on %d job(s) to %s, current calls: %d, max concurrent calls: %d", 
 2631         di.getBlocked(), (const char*) jb->dest, di.getCalls()+n-1, jb->getJCI().getMaxConcurrentCalls());
 2632         break;
 2633     }
 2634     }
 2635 }
 2636 
 2637 void
 2638 faxQueueApp::removeDestInfoJob(Job& job)
 2639 {
 2640     DestInfo& di = destJobs[job.dest];
 2641     di.done(job);           // remove from active destination list
 2642     di.updateConfig();          // update file if something changed
 2643     if (di.isEmpty()) {
 2644     /*
 2645      * This is the last job to the destination; purge
 2646      * the entry from the destination jobs database.
 2647      */
 2648     destJobs.remove(job.dest);
 2649     }
 2650 }
 2651 
 2652 /*
 2653  * Compare two job requests to each other and to a selected
 2654  * job to see if they can be batched together.
 2655  */
 2656 bool
 2657 faxQueueApp::areBatchable(FaxRequest& reqa, FaxRequest& reqb, Job& job, Job& cjob)
 2658 {
 2659     // make sure the job's modem is in the requested ModemGroup 
 2660     if (!job.modem->isInGroup(reqb.modem)) return(false);
 2661     // make sure cjob's TimeOfDay is for now
 2662     time_t now = Sys::now();
 2663     if ((cjob.getJCI().nextTimeToSend(now) != now) || (cjob.tod.nextTimeOfDay(now) != now)) return (false);
 2664     return(true);
 2665 }
 2666 
 2667 class MySendFaxClient : public SendFaxClient {
 2668 public:
 2669     MySendFaxClient();
 2670     ~MySendFaxClient();
 2671 };
 2672 MySendFaxClient::MySendFaxClient() {}
 2673 MySendFaxClient::~MySendFaxClient() {}
 2674 
 2675 static bool
 2676 writeFile(void* ptr, const char* buf, int cc, fxStr&)
 2677 {
 2678     int* fd = (int*) ptr;
 2679     (void) Sys::write(*fd, buf, cc);
 2680     return (true);
 2681 }
 2682 
 2683 #define COMPLETE 2
 2684 
 2685 /*
 2686  * Send a fax job via a proxy HylaFAX server.  We do this by utilizing SendFaxJob jobWait.
 2687  *
 2688  * This does not currently work for pager requests or fax polls.
 2689  *
 2690  * Also not currently supported is job suspension or termination of the proxy job on the 
 2691  * proxy host.  Therefore, actions such as faxrm or faxalter will not currently propagate
 2692  * to the proxy.
 2693  */
 2694 void
 2695 faxQueueApp::sendViaProxy(Job& job, FaxRequest& req)
 2696 {
 2697     pid_t pid = fork();
 2698     switch (pid) {
 2699     case -1:            // error
 2700         logError("Error forking for proxy-send of job %s.", (const char*) job.jobid);
 2701         break;
 2702     case 0:             // child
 2703         {
 2704         Trigger::post(Trigger::SEND_BEGIN, job);
 2705 
 2706         MySendFaxClient* client = new MySendFaxClient;
 2707         client->readConfig(FAX_SYSCONF);
 2708         SendFaxJob& rjob = client->addJob();
 2709         if (job.getJCI().getDesiredDF() != -1) req.desireddf = job.getJCI().getDesiredDF();
 2710         // Since we want to send "now", and because we can't ensure that the remote clock matches ours
 2711         // we deliberately do not do the following.  The remote will default to its "now".
 2712         // rjob.setSendTime("now");
 2713         rjob.setPriority(job.pri);
 2714         // We use a special killtime "!" specifier to indicate that we're providing the raw 
 2715         // LASTTIME in order to provide a killtime that is relative to the remote clock.
 2716         rjob.setKillTime((const char*) fxStr::format("!%02d%02d%02d", 
 2717             (req.killtime - Sys::now())/(24*60*60), 
 2718            ((req.killtime - Sys::now())/(60*60))%24, 
 2719            ((req.killtime - Sys::now())/60)%60 ));
 2720         rjob.setDesiredDF(req.desireddf);
 2721         rjob.setMinSpeed(req.minbr);
 2722         rjob.setDesiredSpeed(req.desiredbr);
 2723         if (req.faxname != "") rjob.setFaxName(req.faxname);
 2724         //rjob.setDesiredEC(req.desiredec);     // disabled for compatibility with HylaFAX 4.1.x servers
 2725         if (job.getJCI().getProxyTagLineFormat().length()) {
 2726             rjob.setTagLineFormat(job.getJCI().getProxyTagLineFormat());
 2727         } else {
 2728             if (req.desiredtl) rjob.setTagLineFormat(req.tagline);
 2729         }
 2730         if (req.timezone != "") rjob.setTimeZone(req.timezone);
 2731         rjob.setUseXVRes(req.usexvres);
 2732         client->setHost(job.getJCI().getProxy());
 2733         if (job.getJCI().getProxyJobTag().length()) {
 2734             rjob.setJobTag(job.getJCI().getProxyJobTag());
 2735         } else {
 2736             rjob.setJobTag(job.jobid);
 2737         }
 2738         rjob.setVResolution(req.resolution);
 2739         rjob.setDesiredMST(req.desiredst);
 2740         rjob.setAutoCoverPage(false);
 2741         if (job.getJCI().getProxyMailbox().length())
 2742             rjob.setMailbox(job.getJCI().getProxyMailbox());
 2743         if (job.getJCI().getProxyNotification().length())
 2744             rjob.setNotification((const char*) job.getJCI().getProxyNotification());
 2745         else
 2746             rjob.setNotification("none");
 2747         rjob.setSkippedPages(req.skippedpages);
 2748         rjob.setSkipPages(req.skippages+req.npages);    // requires that the proxy be capable of skipping entire documents
 2749         rjob.setNoCountCover(req.nocountcover);
 2750         rjob.setUseColor(req.usecolor);
 2751         if (job.getJCI().getPageSize().length()) {
 2752             rjob.setPageSize(job.getJCI().getPageSize());
 2753         } else {
 2754             PageSizeInfo* info = PageSizeInfo::getPageSizeBySize(req.pagewidth, req.pagelength);
 2755             if (info) rjob.setPageSize(info->abbrev());
 2756         }
 2757         if (job.getJCI().getProxyTSI().length()) {
 2758             rjob.setTSI(job.getJCI().getProxyTSI());
 2759         } else {
 2760             if (req.tsi != "") rjob.setTSI(req.tsi);
 2761         }
 2762         int maxTries = 0;
 2763         if (job.getJCI().getProxyTries() > 0)
 2764             maxTries = job.getJCI().getProxyTries();
 2765         else
 2766             maxTries = req.maxtries - req.tottries; // don't let the proxy repeat tries already made
 2767         rjob.setMaxRetries(maxTries);
 2768         int maxDials = 0;
 2769         if (job.getJCI().getProxyDials() > 0)
 2770             maxDials = job.getJCI().getProxyDials();
 2771         else
 2772             maxDials = req.maxdials - req.totdials; // don't let the proxy repeat dials already made
 2773         rjob.setMaxDials(maxDials);
 2774         if (req.faxnumber != "") rjob.setFaxNumber(req.faxnumber);
 2775         rjob.setDialString(req.number);
 2776         for (u_int i = 0; i < req.items.length(); i++) {
 2777             switch (req.items[i].op) {
 2778             case FaxRequest::send_tiff:
 2779             case FaxRequest::send_tiff_saved:
 2780             case FaxRequest::send_pdf:
 2781             case FaxRequest::send_pdf_saved:
 2782             case FaxRequest::send_postscript:
 2783             case FaxRequest::send_postscript_saved:
 2784             case FaxRequest::send_pcl:
 2785             case FaxRequest::send_pcl_saved:
 2786                 client->addFile(req.items[i].item);
 2787                 break;
 2788             }
 2789         }
 2790         u_short prevPages = req.npages;     // queueAccounting() needs the pages sent only by the proxy, but updateRequest() needs the full count
 2791         bool status = false;
 2792         fxStr emsg;
 2793         if (client->callServer(emsg)) {
 2794             status = client->login(job.getJCI().getProxyUser().length() ? job.getJCI().getProxyUser() : req.owner, 
 2795                     job.getJCI().getProxyPass().length() ? (const char*) job.getJCI().getProxyPass() : NULL, emsg) 
 2796                 && client->prepareForJobSubmissions(emsg)
 2797                 && client->submitJobs(emsg);
 2798             if (status) {
 2799             fxStr r;
 2800             fxStr rjobid = client->getCurrentJob();
 2801             traceQueue(job, "submitted to proxy host %s as job %s", 
 2802                 (const char*) job.getJCI().getProxy(), (const char*) rjobid);
 2803             req.modemused = rjobid | "@" | job.getJCI().getProxy();
 2804             req.notice = "delivered to proxy host " | job.getJCI().getProxy() | " as job " | rjobid;
 2805             updateRequest(req, job);
 2806             int waits = 0;
 2807             time_t waitstart = Sys::now();
 2808             while (!client->jobWait((const char*) rjobid) && waits++ < job.getJCI().getProxyReconnects()) {
 2809                 /*
 2810                  * Our wait failed, and excepting a fault on the proxy this
 2811                  * can only mean that the connection was lost.  Reconnect
 2812                  * and re-start our wait.
 2813                  */
 2814                 time_t sleeptime = waitstart + 60 - Sys::now();
 2815                 if (sleeptime < 1) sleeptime = 2;
 2816                 logError("PROXY SEND: (job %s) lost connection to %s, attempt %d.  Attempting reconnection in %d seconds.", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), waits, sleeptime);
 2817                 client->hangupServer();
 2818                 sleep(sleeptime);
 2819                 waitstart = Sys::now();
 2820                 status = false;
 2821                 if (client->callServer(emsg)) {
 2822                 status = client->login(job.getJCI().getProxyUser().length() ? job.getJCI().getProxyUser() : req.owner, 
 2823                     job.getJCI().getProxyPass().length() ? (const char*) job.getJCI().getProxyPass() : NULL, emsg) &&
 2824                     client->setCurrentJob(rjobid);
 2825                 if (status) {
 2826                     logError("PROXY SEND: (job %s) reconnected to %s.  Resuming wait for job %s.", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) rjobid);
 2827                     emsg = "";
 2828                     waits = 0;
 2829                 }
 2830                 }
 2831                 if (emsg != "") logError("PROXY SEND: (job %s) server %s: %s", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) emsg);
 2832             }
 2833             /*
 2834              * There is a long wait here now... possibly VERY long.
 2835              * When jobWait is done, then we query the proxy
 2836              * for the various job data and update the request here.
 2837              */
 2838             req.skippages = 0;
 2839             if (status && client->command((const char*) fxStr::format("JOB %s", (const char*) rjobid)) == COMPLETE) {
 2840                 if (client->jobParm("conntime")) {
 2841                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2842                 req.conntime = atoi((const char*) r);
 2843                 }
 2844                 if (client->jobParm("duration")) {
 2845                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2846                 req.duration = atoi((const char*) r);
 2847                 }
 2848                 if (client->jobParm("npages")) {
 2849                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2850                 int pagessent = atoi((const char*) r);
 2851                 // The following are for preparePageHandling below
 2852                 req.npages += pagessent;
 2853                 req.skippages = pagessent;
 2854                 req.skippedpages -= pagessent;
 2855                 req.pagehandling = "";
 2856                 }
 2857                 if (client->jobParm("totpages")) {
 2858                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2859                 u_short totpages = atoi((const char*) r);
 2860                 if (totpages > req.totpages) req.totpages = totpages;   // if the formatting occurred on the proxy, we'll need to update
 2861                 }
 2862                 if (client->jobParm("commid")) {
 2863                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2864                 req.commid = r;
 2865                 }
 2866                 if (client->jobParm("status")) {
 2867                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2868                 req.notice = r;
 2869                 } else {
 2870                 req.notice = "unknown status on proxy";
 2871                 }
 2872                 if (client->jobParm("ntries")) {
 2873                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2874                 u_int tries = atoi((const char*) r);
 2875                 if (tries < (u_int) maxTries && strstr((const char*) req.notice, "oo many attempts to send")) tries = maxTries; // caught in a lie
 2876                 if (tries < (u_int) maxTries && strstr((const char*) req.notice, "oo many attempts to transmit")) tries = maxTries; // caught in a lie
 2877                 req.ntries += tries;
 2878                 req.tottries += tries;
 2879                 }
 2880                 if (client->jobParm("ndials")) {
 2881                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2882                 u_int dials = atoi((const char*) r);
 2883                 if (dials < (u_int) maxDials && strstr((const char*) req.notice, "oo many attempts to dial")) dials = maxDials; // caught in a lie
 2884                 req.ndials += dials;
 2885                 req.totdials += dials;
 2886                 }
 2887                 if (req.notice.length() && client->jobParm("errorcode")) {
 2888                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2889                 req.errorcode = r;
 2890                 }
 2891                 if (client->jobParm("state")) {
 2892                 r = client->getLastResponse(); r.remove(0, r.length() > 4 ? 4 : r.length());
 2893                 // Due to the jobWait we should only ever see "DONE" and "FAILED"...
 2894                 if (r == "DONE") {
 2895                     job.state = FaxRequest::state_done;
 2896                     req.status = send_done;
 2897                 } else {
 2898                     job.state = FaxRequest::state_failed;
 2899                     /*
 2900                      * The JobRetry* configurations are modem-specific.  With a proxy involved the
 2901                      * getProxyTries() and getProxyDials() are the analogous features.  If the configuration
 2902                      * leaves them unset they are both -1.  Such a configuration delegates all authority for
 2903                      * total job failure to the proxy.  So, if the proxy fails a job where our configuration
 2904                      * has delegated all tries and dials to the proxy, then we must also fail the job else we
 2905                      * will simply resubmit a job to the proxy that the proxy already has deemed as failed 
 2906                      * as if it were rejected (even if maxdials or maxtries were not exceeded).  However, in 
 2907                      * a situation where either configuration is not unset it means the opposite: that we are
 2908                      * not delegating authority for full job failure to the proxy (e.g. it's only entrusted
 2909                      * to handle one session at-a-time - or we want the proxy to make a minimum set of attempts
 2910                      * on each submission).
 2911                      */
 2912                     if ((job.getJCI().getProxyTries() == -1 && job.getJCI().getProxyDials() == -1) || 
 2913                     req.ndials >= req.maxdials || req.ntries >= req.maxtries || 
 2914                     strstr((const char*) req.notice, "REJECT"))
 2915                     req.status = send_failed;
 2916                     else
 2917                     req.status = send_retry;
 2918                 }
 2919                 } else {
 2920                 logError("PROXY SEND: (job %s) unknown state for job %s on %s", (const char*) job.jobid, (const char*) rjobid, (const char*) job.getJCI().getProxy());
 2921                 job.state = FaxRequest::state_failed;
 2922                 req.status = send_retry;
 2923                 }
 2924                 if (req.commid.length()) {      // there's no sense in trying to retrieve the log if we don't know the commid number
 2925                 /*
 2926                  * Try to get and store the session log from the proxy in a dedicated log subdirectory.
 2927                  */
 2928                 int isdir = 0;
 2929                 fxStr proxyLogs = fxStr(FAX_LOGDIR) | fxStr("/" | job.getJCI().getProxy());
 2930                 struct stat sb;
 2931                 if (Sys::stat(proxyLogs, sb) != 0) {
 2932                     mode_t logdirmode = job.getJCI().getProxyLogMode();
 2933                     if (logdirmode & S_IREAD) logdirmode |= S_IEXEC;
 2934                     if (logdirmode & S_IRGRP) logdirmode |= S_IXGRP;
 2935                     if (logdirmode & S_IROTH) logdirmode |= S_IXOTH;
 2936                     if (Sys::mkdir(proxyLogs, logdirmode) == 0) isdir = 1;
 2937                 } else {
 2938                    isdir = 1;
 2939                 }
 2940                 if (isdir) {
 2941                     fxStr proxylog = fxStr(proxyLogs | "/c" | req.commid);
 2942                     int fd = Sys::open(proxylog, O_RDWR|O_CREAT|O_EXCL, job.getJCI().getProxyLogMode());
 2943                     if (fd > 0) {
 2944                     client->setType(FaxClient::TYPE_I);
 2945                     if (!client->recvZData(writeFile, (void*) &fd, emsg, 0, fxStr("RETR log/c" | req.commid))) {
 2946                         logError("PROXY LOG: (job %s) server %s response: %s", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) client->getLastResponse());
 2947                     }
 2948                     Sys::close(fd);
 2949                     if (emsg != "") logError("PROXY LOG: (job %s) server %s retrieval: %s", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) emsg);
 2950                     } else {
 2951                     logError("PROXY LOG: (job %s) server: %s: %s: %s", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) proxylog, strerror(errno));
 2952                     }
 2953                 }
 2954                 } else {
 2955                 logError("PROXY LOG: (job %s) unknown commid number for job %s on %s", (const char*) job.jobid, (const char*) rjobid, (const char*) job.getJCI().getProxy());
 2956                 }
 2957             } else if (status) {
 2958                 logError("PROXY SEND: (job %s) %s failed to identify job %s", (const char*) job.jobid, (const char*) job.getJCI().getProxy(), (const char*) rjobid);
 2959                 job.state = FaxRequest::state_failed;
 2960                 req.status = send_retry;
 2961                 req.notice = "proxy lost job";
 2962             }
 2963             }
 2964             client->hangupServer();
 2965         }
 2966         if (!status) {
 2967             // some error occurred in callServer() or login()
 2968             logError("PROXY SEND: (job %s) %s", (const char*) job.jobid, (const char*) emsg);
 2969             job.state = FaxRequest::state_failed;
 2970             req.status = send_retry;
 2971             req.notice = "cannot log into proxy";
 2972         }
 2973         if (req.pagehandling == "") {
 2974             DestInfo& di = destJobs[job.dest];
 2975             FaxMachineInfo& info = di.getInfo(job.dest);
 2976             if (!preparePageHandling(job, req, info, req.notice)) {
 2977             req.notice.insert("Document preparation failed: ");
 2978             }
 2979         }
 2980         updateRequest(req, job);
 2981         req.npages -= prevPages;    // queueAccounting() only wants the pages sent by the proxy
 2982         queueAccounting(job, req, "PROXY");
 2983         _exit(req.status|0x80); // 0x80 indicates proxied job
 2984 
 2985         /*NOTREACHED*/
 2986         }
 2987     default:            // parent
 2988         DestInfo& di = destJobs[job.dest];
 2989         di.proxyCall();     // mark as called to correctly block other jobs
 2990         numProxyJobs++;
 2991         job.startSend(pid);
 2992         break;
 2993     }
 2994 }
 2995 
 2996 /*
 2997  * Scan the list of jobs and process those that are ready
 2998  * to go.  Note that the scheduler should only ever be
 2999  * invoked from the dispatcher via a timeout.  This way we
 3000  * can be certain there are no active contexts holding
 3001  * references to job corpses (or other data structures) that
 3002  * we want to reap.  To invoke the scheduler the pokeScheduler
 3003  * method should be called to setup an immediate timeout that
 3004  * will cause the scheduler to be invoked from the dispatcher.
 3005  */
 3006 void
 3007 faxQueueApp::runScheduler()
 3008 {
 3009     /*
 3010      * Terminate the server if there are no jobs currently
 3011      * being processed.  We must be sure to wait for jobs
 3012      * so that we can capture exit status from subprocesses
 3013      * and so that any locks held on behalf of outbound jobs
 3014      * do not appear to be stale (since they are held by this
 3015      * process).
 3016      */
 3017     if (quit && activeq.next == &activeq) {
 3018     close();
 3019     return;
 3020     }
 3021     fxAssert(inSchedule == false, "Scheduler running twice");
 3022     inSchedule = true;
 3023     /*
 3024      * Reread the configuration file if it has been
 3025      * changed.  We do this before each scheduler run
 3026      * since we are a long-running process and it should
 3027      * not be necessary to restart the process to have
 3028      * config file changes take effect.
 3029      */
 3030     (void) updateConfig(configFile);
 3031     /*
 3032      * Scan the job queue and locate a compatible modem to
 3033      * use in processing the job.  Doing things in this order
 3034      * insures the highest priority job is always processed
 3035      * first.
 3036      */
 3037     blockSignals();
 3038     if (! quit) {
 3039     for (u_int i = 0; i < NQHASH; i++) {
 3040         for (JobIter iter(runqs[i]); iter.notDone(); iter++) {
 3041 
 3042         if (numPrepares >= maxConcurrentPreps) {
 3043             /*
 3044              * Large numbers of simultaneous job preparations can cause problems.
 3045              * So if we're already preparing too many, we wait for them to finish
 3046              * before running the scheduler more.  This may prevent some jobs which
 3047              * are already prepared (i.e. jobs that failed to complete on a previous 
 3048              * attempt) from going out as soon as they could, but the delay should
 3049              * be minimal, and this approach prevents us from needing to run 
 3050              * prepareJobNeeded below, outside of prepareJob.
 3051              */
 3052             break;
 3053         }
 3054         Job& job = iter;
 3055         if (job.bprev != NULL) {
 3056             /*
 3057              * The batching sub-loop below already allocated this job to a batch.
 3058              * Thus, this loop's copy of the run queue is incorrect.
 3059              */
 3060             pokeScheduler();
 3061             break;
 3062         }
 3063         fxAssert(job.modem == NULL, "Job on run queue holding modem");
 3064 
 3065         /*
 3066          * Read the on-disk job state and process the job.
 3067          * Doing all the processing below each time the job
 3068          * is considered for processing could be avoided by
 3069          * doing it only after assigning a modem but that
 3070          * would potentially cause the order of dispatch
 3071          * to be significantly different from the order
 3072          * of submission; something some folks care about.
 3073          */
 3074         traceJob(job, "PROCESS");
 3075         Trigger::post(Trigger::JOB_PROCESS, job);
 3076         FaxRequest* req = readRequest(job);
 3077         if (!req) {         // problem reading job state on-disk
 3078             setDead(job);
 3079             continue;
 3080         }
 3081 
 3082         time_t tts;
 3083         time_t now = Sys::now();
 3084         /*
 3085          * A computer's clock can jump backwards.  For example, if 
 3086          * the system runs ntp and regularly syncs the system clock
 3087          * with some outside source it is possible that the local
 3088          * clock will move backwards.  We cannot die, then, simply
 3089          * because we find a job on the run queue that has a future 
 3090          * tts.  The possibility exists that it is due to some 
 3091          * adjustment in the system clock.
 3092          */
 3093         if (job.tts > now) {
 3094             traceJob(job, "WARNING: Job tts is %d seconds in the future.  Proceeding anyway.", job.tts - now);
 3095             job.tts = now;
 3096         }
 3097 
 3098         /*
 3099          * Do per-destination processing and checking.
 3100          */
 3101         DestInfo& di = destJobs[job.dest];
 3102         /*
 3103          * Constrain the maximum number of times the phone
 3104          * will be dialed and/or the number of attempts that
 3105          * will be made (and reject jobs accordingly).
 3106          */
 3107         u_short maxdials = fxmin((u_short) job.getJCI().getMaxDials(),req->maxdials);
 3108         if (req->totdials >= maxdials) {
 3109             rejectJob(job, *req, fxStr::format(
 3110             "REJECT: Too many attempts to dial {E333}: %u, max %u",
 3111             req->totdials, maxdials));
 3112             deleteRequest(job, req, Job::rejected, true);
 3113             continue;
 3114         }
 3115         u_short maxtries = fxmin((u_short) job.getJCI().getMaxTries(),req->maxtries);
 3116         if (req->tottries >= maxtries) {
 3117             rejectJob(job, *req, fxStr::format(
 3118             "REJECT: Too many attempts to transmit: %u, max %u {E334}",
 3119             req->tottries, maxtries));
 3120             deleteRequest(job, req, Job::rejected, true);
 3121             continue;
 3122         }
 3123         // NB: repeat this check so changes in max pages are applied
 3124         u_int maxpages = job.getJCI().getMaxSendPages();
 3125         if (req->totpages > maxpages) {
 3126             rejectJob(job, *req, fxStr::format(
 3127             "REJECT: Too many pages in submission: %u, max %u {E335}",
 3128             req->totpages, maxpages));
 3129             deleteRequest(job, req, Job::rejected, true);
 3130             continue;
 3131         }
 3132         if (job.getJCI().getRejectNotice() != "") {
 3133             /*
 3134              * Calls to this destination are being rejected for
 3135              * a specified reason that we return to the sender.
 3136              */
 3137             rejectJob(job, *req, "REJECT: " | job.getJCI().getRejectNotice());
 3138             deleteRequest(job, req, Job::rejected, true);
 3139             continue;
 3140         }
 3141         if ((job.getJCI().getMaxConcurrentCalls() == 0 && di.isPendingConnection()) || 
 3142             (job.getJCI().getMaxConcurrentCalls() != 0 && !isOKToCall(di, job.getJCI(), 1))) {
 3143             /*
 3144              * This job would exceed the max number of concurrent
 3145              * calls that may be made to this destination.  Put it
 3146              * on a ``blocked queue'' for the destination; the job
 3147              * will be made ready to run when one of the existing
 3148              * jobs terminates.
 3149              */
 3150             blockJob(job, *req, "Blocked by concurrent calls {E337}");
 3151             if (job.isOnList()) job.remove();   // remove from run queue
 3152             di.block(job);          // place at tail of di queue, honors job priority
 3153             delete req;
 3154         } else if (((tts = job.getJCI().nextTimeToSend(now)) != now) || ((tts = job.tod.nextTimeOfDay(now)) != now)) {
 3155             /*
 3156              * This job may not be started now because of time-of-day
 3157              * restrictions.  Reschedule it for the next possible time.
 3158              */
 3159             if (job.isOnList()) job.remove();   // remove from run queue
 3160             delayJob(job, *req, "Delayed by time-of-day restrictions {E338}", tts);
 3161             delete req;
 3162         } else if (staggerCalls && ((u_int) lastCall + staggerCalls) > (u_int) now) {
 3163             /*
 3164              * This job may not be started now because we last started
 3165              * another job too recently and we're staggering jobs.
 3166              * Reschedule it for the time when next okay.
 3167              */
 3168             if (job.isOnList()) job.remove();
 3169             delayJob(job, *req, "Delayed by outbound call staggering {E339}", lastCall + staggerCalls);
 3170             delete req;
 3171         } else if (job.getJCI().getProxy() != "") {
 3172             if (numProxyJobs >= maxProxyJobs) {
 3173             if (job.isOnList()) job.remove();
 3174             delayJob(job, *req, "Delayed by limit on proxy connections {E346}", Sys::now() + random() % requeueInterval);
 3175             delete req;
 3176             } else {
 3177             /*
 3178              * Send this job through a proxy HylaFAX server.
 3179              */
 3180             unblockDestJobs(di);            // this job may be blocking others
 3181             pokeScheduler();
 3182             if (job.isOnList()) job.remove();   // remove from run queue
 3183             job.commid = "";
 3184             job.start = now;
 3185             req->notice = "";
 3186             setActive(job);
 3187             updateRequest(*req, job);
 3188             sendViaProxy(job, *req);
 3189             delete req; 
 3190             }
 3191         } else if ((Modem::modemAvailable(job) || (allowIgnoreModemBusy && req->ignoremodembusy)) && assignModem(job, (allowIgnoreModemBusy && req->ignoremodembusy))) {
 3192             lastCall = now;
 3193             if (job.isOnList()) job.remove();   // remove from run queue
 3194             job.breq = req;
 3195             /*
 3196              * We have a modem and have assigned it to the
 3197              * job.  The job is not on any list; processJob
 3198              * is responsible for requeing the job according
 3199              * to the outcome of the work it does (which may
 3200              * take place asynchronously in a sub-process).
 3201              * Likewise the release of the assigned modem is
 3202              * also assumed to take place asynchronously in
 3203              * the context of the job's processing.
 3204              */
 3205             (void) di.getInfo(job.dest);    // must read file for supportsBatching
 3206             FaxMachineInfo info;
 3207             if (di.supportsBatching() && maxBatchJobs > 1
 3208                 && (req->jobtype == "facsimile"
 3209                     || (req->jobtype == "pager" 
 3210                         && streq(info.getPagingProtocol(), "ixo")))) { 
 3211                     // fax and IXO pages only for now
 3212             /*
 3213              * The destination supports batching and batching is enabled.  
 3214              * Continue down the queue and build an array of all processable 
 3215              * jobs to this destination allowed on this modem which are not 
 3216              * of a lesser priority than jobs to other destinations.
 3217              */
 3218             unblockDestJobs(di);
 3219 
 3220             /*
 3221              * Since job files are passed to the send program as command-line
 3222              * parameters, our batch size is limited by that number of
 3223              * parameters.  64 should be a portable number.
 3224              */
 3225             if (maxBatchJobs > 64) maxBatchJobs = 64;
 3226 
 3227             /*
 3228              * If the queue length becomes very large then scanning the queue
 3229              * for batching can become counter-productive as it consumes a 
 3230              * large amount of CPU attention spinning through the queue.  In
 3231              * all likelihood batching would not even be useful in those 
 3232              * scenarios, and the administrator simply has not attended to 
 3233              * disabling it.  So we prevent batching from traversing into deep 
 3234              * queues in able to prevent a large amount of unnecessary CPU 
 3235              * consumption.
 3236              */
 3237             u_int qlencount = 0;
 3238 
 3239             Job* bjob = &job;   // Last batched Job
 3240             Job* cjob = &job;   // current Job
 3241 
 3242             u_int batchedjobs = 1;
 3243             for (u_int j = 0; batchedjobs < maxBatchJobs && j < NQHASH && qlencount < maxTraversal; j++) {
 3244                 blockSignals();
 3245                 for (JobIter joblist(runqs[j]); batchedjobs < maxBatchJobs && joblist.notDone() && qlencount < maxTraversal; joblist++) {
 3246                 qlencount++;
 3247                 if (joblist.job().dest != cjob->dest)
 3248                     continue;
 3249                 cjob = joblist;
 3250                 if (job.jobid == cjob->jobid)
 3251                     continue;   // Skip the current job
 3252                 fxAssert(cjob->tts <= Sys::now(), "Sleeping job on run queue");
 3253                 fxAssert(cjob->modem == NULL, "Job on run queue holding modem");
 3254                 FaxRequest* creq = readRequest(*cjob);
 3255                 if (!areBatchable(*req, *creq, job, *cjob)) {
 3256                     delete creq;
 3257                     continue;
 3258                 }
 3259                 if (iter.notDone() && &iter.job() == bjob)
 3260                     iter++;
 3261 
 3262                 traceJob(job, "ADDING JOB %s TO BATCH", (const char*) cjob->jobid);
 3263                 cjob->modem = job.modem;
 3264                 cjob->remove();
 3265                 bjob->bnext = cjob;
 3266                 cjob->bprev = bjob;
 3267                 bjob = cjob;
 3268                 cjob->breq = creq;
 3269                 batchedjobs++;
 3270                 }
 3271                 releaseSignals();
 3272             }
 3273             /*
 3274              * Jobs that are on the sleep queue with state_sleeping
 3275              * can be batched because the tts that the submitter requested
 3276              * is known to have passed already.  So we pull these jobs out
 3277              * of the sleep queue and batch them directly.
 3278              */
 3279             blockSignals();
 3280             for (JobIter sleepiter(sleepq); batchedjobs < maxBatchJobs && sleepiter.notDone() && qlencount < maxTraversal; sleepiter++) {
 3281                 qlencount++;
 3282                 cjob = sleepiter;
 3283                 if (cjob->dest != job.dest || cjob->state != FaxRequest::state_sleeping)
 3284                 continue;
 3285                 FaxRequest* creq = readRequest(*cjob);
 3286                 if (!(req && areBatchable(*req, *creq, job, *cjob))) {
 3287                 delete creq;
 3288                 continue;
 3289                 }
 3290 
 3291                 traceJob(job, "ADDING JOB %s TO BATCH", (const char*) cjob->jobid);
 3292                 cjob->stopTTSTimer();
 3293                 cjob->tts = now;
 3294                 cjob->state = FaxRequest::state_ready;
 3295                 cjob->remove();
 3296                 cjob->modem = job.modem;
 3297                 bjob->bnext = cjob;
 3298                 cjob->bprev = bjob;
 3299                 cjob->breq = creq;
 3300                 bjob = cjob;
 3301                 // This job was batched from sleeping, things have
 3302                 // changed; Update the queue file for onlookers.
 3303                 creq->tts = now;
 3304                 updateRequest(*creq, *cjob);
 3305                 batchedjobs++;
 3306             }
 3307             bjob->bnext = NULL;
 3308             releaseSignals();
 3309             } else
 3310             job.bnext = NULL;
 3311             di.call();          // mark as called to correctly block other jobs
 3312             processJob(job, req, di);
 3313         } else if (job.state == FaxRequest::state_failed) {
 3314             rejectJob(job, *req, fxStr::format("REJECT: Modem is configured as exempt from accepting jobs {E336}"));
 3315             deleteRequest(job, req, Job::rejected, true);
 3316             continue;
 3317         } else              // leave job on run queue
 3318             delete req;
 3319         }
 3320     }
 3321     }
 3322     /*
 3323      * Reap dead jobs.
 3324      */
 3325     for (JobIter iter(deadq); iter.notDone(); iter++) {
 3326     Job* job = iter;
 3327     job->remove();
 3328     traceJob(*job, "DELETE");
 3329     Trigger::post(Trigger::JOB_REAP, *job);
 3330     delete job;
 3331     }
 3332     releaseSignals();
 3333     /*
 3334      * Reclaim resources associated with clients
 3335      * that terminated without telling us.
 3336      */
 3337     HylaClient::purge();        // XXX maybe do this less often
 3338 
 3339     inSchedule = false;
 3340 }
 3341 
 3342 bool
 3343 faxQueueApp::scheduling(void)
 3344 {
 3345     return inSchedule;
 3346 }
 3347 
 3348 /*
 3349  * Attempt to assign a modem to a job.  If we are
 3350  * unsuccessful and it was due to the modem being
 3351  * locked for use by another program then we start
 3352  * a thread to poll for the removal of the lock file;
 3353  * this is necessary for send-only setups where we
 3354  * do not get information about when modems are in
 3355  * use from faxgetty processes.
 3356  */
 3357 bool
 3358 faxQueueApp::assignModem(Job& job, bool ignorebusy)
 3359 {
 3360     fxAssert(job.modem == NULL, "Assigning modem to job that already has one");
 3361 
 3362     bool retryModemLookup;
 3363     do {
 3364     retryModemLookup = false;
 3365     Modem* modem = Modem::findModem(job, ignorebusy);
 3366     if (modem) {
 3367         if (modem->getState() == Modem::EXEMPT) {
 3368         job.state = FaxRequest::state_failed;
 3369         return (false);
 3370         }
 3371         if (modem->assign(job, (modem->getState() == Modem::BUSY && ignorebusy))) {
 3372         Trigger::post(Trigger::MODEM_ASSIGN, *modem);
 3373         return (true);
 3374         }
 3375         /*
 3376          * Modem could not be assigned to job.  The
 3377          * modem is assumed to be ``removed'' from
 3378          * the list of potential modems scanned by
 3379          * findModem so we arrange to re-lookup a
 3380          * suitable modem for this job.  (a goto would
 3381          * be fine here but too many C++ compilers
 3382          * can't handle jumping past the above code...)
 3383          */
 3384         traceJob(job, "Unable to assign modem %s (cannot lock)",
 3385         (const char*) modem->getDeviceID());
 3386         modem->startLockPolling(pollLockWait);
 3387         traceModem(*modem, "BUSY (begin polling)");
 3388         retryModemLookup = true;
 3389     } else
 3390         traceJob(job, "No assignable modem located");
 3391     } while (retryModemLookup);
 3392     return (false);
 3393 }
 3394 
 3395 /*
 3396  * Release a modem assigned to a job.  The scheduler
 3397  * is prodded since doing this may permit something
 3398  * else to be processed.
 3399  */
 3400 void
 3401 faxQueueApp::releaseModem(Job& job)
 3402 {
 3403     Trigger::post(Trigger::MODEM_RELEASE, *job.modem);
 3404     job.modem->release();
 3405     pokeScheduler();
 3406     Job* cjob;
 3407     for (cjob = &job; cjob != NULL; cjob = cjob->bnext) {
 3408     fxAssert(cjob->modem != NULL, "No assigned modem to release");
 3409     cjob->modem = NULL;         // remove reference to modem
 3410     }
 3411 }
 3412 
 3413 /*
 3414  * Poll to see if a modem's UUCP lock file is still
 3415  * present.  If the lock has been removed then mark
 3416  * the modem ready for use and poke the job scheduler
 3417  * in case jobs were waiting for an available modem.
 3418  * This work is only done when a modem is ``discovered''
 3419  * to be in-use by an outbound process when operating
 3420  * in a send-only environment (i.e. one w/o a faxgetty
 3421  * process monitoring the state of each modem).
 3422  */
 3423 void
 3424 faxQueueApp::pollForModemLock(Modem& modem)
 3425 {
 3426     if (modem.lock->lock()) {
 3427     modem.release();
 3428     traceModem(modem, "READY (end polling)");
 3429     pokeScheduler();
 3430     } else
 3431     modem.startLockPolling(pollLockWait);
 3432 }
 3433 
 3434 /*
 3435  * Set a timeout so that the job scheduler runs the
 3436  * next time the dispatcher is invoked.
 3437  */
 3438 void
 3439 faxQueueApp::pokeScheduler(u_short s)
 3440 {
 3441     schedTimeout.start(s);
 3442 }
 3443 
 3444 /*
 3445  * Create a request instance and read the
 3446  * associated queue file into it.
 3447  */
 3448 FaxRequest*
 3449 faxQueueApp::readRequest(Job& job)
 3450 {
 3451     int fd = Sys::open(job.file, O_RDWR);
 3452     if (fd >= 0) {
 3453     if (flock(fd, LOCK_EX) >= 0) {
 3454         FaxRequest* req = new FaxRequest(job.file, fd);
 3455         bool reject;
 3456         if (req->readQFile(reject)) {
 3457         if (reject) {
 3458             jobError(job, "qfile contains one or more errors {E347}");
 3459             delete req;
 3460             return (NULL);
 3461         }
 3462         if (req->external == "")
 3463             req->external = job.dest;
 3464         return (req);
 3465         }
 3466         jobError(job, "Could not read job file");
 3467         delete req;
 3468     } else
 3469         jobError(job, "Could not lock job file: %m");
 3470     Sys::close(fd);
 3471     } else {
 3472     // file might have been removed by another server
 3473     if (errno != ENOENT)
 3474         jobError(job, "Could not open job file: %m");
 3475     }
 3476     return (NULL);
 3477 }
 3478 
 3479 /*
 3480  * Update the request instance with information
 3481  * from the job structure and then write the
 3482  * associated queue file.
 3483  */
 3484 void
 3485 faxQueueApp::updateRequest(FaxRequest& req, Job& job)
 3486 {
 3487     req.state = job.state;
 3488     req.pri = job.pri;
 3489     req.writeQFile();
 3490 }
 3491 
 3492 /*
 3493  * Delete a request and associated state.
 3494  */
 3495 void
 3496 faxQueueApp::deleteRequest(Job& job, FaxRequest* req, JobStatus why,
 3497     bool force, const char* duration)
 3498 {
 3499     if (why != Job::done) queueAccounting(job, *req, "UNSENT");
 3500     deleteRequest(job, *req, why, force, duration);
 3501     delete req;
 3502 }
 3503 
 3504 void
 3505 faxQueueApp::deleteRequest(Job& job, FaxRequest& req, JobStatus why,
 3506     bool force, const char* duration)
 3507 {
 3508     fxStr dest = FAX_DONEDIR |
 3509     req.qfile.tail(req.qfile.length() - (sizeof (FAX_SENDDIR)-1));
 3510     /*
 3511      * Move completed jobs to the doneq area where
 3512      * they can be retrieved for a period of time;
 3513      * after which they are either removed or archived.
 3514      */
 3515     if (Sys::rename(req.qfile, dest) >= 0) {
 3516     u_int i = 0;
 3517     /*
 3518      * Remove entries for imaged documents and
 3519      * delete/rename references to source documents
 3520      * so the imaged versions can be expunged.
 3521      */
 3522     while (i < req.items.length()) {
 3523         FaxItem& fitem = req.items[i];
 3524         if (fitem.op == FaxRequest::send_fax) {
 3525         req.renameSaved(i);
 3526         req.items.remove(i);
 3527         } else
 3528         i++;
 3529     }
 3530     req.qfile = dest;           // moved to doneq
 3531     job.file = req.qfile;           // ...and track change
 3532     if (why == Job::done)
 3533         req.state = FaxRequest::state_done; // job is definitely done
 3534     else
 3535         req.state = FaxRequest::state_failed;// job is definitely done
 3536     req.pri = job.pri;          // just in case someone cares
 3537     req.tts = Sys::now();           // mark job termination time
 3538     job.tts = req.tts;
 3539     req.writeQFile();
 3540     if (force) {
 3541         notifySender(job, why, duration);
 3542     } else {
 3543         if (job.getJCI().getNotify() != -1) {
 3544         if (job.getJCI().isNotify(FaxRequest::notify_any))
 3545             notifySender(job, why, duration);
 3546         } else
 3547         if (req.isNotify(FaxRequest::notify_any))
 3548             notifySender(job, why, duration);
 3549     }
 3550     } else {
 3551     /*
 3552      * Move failed, probably because there's no
 3553      * directory.  Treat the job the way we used
 3554      * to: purge everything.  This avoids filling
 3555      * the disk with stuff that'll not get removed;
 3556      * except for a scavenger program.
 3557      */
 3558     jobError(job, "rename to %s failed: %s",
 3559         (const char*) dest, strerror(errno));
 3560     req.writeQFile();
 3561     if (force) {
 3562         notifySender(job, why, duration);
 3563     } else {
 3564         if (job.getJCI().getNotify() != -1) {
 3565         if (job.getJCI().isNotify(FaxRequest::notify_any))
 3566             notifySender(job, why, duration);
 3567         } else
 3568         if (req.isNotify(FaxRequest::notify_any))
 3569             notifySender(job, why, duration);
 3570     }
 3571     u_int n = req.items.length();
 3572     req.items.remove(0, n);
 3573     Sys::unlink(req.qfile);
 3574     }
 3575 }
 3576 
 3577 /*
 3578  * FIFO-related support.
 3579  */
 3580 
 3581 /*
 3582  * Open the requisite FIFO special files.
 3583  */
 3584 void
 3585 faxQueueApp::openFIFOs()
 3586 {
 3587     fifo = openFIFO(fifoName, 0600, true);
 3588     Dispatcher::instance().link(fifo, Dispatcher::ReadMask, this);
 3589 }
 3590 
 3591 void
 3592 faxQueueApp::closeFIFOs()
 3593 {
 3594     Sys::close(fifo), fifo = -1;
 3595 }
 3596 
 3597 int faxQueueApp::inputReady(int fd)     { return FIFOInput(fd); }
 3598 
 3599 /*
 3600  * Process a message received through a FIFO.
 3601  */
 3602 void
 3603 faxQueueApp::FIFOMessage(const char* cp)
 3604 {
 3605     if (tracingLevel & FAXTRACE_FIFO)
 3606     logInfo("FIFO RECV \"%s\"", cp);
 3607     if (cp[0] == '\0') {
 3608     logError("Empty FIFO message");
 3609     return;
 3610     }
 3611     const char* tp = strchr(++cp, ':');
 3612     if (tp && (tp[1] != '\0'))
 3613     FIFOMessage(cp[-1], fxStr(cp,tp-cp), tp+1);
 3614     else
 3615     FIFOMessage(cp[-1], fxStr::null, cp);
 3616 }
 3617 
 3618 /*
 3619  * Process a parsed FIFO message.
 3620  *
 3621  * If an application goes crazy, or if the FIFO overflows, then it's possible 
 3622  * to see corrupt FIFO messages.  Thus, the previous parsing of the FIFO message
 3623  * cannot be entirely trusted.  Here, "id" and "args" must be checked for size
 3624  * before continued processing.  The downstream functions will need to make sure 
 3625  * that the id and args are actually meaningful.
 3626  */
 3627 void
 3628 faxQueueApp::FIFOMessage(char cmd, const fxStr& id, const char* args)
 3629 {
 3630     bool status = false;
 3631     switch (cmd) {
 3632     case '+':               // modem status msg
 3633     if (id.length()) FIFOModemMessage(id, args);
 3634     return;
 3635     case '*':               // job status msg from subproc's
 3636     if (id.length()) FIFOJobMessage(id, args);
 3637     return;
 3638     case '@':               // receive status msg
 3639     if (id.length()) FIFORecvMessage(id, args);
 3640     return;
 3641     case 'Q':               // quit
 3642     if (!id.length()) {
 3643         traceServer("QUIT");
 3644         quit = true;
 3645         pokeScheduler();
 3646     }
 3647     return;             // NB: no return value expected
 3648     case 'T':               // create new trigger 
 3649     if (id.length()) {
 3650         traceServer("TRIGGER %s", args);
 3651         Trigger::create(id, args);
 3652     }
 3653     return;             // NB: trigger id returned specially
 3654 
 3655     /*
 3656      * The remaining commands generate a response if
 3657      * the client has included a return address.
 3658      */
 3659     case 'C':               // configuration control
 3660     if (args[0] == '\0') return;
 3661     traceServer("CONFIG %s", args);
 3662     status = readConfigItem(args);
 3663     break;
 3664     case 'D':               // cancel an existing trigger
 3665     if (args[0] == '\0') return;
 3666     traceServer("DELETE %s", args);
 3667     status = Trigger::cancel(args);
 3668     break;
 3669     case 'R':               // remove job
 3670     if (args[0] == '\0') return;
 3671     traceServer("REMOVE JOB %s", args);
 3672     status = terminateJob(args, Job::removed);
 3673     break;
 3674     case 'K':               // kill job
 3675     if (args[0] == '\0') return;
 3676     traceServer("KILL JOB %s", args);
 3677     status = terminateJob(args, Job::killed);
 3678     break;
 3679     case 'S':               // submit an outbound job
 3680     if (args[0] == '\0') return;
 3681     traceServer("SUBMIT JOB %s", args);
 3682     status = submitJob(args, false, true);
 3683     if (status)
 3684         pokeScheduler();
 3685     break;
 3686     case 'U':               // unreference file
 3687     if (args[0] == '\0') return;
 3688     traceServer("UNREF DOC %s", args);
 3689     unrefDoc(args);
 3690     status = true;
 3691     break;
 3692     case 'X':               // suspend job
 3693     if (args[0] == '\0') return;
 3694     traceServer("SUSPEND JOB %s", args);
 3695     status = suspendJob(args, false);
 3696     if (status)
 3697         pokeScheduler();
 3698     break;
 3699     case 'Y':               // interrupt job
 3700     if (args[0] == '\0') return;
 3701     traceServer("INTERRUPT JOB %s", args);
 3702     status = suspendJob(args, true);
 3703     if (status)
 3704         pokeScheduler();
 3705     break;
 3706     case 'N':               // noop
 3707     status = true;
 3708     break;
 3709     case 'Z':
 3710     showDebugState();
 3711     break;
 3712     default:
 3713     logError("Bad FIFO cmd '%c' from client %s", cmd, (const char*) id);
 3714     break;
 3715     }
 3716     if (id != fxStr::null) {
 3717     char msg[3];
 3718     msg[0] = cmd;
 3719     msg[1] = (status ? '*' : '!');
 3720     msg[2] = '\0';
 3721     if (tracingLevel & FAXTRACE_FIFO)
 3722         logInfo("FIFO SEND %s msg \"%s\"", (const char*) id, msg);
 3723     HylaClient::getClient(id).send(msg, sizeof (msg));
 3724     }
 3725 }
 3726 
 3727 void
 3728 faxQueueApp::notifyModemWedged(Modem& modem)
 3729 {
 3730     fxStr dev(idToDev(modem.getDeviceID()));
 3731     logError("MODEM %s appears to be wedged", (const char*)dev);
 3732     fxStr cmd(wedgedCmd
 3733     | quote | quoted(modem.getDeviceID()) | enquote
 3734     | quote |                 quoted(dev) | enquote
 3735     );
 3736     traceServer("MODEM WEDGED: %s", (const char*) cmd);
 3737     runCmd(cmd, true, this);
 3738 }
 3739 
 3740 void
 3741 faxQueueApp::FIFOModemMessage(const fxStr& devid, const char* msg)
 3742 {
 3743     if (! (devid.length() > 0))
 3744     {
 3745     traceServer("Invalid modem FIFO message");
 3746     return;
 3747     }
 3748 
 3749     Modem& modem = Modem::getModemByID(devid);
 3750     switch (msg[0]) {
 3751     case 'R':           // modem ready, parse capabilities
 3752     modem.stopLockPolling();
 3753     if (msg[1] != '\0') {
 3754         modem.setCapabilities(msg+1);   // NB: also sets modem READY
 3755         traceModem(modem, "READY, capabilities %s", msg+1);
 3756     } else {
 3757         modem.setState(Modem::READY);
 3758         traceModem(modem, "READY (no capabilities)");
 3759     }
 3760     Trigger::post(Trigger::MODEM_READY, modem);
 3761     pokeScheduler();
 3762     break;
 3763     case 'B':           // modem busy doing something
 3764     modem.stopLockPolling();
 3765     traceModem(modem, "BUSY");
 3766     modem.setState(Modem::BUSY);
 3767     Trigger::post(Trigger::MODEM_BUSY, modem);
 3768     break;
 3769     case 'D':           // modem to be marked down
 3770     modem.stopLockPolling();
 3771     traceModem(modem, "DOWN");
 3772     modem.setState(Modem::DOWN);
 3773     Trigger::post(Trigger::MODEM_DOWN, modem);
 3774     break;
 3775     case 'E':           // modem exempt from sending use
 3776     modem.stopLockPolling();
 3777     traceModem(modem, "EXEMPT");
 3778     modem.setState(Modem::EXEMPT);
 3779     Trigger::post(Trigger::MODEM_EXEMPT, modem);
 3780     // clear any pending jobs for this modem
 3781     pokeScheduler();
 3782     break;
 3783     case 'N':           // modem phone number updated
 3784     traceModem(modem, "NUMBER %s", msg+1);
 3785     modem.setNumber(msg+1);
 3786     break;
 3787     case 'I':           // modem communication ID
 3788     traceModem(modem, "COMID %s", msg+1);
 3789     modem.setCommID(msg+1);
 3790     break;
 3791     case 'W':           // modem appears wedged
 3792     // NB: modem should be marked down in a separate message
 3793     notifyModemWedged(modem);
 3794         Trigger::post(Trigger::MODEM_WEDGED, modem);
 3795     break;
 3796     case 'U':           // modem inuse by outbound job
 3797     modem.stopLockPolling();
 3798     traceModem(modem, "BUSY");
 3799     modem.setState(Modem::BUSY);
 3800     Trigger::post(Trigger::MODEM_INUSE, modem);
 3801     break;
 3802     case 'C':           // caller-ID information
 3803     Trigger::post(Trigger::MODEM_CID, modem, msg+1);
 3804     break;
 3805     case 'd':           // data call begun
 3806     Trigger::post(Trigger::MODEM_DATA_BEGIN, modem);
 3807     break;
 3808     case 'e':           // data call finished
 3809     Trigger::post(Trigger::MODEM_DATA_END, modem);
 3810     break;
 3811     case 'v':           // voice call begun
 3812     Trigger::post(Trigger::MODEM_VOICE_BEGIN, modem);
 3813     break;
 3814     case 'w':           // voice call finished
 3815     Trigger::post(Trigger::MODEM_VOICE_END, modem);
 3816     break;
 3817     default:
 3818     traceServer("FIFO: Bad modem message \"%s\" for modem %s",
 3819         msg, (const char*)devid);
 3820     break;
 3821     }
 3822 }
 3823 
 3824 void
 3825 faxQueueApp::FIFOJobMessage(const fxStr& jobid, const char* msg)
 3826 {
 3827     Job* jp = Job::getJobByID(jobid);
 3828     if (!jp) {
 3829     traceServer("FIFO: JOB %s not found for msg \"%s\"",
 3830         (const char*) jobid, msg);
 3831     return;
 3832     }
 3833     switch (msg[0]) {
 3834     case 'c':           // call placed
 3835     Trigger::post(Trigger::SEND_CALL, *jp);
 3836     break;
 3837     case 'C':           // call connected with fax
 3838     Trigger::post(Trigger::SEND_CONNECTED, *jp);
 3839     {
 3840         DestInfo& di = destJobs[(*jp).dest];
 3841         di.connected();
 3842         if ((*jp).getJCI().getMaxConcurrentCalls() == 0) {
 3843             unblockDestJobs(di, 1);     // release one blocked job
 3844         pokeScheduler();
 3845         }
 3846     }
 3847     break;
 3848     case 'd':           // page sent
 3849     Trigger::post(Trigger::SEND_PAGE, *jp, msg+1);
 3850     break;
 3851     case 'D':           // document sent
 3852     { FaxSendInfo si; si.decode(msg+1); }
 3853     Trigger::post(Trigger::SEND_DOC, *jp, msg+1);
 3854     break;
 3855     case 'p':           // polled document received
 3856     Trigger::post(Trigger::SEND_POLLRCVD, *jp, msg+1);
 3857     break;
 3858     case 'P':           // polling operation done
 3859     Trigger::post(Trigger::SEND_POLLDONE, *jp, msg+1);
 3860     break;
 3861     default:
 3862     traceServer("FIFO: Unknown job message \"%s\" for job %s",
 3863         msg, (const char*)jobid);
 3864     break;
 3865     }
 3866 }
 3867 
 3868 void
 3869 faxQueueApp::FIFORecvMessage(const fxStr& devid, const char* msg)
 3870 {
 3871     if (! (devid.length() > 0))
 3872     {
 3873     traceServer("Invalid modem FIFO message");
 3874     return;
 3875     }
 3876 
 3877     Modem& modem = Modem::getModemByID(devid);
 3878     switch (msg[0]) {
 3879     case 'B':           // inbound call started
 3880     Trigger::post(Trigger::RECV_BEGIN, modem);
 3881     break;
 3882     case 'E':           // inbound call finished
 3883     Trigger::post(Trigger::RECV_END, modem);
 3884     break;
 3885     case 'S':           // session started (received initial parameters)
 3886     Trigger::post(Trigger::RECV_START, modem, msg+1);
 3887     break;
 3888     case 'P':           // page done
 3889     Trigger::post(Trigger::RECV_PAGE, modem, msg+1);
 3890     break;
 3891     case 'D':           // document done
 3892     Trigger::post(Trigger::RECV_DOC, modem, msg+1);
 3893     break;
 3894     default:
 3895     traceServer("FIFO: Unknown recv message \"%s\" for modem %s",
 3896         msg, (const char*)devid);
 3897     break;
 3898     }
 3899 }
 3900 
 3901 /*
 3902  * Configuration support.
 3903  */
 3904 
 3905 void
 3906 faxQueueApp::resetConfig()
 3907 {
 3908     FaxConfig::resetConfig();
 3909     dialRules = NULL;
 3910     setupConfig();
 3911 }
 3912 
 3913 #define N(a)    (sizeof (a) / sizeof (a[0]))
 3914 
 3915 faxQueueApp::stringtag faxQueueApp::strings[] = {
 3916 { "logfacility",    &faxQueueApp::logFacility,  LOG_FAX },
 3917 { "areacode",       &faxQueueApp::areaCode  },
 3918 { "countrycode",    &faxQueueApp::countryCode },
 3919 { "longdistanceprefix", &faxQueueApp::longDistancePrefix },
 3920 { "internationalprefix",&faxQueueApp::internationalPrefix },
 3921 { "uucplockdir",    &faxQueueApp::uucpLockDir,  UUCP_LOCKDIR },
 3922 { "uucplocktype",   &faxQueueApp::uucpLockType, UUCP_LOCKTYPE },
 3923 { "contcoverpage",  &faxQueueApp::contCoverPageTemplate },
 3924 { "contcovercmd",   &faxQueueApp::coverCmd,     FAX_COVERCMD },
 3925 { "notifycmd",      &faxQueueApp::notifyCmd,    FAX_NOTIFYCMD },
 3926 { "ps2faxcmd",      &faxQueueApp::ps2faxCmd,    FAX_PS2FAXCMD },
 3927 { "pdf2faxcmd",     &faxQueueApp::pdf2faxCmd,   FAX_PDF2FAXCMD },
 3928 { "pcl2faxcmd",     &faxQueueApp::pcl2faxCmd,   FAX_PCL2FAXCMD },
 3929 { "tiff2faxcmd",    &faxQueueApp::tiff2faxCmd,  FAX_TIFF2FAXCMD },
 3930 { "sendfaxcmd",     &faxQueueApp::sendFaxCmd,
 3931    FAX_LIBEXEC "/faxsend" },
 3932 { "sendpagecmd",    &faxQueueApp::sendPageCmd,
 3933    FAX_LIBEXEC "/pagesend" },
 3934 { "senduucpcmd",    &faxQueueApp::sendUUCPCmd,
 3935    FAX_LIBEXEC "/uucpsend" },
 3936 { "wedgedcmd",      &faxQueueApp::wedgedCmd,    FAX_WEDGEDCMD },
 3937 { "jobcontrolcmd",  &faxQueueApp::jobCtrlCmd,   "" },
 3938 { "sharecallfailures",  &faxQueueApp::shareCallFailures,"none" },
 3939 };
 3940 faxQueueApp::numbertag faxQueueApp::numbers[] = {
 3941 { "tracingmask",    &faxQueueApp::tracingMask,  // NB: must be first
 3942    FAXTRACE_MODEMIO|FAXTRACE_TIMEOUTS },
 3943 { "servertracing",  &faxQueueApp::tracingLevel, FAXTRACE_SERVER },
 3944 { "uucplocktimeout",    &faxQueueApp::uucpLockTimeout,  0 },
 3945 { "postscripttimeout",  &faxQueueApp::postscriptTimeout, 3*60 },
 3946 { "maxconcurrentjobs",  &faxQueueApp::maxConcurrentCalls, 1 },
 3947 { "maxconcurrentcalls", &faxQueueApp::maxConcurrentCalls, 1 },
 3948 { "maxproxyjobs",   &faxQueueApp::maxProxyJobs, (u_int) 64 },
 3949 { "maxbatchjobs",   &faxQueueApp::maxBatchJobs, (u_int) 64 },
 3950 { "maxtraversal",   &faxQueueApp::maxTraversal, (u_int) 256 },
 3951 { "maxsendpages",   &faxQueueApp::maxSendPages, (u_int) -1 },
 3952 { "maxtries",       &faxQueueApp::maxTries,     (u_int) FAX_RETRIES },
 3953 { "maxdials",       &faxQueueApp::maxDials,     (u_int) FAX_REDIALS },
 3954 { "jobreqother",    &faxQueueApp::requeueInterval,  FAX_REQUEUE },
 3955 { "polllockwait",   &faxQueueApp::pollLockWait, 30 },
 3956 { "staggercalls",   &faxQueueApp::staggerCalls, 0 },
 3957 { "maxconcurrentpreps", &faxQueueApp::maxConcurrentPreps,   1 },
 3958 };
 3959 
 3960 void
 3961 faxQueueApp::setupConfig()
 3962 {
 3963     int i;
 3964 
 3965     for (i = N(strings)-1; i >= 0; i--)
 3966     (*this).*strings[i].p = (strings[i].def ? strings[i].def : "");
 3967     for (i = N(numbers)-1; i >= 0; i--)
 3968     (*this).*numbers[i].p = numbers[i].def;
 3969     tod.reset();            // any day, any time
 3970     use2D = true;           // ok to use 2D data
 3971     class1RestrictPoorDestinations = 0; // no restrictions
 3972     useUnlimitedLN = true;      // ok to use LN_INF
 3973     allowIgnoreModemBusy = false;   // to allow jobs to ignore modem busy status
 3974     uucpLockMode = UUCP_LOCKMODE;
 3975     delete dialRules, dialRules = NULL;
 3976     ModemGroup::reset();        // clear+add ``any modem'' class
 3977     ModemGroup::set(MODEM_ANY, new RE(".*"), fxStr("0"));
 3978     pageChop = FaxRequest::chop_last;
 3979     pageChopThreshold = 3.0;        // minimum of 3" of white space
 3980     lastCall = Sys::now() - 3600;
 3981     numProxyJobs = numPrepares = 0;
 3982 }
 3983 
 3984 void
 3985 faxQueueApp::configError(const char* fmt, ...)
 3986 {
 3987     va_list ap;
 3988     va_start(ap, fmt);
 3989     vlogError(fmt, ap);
 3990     va_end(ap);
 3991 }
 3992 
 3993 void
 3994 faxQueueApp::configTrace(const char* fmt, ...)
 3995 {
 3996     if (tracingLevel & FAXTRACE_CONFIG) {
 3997     va_list ap;
 3998     va_start(ap, fmt);
 3999     vlogError(fmt, ap);
 4000     va_end(ap);
 4001     }
 4002 }
 4003 
 4004 static void
 4005 crackArgv(fxStr& s)
 4006 {
 4007     u_int i = 0;
 4008     do {
 4009         while (i < s.length() && !isspace(s[i])) i++;
 4010         if (i < s.length()) {
 4011             s[i++] = '\0';
 4012             u_int j = i;
 4013             while (j < s.length() && isspace(s[j])) j++;
 4014             if (j > i) {
 4015                 s.remove(i, j - i);
 4016             }
 4017         }
 4018     } while (i < s.length());
 4019     s.resize(i);
 4020 }
 4021 
 4022 static void
 4023 tiffErrorHandler(const char* module, const char* fmt0, va_list ap)
 4024 {
 4025     fxStr fmt = (module != NULL) ?
 4026         fxStr::format("%s: Warning, %s.", module, fmt0)
 4027         : fxStr::format("Warning, %s.", fmt0);
 4028     vlogError(fmt, ap);
 4029 }
 4030 
 4031 static void
 4032 tiffWarningHandler(const char* module, const char* fmt0, va_list ap)
 4033 {
 4034     fxStr fmt = (module != NULL) ?
 4035         fxStr::format("%s: Warning, %s.", module, fmt0)
 4036         : fxStr::format("Warning, %s.", fmt0);
 4037     vlogWarning(fmt, ap);
 4038 }
 4039 
 4040 bool
 4041 faxQueueApp::setConfigItem(const char* tag, const char* value)
 4042 {
 4043     u_int ix;
 4044     if (findTag(tag, (const tags*) strings, N(strings), ix)) {
 4045     (*this).*strings[ix].p = value;
 4046     switch (ix) {
 4047     case 0: faxApp::setLogFacility(logFacility); break;
 4048     }
 4049     if (ix >= 8)
 4050         crackArgv((*this).*strings[ix].p);
 4051     } else if (findTag(tag, (const tags*) numbers, N(numbers), ix)) {
 4052     (*this).*numbers[ix].p = getNumber(value);
 4053     switch (ix) {
 4054     case 1:
 4055         tracingLevel &= ~tracingMask;
 4056         if (dialRules)
 4057         dialRules->setVerbose((tracingLevel&FAXTRACE_DIALRULES) != 0);
 4058         if (tracingLevel&FAXTRACE_TIFF) {
 4059         TIFFSetErrorHandler(tiffErrorHandler);
 4060         TIFFSetWarningHandler(tiffWarningHandler);
 4061         } else {
 4062         TIFFSetErrorHandler(NULL);
 4063         TIFFSetWarningHandler(NULL);
 4064         }
 4065         break;
 4066     case 2: UUCPLock::setLockTimeout(uucpLockTimeout); break;
 4067     }
 4068     } else if (streq(tag, "dialstringrules"))
 4069     setDialRules(value);
 4070     else if (streq(tag, "timeofday"))
 4071     tod.parse(value);
 4072     else if (streq(tag, "use2d"))
 4073     use2D = getBoolean(value);
 4074     else if (streq(tag, "class1restrictpoordestinations"))
 4075     class1RestrictPoorDestinations = getNumber(value);
 4076     else if (streq(tag, "allowignoremodembusy"))
 4077     allowIgnoreModemBusy = getBoolean(value);
 4078     else if (streq(tag, "uucplockmode"))
 4079     uucpLockMode = (mode_t) strtol(value, 0, 8);
 4080     else if (streq(tag, "modemgroup")) {
 4081     const char* cp;
 4082     for (cp = value; *cp && *cp != ':'; cp++)
 4083         ;
 4084     if (*cp == ':') {
 4085         fxStr name(value, cp-value);
 4086         fxStr limit("0");
 4087         for (cp++; *cp && isspace(*cp); cp++)
 4088         ;
 4089         const char* cp2 = strchr(cp, ':');
 4090         if (cp2) {
 4091         limit = fxStr(cp, cp2-cp);
 4092         cp = cp2 + 1;
 4093         }
 4094         if (*cp != '\0') {
 4095         RE* re = new RE(cp);
 4096         if (re->getErrorCode() > REG_NOMATCH) {
 4097             fxStr emsg;
 4098             re->getError(emsg);
 4099             configError("Bad pattern for modem group \"%s\": %s: %s", (const char*) emsg,
 4100             (const char*) name, re->pattern());
 4101         } else
 4102             ModemGroup::set(name, re, limit);
 4103         } else
 4104         configError("No regular expression for modem group");
 4105     } else
 4106         configError("Missing ':' separator in modem group specification");
 4107     } else if (streq(tag, "pagechop")) {
 4108     if (streq(value, "all"))
 4109         pageChop = FaxRequest::chop_all;
 4110     else if (streq(value, "none"))
 4111         pageChop = FaxRequest::chop_none;
 4112     else if (streq(value, "last"))
 4113         pageChop = FaxRequest::chop_last;
 4114     } else if (streq(tag, "pagechopthreshold"))
 4115     pageChopThreshold = atof(value);
 4116     else if (streq(tag, "audithook") )
 4117     {
 4118         const char* cp;
 4119     for (cp = value; *cp && *cp != ':'; cp++)
 4120         ;
 4121     if (*cp == ':') {
 4122         fxStr cmd(value, cp-value);
 4123         for (cp++; *cp && isspace(*cp); cp++)
 4124         ;
 4125         if (*cp != '\0') {
 4126             Trigger::setTriggerHook(cmd, cp);
 4127         } else
 4128         configError("No trigger specification for audit hook");
 4129     } else
 4130         configError("Missing ':' separator in audit hook specification");
 4131     
 4132         
 4133     } else
 4134     return (false);
 4135     return (true);
 4136 }
 4137 
 4138 /*
 4139  * Subclass DialStringRules so that we can redirect the
 4140  * diagnostic and tracing interfaces through the server.
 4141  */
 4142 class MyDialStringRules : public DialStringRules {
 4143 private:
 4144     virtual void parseError(const char* fmt ...);
 4145     virtual void traceParse(const char* fmt ...);
 4146     virtual void traceRules(const char* fmt ...);
 4147 public:
 4148     MyDialStringRules(const char* filename);
 4149     ~MyDialStringRules();
 4150 };
 4151 MyDialStringRules::MyDialStringRules(const char* f) : DialStringRules(f) {}
 4152 MyDialStringRules::~MyDialStringRules() {}
 4153 
 4154 void
 4155 MyDialStringRules::parseError(const char* fmt ...)
 4156 {
 4157     va_list ap;
 4158     va_start(ap, fmt);
 4159     vlogError(fmt, ap);
 4160     va_end(ap);
 4161 }
 4162 void
 4163 MyDialStringRules::traceParse(const char* fmt ...)
 4164 {
 4165     if (faxQueueApp::instance().getTracingLevel() & FAXTRACE_DIALRULES) {
 4166     va_list ap;
 4167     va_start(ap, fmt);
 4168     vlogInfo(fmt, ap);
 4169     va_end(ap);
 4170     }
 4171 }
 4172 void
 4173 MyDialStringRules::traceRules(const char* fmt ...)
 4174 {
 4175     if (faxQueueApp::instance().getTracingLevel() & FAXTRACE_DIALRULES) {
 4176     va_list ap;
 4177     va_start(ap, fmt);
 4178     vlogInfo(fmt, ap);
 4179     va_end(ap);
 4180     }
 4181 }
 4182 
 4183 void
 4184 faxQueueApp::setDialRules(const char* name)
 4185 {
 4186     delete dialRules;
 4187     dialRules = new MyDialStringRules(name);
 4188     dialRules->setVerbose((tracingLevel & FAXTRACE_DIALRULES) != 0);
 4189     /*
 4190      * Setup configuration environment.
 4191      */
 4192     dialRules->def("AreaCode", areaCode);
 4193     dialRules->def("CountryCode", countryCode);
 4194     dialRules->def("LongDistancePrefix", longDistancePrefix);
 4195     dialRules->def("InternationalPrefix", internationalPrefix);
 4196     if (!dialRules->parse()) {
 4197     configError("Parse error in dial string rules \"%s\"", name);
 4198     delete dialRules, dialRules = NULL;
 4199     }
 4200 }
 4201 
 4202 /*
 4203  * Convert a dialing string to a canonical format.
 4204  */
 4205 fxStr
 4206 faxQueueApp::canonicalizePhoneNumber(const fxStr& ds)
 4207 {
 4208     if (dialRules)
 4209     return dialRules->canonicalNumber(ds);
 4210     else
 4211     return ds;
 4212 }
 4213 
 4214 /*
 4215  * Create an appropriate UUCP lock instance.
 4216  */
 4217 UUCPLock*
 4218 faxQueueApp::getUUCPLock(const fxStr& deviceName)
 4219 {
 4220     return UUCPLock::newLock(uucpLockType,
 4221     uucpLockDir, deviceName, uucpLockMode);
 4222 }
 4223 
 4224 u_int faxQueueApp::getTracingLevel() const
 4225     { return tracingLevel; }
 4226 u_int faxQueueApp::getMaxConcurrentCalls() const
 4227     { return maxConcurrentCalls; }
 4228 u_int faxQueueApp::getMaxSendPages() const
 4229     { return maxSendPages; }
 4230 u_int faxQueueApp::getMaxDials() const
 4231     { return maxDials; }
 4232 u_int faxQueueApp::getMaxTries() const
 4233     { return maxTries; }
 4234 time_t faxQueueApp::nextTimeToSend(time_t t) const
 4235     { return tod.nextTimeOfDay(t); }
 4236 
 4237 /*
 4238  * Miscellaneous stuff.
 4239  */
 4240 
 4241 /*
 4242  * Notify the sender of a job that something has
 4243  * happened -- the job has completed, it's been requeued
 4244  * for later processing, etc.
 4245  */
 4246 void
 4247 faxQueueApp::notifySender(Job& job, JobStatus why, const char* duration)
 4248 {
 4249     fxStr cmd(notifyCmd
 4250     | quote |        quoted(job.file) | enquote
 4251     | quote | quoted(Job::jobStatusName(why)) | enquote
 4252     | quote |        quoted(duration) | enquote
 4253     );
 4254     if (why == Job::requeued) {
 4255     /*
 4256      * It's too hard to do localtime in an awk script,
 4257      * so if we may need it, we calculate it here
 4258      * and pass the result as an optional argument.
 4259      */
 4260     char buf[30];
 4261     strftime(buf, sizeof (buf), " '%H:%M'", localtime(&job.tts));
 4262     cmd.append(buf);
 4263     }
 4264     traceServer("NOTIFY: %s", (const char*) cmd);
 4265     runCmd(cmd, true, this);
 4266 }
 4267 
 4268 void
 4269 faxQueueApp::vtraceServer(const char* fmt, va_list ap)
 4270 {
 4271     if (tracingLevel & FAXTRACE_SERVER)
 4272     vlogInfo(fmt, ap);
 4273 }
 4274 
 4275 void
 4276 faxQueueApp::traceServer(const char* fmt ...)
 4277 {
 4278     if (tracingLevel & FAXTRACE_SERVER) {
 4279     va_list ap;
 4280     va_start(ap, fmt);
 4281     vlogInfo(fmt, ap);
 4282     va_end(ap);
 4283     }
 4284 }
 4285 
 4286 static void
 4287 vtraceJob(const Job& job, const char* fmt, va_list ap)
 4288 {
 4289     static const char* stateNames[] = {
 4290         "state#0", "suspended", "pending", "sleeping", "blocked",
 4291     "ready", "active", "done", "failed"
 4292     };
 4293     time_t now = Sys::now();
 4294     vlogInfo(
 4295       "JOB " | job.jobid
 4296     | " (" | stateNames[job.state%9]
 4297     | " dest " | job.dest
 4298     | fxStr::format(" pri %u", job.pri)
 4299     | " tts " | strTime(job.tts - now)
 4300     | " killtime " | strTime(job.killtime - now)
 4301     | "): "
 4302     | fmt, ap);
 4303 }
 4304 
 4305 void
 4306 faxQueueApp::traceQueue(const Job& job, const char* fmt ...)
 4307 {
 4308     if (tracingLevel & FAXTRACE_QUEUEMGMT) {
 4309     va_list ap;
 4310     va_start(ap, fmt);
 4311     vtraceJob(job, fmt, ap);
 4312     va_end(ap);
 4313     }
 4314 }
 4315 
 4316 void
 4317 faxQueueApp::traceJob(const Job& job, const char* fmt ...)
 4318 {
 4319     if (tracingLevel & FAXTRACE_JOBMGMT) {
 4320     va_list ap;
 4321     va_start(ap, fmt);
 4322     vtraceJob(job, fmt, ap);
 4323     va_end(ap);
 4324     }
 4325 }
 4326 
 4327 void
 4328 faxQueueApp::traceQueue(const char* fmt ...)
 4329 {
 4330     if (tracingLevel & FAXTRACE_QUEUEMGMT) {
 4331     va_list ap;
 4332     va_start(ap, fmt);
 4333     vlogInfo(fmt, ap);
 4334     va_end(ap);
 4335     }
 4336 }
 4337 
 4338 void
 4339 faxQueueApp::traceModem(const Modem& modem, const char* fmt ...)
 4340 {
 4341     if (tracingLevel & FAXTRACE_MODEMSTATE) {
 4342     va_list ap;
 4343     va_start(ap, fmt);
 4344     vlogInfo("MODEM " | modem.getDeviceID() | ": " | fmt, ap);
 4345     va_end(ap);
 4346     }
 4347 }
 4348 
 4349 void
 4350 faxQueueApp::jobError(const Job& job, const char* fmt ...)
 4351 {
 4352     va_list ap;
 4353     va_start(ap, fmt);
 4354     vlogError("JOB " | job.jobid | ": " | fmt, ap);
 4355     va_end(ap);
 4356 }
 4357 
 4358 void
 4359 faxQueueApp::showDebugState(void)
 4360 {
 4361     traceServer("DEBUG: Listing destJobs with %d items", destJobs.size());
 4362     for (DestInfoDictIter iter(destJobs); iter.notDone(); iter++)
 4363     {
 4364     const fxStr& dest(iter.key());
 4365     const DestInfo& di(iter.value());
 4366     traceServer("DestInfo (%p) to %s", &di, (const char*)dest);
 4367     }
 4368 
 4369 
 4370     for (int i = 0; i < NQHASH; i++)
 4371     {
 4372     traceServer("DEBUG: runqs[%d](%p) next %p", i, &runqs[i], runqs[i].next);
 4373     for (JobIter iter(runqs[i]); iter.notDone(); iter++)
 4374     {
 4375         Job& job(iter);
 4376         traceJob(job, "In run queue");
 4377     }
 4378     }
 4379 
 4380     traceServer("DEBUG: sleepq(%p) next %p", &sleepq, sleepq.next);
 4381     for (JobIter iter(sleepq); iter.notDone(); iter++)
 4382     {
 4383     Job& job(iter);
 4384     traceJob(job, "In sleep queue");
 4385     }
 4386 
 4387     traceServer("DEBUG: suspendq(%p) next %p", &suspendq, suspendq.next);
 4388     for (JobIter iter(suspendq); iter.notDone(); iter++)
 4389     {
 4390     Job& job(iter);
 4391     traceJob(job, "In suspend queue");
 4392     }
 4393 
 4394     traceServer("DEBUG: activeq(%p) next %p", &activeq, activeq.next);
 4395     for (JobIter iter(activeq); iter.notDone(); iter++)
 4396     {
 4397     Job& job(iter);
 4398     traceJob(job, "In active queue");
 4399     }
 4400 
 4401     traceServer("DEBUG: inSchedule: %s", inSchedule ? "YES" : "NO");
 4402 }
 4403 
 4404 
 4405 void faxQueueApp::childStatus(pid_t pid, int status)
 4406 {
 4407     // We don't do anything here - nothing to act on.
 4408     //traceServer("Child exit status: %#o (%u)", status, pid);
 4409 }
 4410 
 4411 static void
 4412 usage(const char* appName)
 4413 {
 4414     faxApp::fatal("usage: %s [-q queue-directory] [-D]", appName);
 4415 }
 4416 
 4417 static void
 4418 sigCleanup(int)
 4419 {
 4420     faxQueueApp::instance().close();
 4421     _exit(-1);
 4422 }
 4423 
 4424 int
 4425 main(int argc, char** argv)
 4426 {
 4427     faxApp::setupLogging("FaxQueuer");
 4428 
 4429     fxStr appName = argv[0];
 4430     u_int l = appName.length();
 4431     appName = appName.tokenR(l, '/');
 4432 
 4433     faxApp::setupPermissions();
 4434 
 4435     faxApp::setOpts("q:Dc:");
 4436 
 4437     bool detach = true;
 4438     fxStr queueDir(FAX_SPOOLDIR);
 4439     for (GetoptIter iter(argc, argv, faxApp::getOpts()); iter.notDone(); iter++)
 4440     switch (iter.option()) {
 4441     case 'q': queueDir = iter.optArg(); break;
 4442     case 'D': detach = false; break;
 4443     case '?': usage(appName);
 4444     }
 4445     if (Sys::chdir(queueDir) < 0)
 4446     faxApp::fatal(queueDir | ": Can not change directory");
 4447     if (!Sys::isRegularFile(FAX_ETCDIR "/setup.cache"))
 4448     faxApp::fatal("No " FAX_ETCDIR "/setup.cache file; run faxsetup first");
 4449     if (detach)
 4450     faxApp::detachFromTTY();
 4451 
 4452     faxQueueApp* app = new faxQueueApp;
 4453 
 4454     signal(SIGTERM, fxSIGHANDLER(sigCleanup));
 4455     signal(SIGINT, fxSIGHANDLER(sigCleanup));
 4456 
 4457     app->initialize(argc, argv);
 4458     app->open();
 4459     while (app->isRunning())
 4460     Dispatcher::instance().dispatch();
 4461     app->close();
 4462     delete app;
 4463 
 4464 
 4465     Modem::CLEANUP();
 4466     delete &Dispatcher::instance();
 4467     
 4468     return 0;
 4469 }
 4470