"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/readable-stream/lib/_stream_writable.js" (8 Mar 2017, 13069 Bytes) of archive /windows/misc/atom-windows.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) JavaScript source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 // Copyright Joyent, Inc. and other Node contributors.
    2 //
    3 // Permission is hereby granted, free of charge, to any person obtaining a
    4 // copy of this software and associated documentation files (the
    5 // "Software"), to deal in the Software without restriction, including
    6 // without limitation the rights to use, copy, modify, merge, publish,
    7 // distribute, sublicense, and/or sell copies of the Software, and to permit
    8 // persons to whom the Software is furnished to do so, subject to the
    9 // following conditions:
   10 //
   11 // The above copyright notice and this permission notice shall be included
   12 // in all copies or substantial portions of the Software.
   13 //
   14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
   17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
   18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
   19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
   20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
   21 
   22 // A bit simpler than readable streams.
   23 // Implement an async ._write(chunk, cb), and it'll handle all
   24 // the drain event emission and buffering.
   25 
   // Public entry point of this module: the Writable constructor, which is a
   // function declaration further down in this file.
   module.exports = Writable;

   /*<replacement>*/
   var Buffer = require('buffer').Buffer;
   /*</replacement>*/

   // Expose the state constructor as a static property of Writable.
   Writable.WritableState = WritableState;


   /*<replacement>*/
   var util = require('core-util-is');
   util.inherits = require('inherits');
   /*</replacement>*/

   var Stream = require('stream');

   // Writable instances inherit from the legacy Stream base class.
   util.inherits(Writable, Stream);
   43 
   // One queued write request: the chunk, the encoding it was written with,
   // and the callback to invoke once the chunk has been handled.
   function WriteReq(chunk, encoding, cb) {
     this.chunk = chunk;
     this.encoding = encoding;
     this.callback = cb;
   }
   49 
   50 function WritableState(options, stream) {
   51   var Duplex = require('./_stream_duplex');
   52 
   53   options = options || {};
   54 
   55   // the point at which write() starts returning false
   56   // Note: 0 is a valid value, means that we always return false if
   57   // the entire buffer is not flushed immediately on write()
   58   var hwm = options.highWaterMark;
   59   var defaultHwm = options.objectMode ? 16 : 16 * 1024;
   60   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
   61 
   62   // object stream flag to indicate whether or not this stream
   63   // contains buffers or objects.
   64   this.objectMode = !!options.objectMode;
   65 
   66   if (stream instanceof Duplex)
   67     this.objectMode = this.objectMode || !!options.writableObjectMode;
   68 
   69   // cast to ints.
   70   this.highWaterMark = ~~this.highWaterMark;
   71 
   72   this.needDrain = false;
   73   // at the start of calling end()
   74   this.ending = false;
   75   // when end() has been called, and returned
   76   this.ended = false;
   77   // when 'finish' is emitted
   78   this.finished = false;
   79 
   80   // should we decode strings into buffers before passing to _write?
   81   // this is here so that some node-core streams can optimize string
   82   // handling at a lower level.
   83   var noDecode = options.decodeStrings === false;
   84   this.decodeStrings = !noDecode;
   85 
   86   // Crypto is kind of old and crusty.  Historically, its default string
   87   // encoding is 'binary' so we have to make this configurable.
   88   // Everything else in the universe uses 'utf8', though.
   89   this.defaultEncoding = options.defaultEncoding || 'utf8';
   90 
   91   // not an actual buffer we keep track of, but a measurement
   92   // of how much we're waiting to get pushed to some underlying
   93   // socket or file.
   94   this.length = 0;
   95 
   96   // a flag to see when we're in the middle of a write.
   97   this.writing = false;
   98 
   99   // when true all writes will be buffered until .uncork() call
  100   this.corked = 0;
  101 
  102   // a flag to be able to tell if the onwrite cb is called immediately,
  103   // or on a later tick.  We set this to true at first, because any
  104   // actions that shouldn't happen until "later" should generally also
  105   // not happen before the first write call.
  106   this.sync = true;
  107 
  108   // a flag to know if we're processing previously buffered items, which
  109   // may call the _write() callback in the same tick, so that we don't
  110   // end up in an overlapped onwrite situation.
  111   this.bufferProcessing = false;
  112 
  113   // the callback that's passed to _write(chunk,cb)
  114   this.onwrite = function(er) {
  115     onwrite(stream, er);
  116   };
  117 
  118   // the callback that the user supplies to write(chunk,encoding,cb)
  119   this.writecb = null;
  120 
  121   // the amount that is being written when _write is called.
  122   this.writelen = 0;
  123 
  124   this.buffer = [];
  125 
  126   // number of pending user-supplied write callbacks
  127   // this must be 0 before 'finish' can be emitted
  128   this.pendingcb = 0;
  129 
  130   // emit prefinish if the only thing we're waiting for is _write cbs
  131   // This is relevant for synchronous Transform streams
  132   this.prefinished = false;
  133 
  134   // True if the error was already emitted and should not be thrown again
  135   this.errorEmitted = false;
  136 }
  137 
  // Writable stream constructor. May be called without `new`.
  //
  // options: forwarded unchanged to WritableState.
  function Writable(options) {
    var Duplex = require('./_stream_duplex');

    // Writable ctor is applied to Duplexes, though they're not
    // instanceof Writable, they're instanceof Readable.
    if (!(this instanceof Writable) && !(this instanceof Duplex))
      return new Writable(options);

    this._writableState = new WritableState(options, this);

    // legacy.
    this.writable = true;

    Stream.call(this);
  }
  153 
  // Otherwise people can pipe Writable streams, which is just wrong.
  // Calling pipe() on a Writable only emits an 'error' event.
  Writable.prototype.pipe = function() {
    this.emit('error', new Error('Cannot pipe. Not readable.'));
  };
  158 
  159 
  // Reports a write() issued after end(): emits 'error' on the stream right
  // away, then passes the same error to the write callback on the next tick.
  function writeAfterEnd(stream, state, cb) {
    var er = new Error('write after end');
    // TODO: defer error events consistently everywhere, not just the cb
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
  }
  168 
  // Decides whether `chunk` may be written to the stream.
  //
  // In objectMode every value is accepted; stream chunks are then all
  // considered to be of length=1, and the watermarks determine how many
  // objects to keep in the buffer, rather than how many bytes or characters.
  //
  // Outside objectMode only buffers and strings are accepted. null and
  // undefined are rejected here too: the write path reads `chunk.length`
  // afterwards, so letting them through would throw instead of reporting a
  // stream error.
  //
  // On rejection a TypeError is emitted as 'error' immediately and handed to
  // `cb` on the next tick. Returns true when the chunk is acceptable.
  function validChunk(stream, state, chunk, cb) {
    if (state.objectMode || util.isBuffer(chunk) || util.isString(chunk))
      return true;

    var er = new TypeError(chunk === null ?
                           'May not write null values to stream' :
                           'Invalid non-string/buffer chunk');
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
    return false;
  }
  189 
  // Writes one chunk to the stream.
  //
  // chunk:    the data to write.
  // encoding: optional; may be omitted, in which case the second argument is
  //           taken as the callback. Buffers are tagged with the pseudo
  //           encoding 'buffer'; otherwise a missing encoding falls back to
  //           the stream's defaultEncoding.
  // cb:       optional callback; a no-op is substituted when it is missing.
  //
  // Returns the value produced by writeOrBuffer(), or false when the write
  // was rejected (stream already ended, or invalid chunk).
  Writable.prototype.write = function(chunk, encoding, cb) {
    var state = this._writableState;
    var ret = false;

    if (util.isFunction(encoding)) {
      cb = encoding;
      encoding = null;
    }

    if (util.isBuffer(chunk))
      encoding = 'buffer';
    else if (!encoding)
      encoding = state.defaultEncoding;

    if (!util.isFunction(cb))
      cb = function() {};

    if (state.ended)
      writeAfterEnd(this, state, cb);
    else if (validChunk(this, state, chunk, cb)) {
      // only accepted writes count towards the pending callbacks
      state.pendingcb++;
      ret = writeOrBuffer(this, state, chunk, encoding, cb);
    }

    return ret;
  };
  216 
  // Increments the cork depth; while it is non-zero, writes are queued in
  // state.buffer instead of being passed to _write.
  Writable.prototype.cork = function() {
    var state = this._writableState;

    state.corked++;
  };
  222 
  // Decrements the cork depth (no-op when the stream is not corked). Once the
  // depth reaches zero, the queued writes are flushed, provided no write is in
  // flight, the stream has not finished, the buffer is not already being
  // processed, and there is something queued.
  Writable.prototype.uncork = function() {
    var state = this._writableState;

    if (state.corked) {
      state.corked--;

      if (!state.writing &&
          !state.corked &&
          !state.finished &&
          !state.bufferProcessing &&
          state.buffer.length)
        clearBuffer(this, state);
    }
  };
  237 
  // Converts a string chunk into a Buffer using `encoding`, unless the stream
  // is in objectMode or was configured with decodeStrings === false.
  // Any other chunk is returned untouched.
  function decodeChunk(state, chunk, encoding) {
    if (!state.objectMode &&
        state.decodeStrings !== false &&
        util.isString(chunk)) {
      // `new Buffer(string, encoding)` is deprecated, so prefer Buffer.from().
      // Some old runtimes lack it, or only inherit Uint8Array.from (which has
      // different semantics); fall back to the constructor there.
      if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from)
        chunk = Buffer.from(chunk, encoding);
      else
        chunk = new Buffer(chunk, encoding);
    }
    return chunk;
  }
  246 
  // if we're already writing something, then just put this
  // in the queue, and wait our turn.  Otherwise, call _write
  // If we return false, then we need a drain event, so set that flag.
  //
  // Returns true while the accounted length stays below the high water mark.
  function writeOrBuffer(stream, state, chunk, encoding, cb) {
    chunk = decodeChunk(state, chunk, encoding);
    // decoding may have turned a string into a buffer
    if (util.isBuffer(chunk))
      encoding = 'buffer';
    // in objectMode every chunk counts as 1, whatever it is
    var len = state.objectMode ? 1 : chunk.length;

    state.length += len;

    var ret = state.length < state.highWaterMark;
    // we must ensure that previous needDrain will not be reset to false.
    if (!ret)
      state.needDrain = true;

    if (state.writing || state.corked)
      state.buffer.push(new WriteReq(chunk, encoding, cb));
    else
      doWrite(stream, state, false, len, chunk, encoding, cb);

    return ret;
  }
  270 
  // Hands a chunk (or, when `writev` is truthy, a list of queued requests) to
  // the stream implementation.
  //
  // Records the length and user callback of the write in `state`, marks the
  // stream as writing, and keeps state.sync true only for the duration of the
  // _write/_writev call so that onwrite can tell a same-tick completion from a
  // later one.
  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
    state.writelen = len;
    state.writecb = cb;
    state.writing = true;
    state.sync = true;
    if (writev)
      stream._writev(chunk, state.onwrite);
    else
      stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }
  282 
  // Handles a failed write: settles the write callback with the error (on the
  // next tick when the failure was reported in the same tick as the _write
  // call, directly otherwise), then marks the error as emitted and emits
  // 'error' on the stream.
  function onwriteError(stream, state, sync, er, cb) {
    if (sync)
      process.nextTick(function() {
        state.pendingcb--;
        cb(er);
      });
    else {
      state.pendingcb--;
      cb(er);
    }

    stream._writableState.errorEmitted = true;
    stream.emit('error', er);
  }
  297 
  // Resets the per-write bookkeeping once a write has completed: clears the
  // writing flag and stored callback, and subtracts the completed write's
  // length from the accounted total.
  function onwriteStateUpdate(state) {
    state.writing = false;
    state.writecb = null;
    state.length -= state.writelen;
    state.writelen = 0;
  }
  304 
  // Completion handler for _write/_writev (reached through state.onwrite).
  //
  // er: error reported by the stream implementation, if any.
  function onwrite(stream, er) {
    var state = stream._writableState;
    // capture these before onwriteStateUpdate() resets the state
    var sync = state.sync;
    var cb = state.writecb;

    onwriteStateUpdate(state);

    if (er)
      onwriteError(stream, state, sync, er, cb);
    else {
      // Check if we're actually ready to finish, but don't emit yet
      var finished = needFinish(stream, state);

      // keep draining the queue unless we are finishing, corked, or already
      // inside clearBuffer()
      if (!finished &&
          !state.corked &&
          !state.bufferProcessing &&
          state.buffer.length) {
        clearBuffer(stream, state);
      }

      // a same-tick completion is deferred so the write() caller gets its
      // return value before the callback / 'drain' fire
      if (sync) {
        process.nextTick(function() {
          afterWrite(stream, state, finished, cb);
        });
      } else {
        afterWrite(stream, state, finished, cb);
      }
    }
  }
  334 
  // Final step of a successful write: possibly emit 'drain' (skipped when the
  // stream is about to finish), settle the user's write callback, then check
  // whether the stream can finish.
  function afterWrite(stream, state, finished, cb) {
    if (!finished)
      onwriteDrain(stream, state);
    state.pendingcb--;
    cb();
    finishMaybe(stream, state);
  }
  342 
  // Emits 'drain' once nothing is left to write and a previous write() asked
  // for it (state.needDrain), clearing the flag first.
  //
  // Callers must force this to run on nextTick for same-tick completions, so
  // that we don't emit 'drain' before the write() consumer gets the 'false'
  // return value, and has a chance to attach a 'drain' listener.
  function onwriteDrain(stream, state) {
    if (state.length === 0 && state.needDrain) {
      state.needDrain = false;
      stream.emit('drain');
    }
  }
  352 
  353 
  // if there's something in the buffer waiting, then process it
  //
  // state.bufferProcessing is held true for the duration so that onwrite()
  // does not re-enter this function when a write completes in the same tick.
  function clearBuffer(stream, state) {
    var i;

    state.bufferProcessing = true;

    if (stream._writev && state.buffer.length > 1) {
      // Fast case, write everything using _writev()
      var cbs = [];
      for (i = 0; i < state.buffer.length; i++)
        cbs.push(state.buffer[i].callback);

      // count the one we are adding, as well.
      // TODO(isaacs) clean this up
      state.pendingcb++;
      doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
        // fan the single _writev result out to every queued callback
        for (var j = 0; j < cbs.length; j++) {
          state.pendingcb--;
          cbs[j](err);
        }
      });

      // Clear buffer
      state.buffer = [];
    } else {
      // Slow case, write chunks one-by-one
      for (i = 0; i < state.buffer.length; i++) {
        var entry = state.buffer[i];
        var chunk = entry.chunk;
        var encoding = entry.encoding;
        var cb = entry.callback;
        var len = state.objectMode ? 1 : chunk.length;

        doWrite(stream, state, false, len, chunk, encoding, cb);

        // if we didn't call the onwrite immediately, then
        // it means that we need to wait until it does.
        // also, that means that the chunk and cb are currently
        // being processed, so move the buffer counter past them.
        if (state.writing) {
          i++;
          break;
        }
      }

      // drop the entries that were handed to _write, keep the rest queued
      if (i < state.buffer.length)
        state.buffer = state.buffer.slice(i);
      else
        state.buffer.length = 0;
    }

    state.bufferProcessing = false;
  }
  405 
  // Default _write: reports 'not implemented' through the callback.
  // Stream implementations are expected to override it.
  Writable.prototype._write = function(chunk, encoding, cb) {
    cb(new Error('not implemented'));
  };

  // No batched write by default; clearBuffer() only takes its _writev fast
  // path when this is set to a function.
  Writable.prototype._writev = null;
  412 
  // Ends the stream, optionally writing one last chunk first.
  //
  // Accepted call shapes: end(), end(cb), end(chunk), end(chunk, cb),
  // end(chunk, encoding) and end(chunk, encoding, cb).
  // `cb`, when given, is passed on to endWritable().
  Writable.prototype.end = function(chunk, encoding, cb) {
    var state = this._writableState;

    if (util.isFunction(chunk)) {
      cb = chunk;
      chunk = null;
      encoding = null;
    } else if (util.isFunction(encoding)) {
      cb = encoding;
      encoding = null;
    }

    if (!util.isNullOrUndefined(chunk))
      this.write(chunk, encoding);

    // .end() fully uncorks
    if (state.corked) {
      // collapse any nested cork depth so a single uncork() flushes
      state.corked = 1;
      this.uncork();
    }

    // ignore unnecessary end() calls.
    if (!state.ending && !state.finished)
      endWritable(this, state, cb);
  };
  438 
  439 
  // Tells whether the stream is ready to finish: end() has started, nothing
  // is left to write, 'finish' has not been emitted yet, and no write is in
  // flight. `stream` is not used by the check.
  function needFinish(stream, state) {
    return (state.ending &&
            state.length === 0 &&
            !state.finished &&
            !state.writing);
  }
  446 
  // Emits 'prefinish' at most once per stream, recording that in
  // state.prefinished.
  function prefinish(stream, state) {
    if (!state.prefinished) {
      state.prefinished = true;
      stream.emit('prefinish');
    }
  }
  453 
  // Finishes the stream if it is ready to.
  //
  // When needFinish() holds, 'prefinish' is emitted (once per stream); if no
  // write callbacks were pending, the stream is also marked finished and
  // 'finish' is emitted. Returns the needFinish() result.
  function finishMaybe(stream, state) {
    var need = needFinish(stream, state);
    if (need) {
      // sample the counter before 'prefinish' listeners get a chance to run
      var nothingPending = state.pendingcb === 0;
      prefinish(stream, state);
      if (nothingPending) {
        state.finished = true;
        stream.emit('finish');
      }
    }
    return need;
  }
  466 
  // Starts ending the stream: marks it as ending, finishes it right away when
  // possible, and arranges for `cb` (optional) to run once the stream has
  // finished -- on the next tick if it already has, otherwise on 'finish'.
  // state.ended is only set after all of that.
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);
    if (cb) {
      if (state.finished)
        process.nextTick(cb);
      else
        stream.once('finish', cb);
    }
    state.ended = true;
  }