"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/npm/node_modules/readable-stream/lib/_stream_writable.js" (11 Apr 2017, 15062 Bytes) of package /windows/misc/atom-windows.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) JavaScript source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 // A bit simpler than readable streams.
    2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
    3 // the drain event emission and buffering.
    4 
    5 'use strict';
    6 
    7 module.exports = Writable;
    8 
    9 /*<replacement>*/
   10 var processNextTick = require('process-nextick-args');
   11 /*</replacement>*/
   12 
   13 /*<replacement>*/
   14 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
   15 /*</replacement>*/
   16 
   17 Writable.WritableState = WritableState;
   18 
   19 /*<replacement>*/
   20 var util = require('core-util-is');
   21 util.inherits = require('inherits');
   22 /*</replacement>*/
   23 
   24 /*<replacement>*/
   25 var internalUtil = {
   26   deprecate: require('util-deprecate')
   27 };
   28 /*</replacement>*/
   29 
   30 /*<replacement>*/
   31 var Stream;
   32 (function () {
   33   try {
   34     Stream = require('st' + 'ream');
   35   } catch (_) {} finally {
   36     if (!Stream) Stream = require('events').EventEmitter;
   37   }
   38 })();
   39 /*</replacement>*/
   40 
   41 var Buffer = require('buffer').Buffer;
   42 /*<replacement>*/
   43 var bufferShim = require('buffer-shims');
   44 /*</replacement>*/
   45 
   46 util.inherits(Writable, Stream);
   47 
   48 function nop() {}
   49 
   50 function WriteReq(chunk, encoding, cb) {
   51   this.chunk = chunk;
   52   this.encoding = encoding;
   53   this.callback = cb;
   54   this.next = null;
   55 }
   56 
   57 var Duplex;
   58 function WritableState(options, stream) {
   59   Duplex = Duplex || require('./_stream_duplex');
   60 
   61   options = options || {};
   62 
   63   // object stream flag to indicate whether or not this stream
   64   // contains buffers or objects.
   65   this.objectMode = !!options.objectMode;
   66 
   67   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
   68 
   69   // the point at which write() starts returning false
   70   // Note: 0 is a valid value, means that we always return false if
   71   // the entire buffer is not flushed immediately on write()
   72   var hwm = options.highWaterMark;
   73   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
   74   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
   75 
   76   // cast to ints.
   77   this.highWaterMark = ~ ~this.highWaterMark;
   78 
   79   this.needDrain = false;
   80   // at the start of calling end()
   81   this.ending = false;
   82   // when end() has been called, and returned
   83   this.ended = false;
   84   // when 'finish' is emitted
   85   this.finished = false;
   86 
   87   // should we decode strings into buffers before passing to _write?
   88   // this is here so that some node-core streams can optimize string
   89   // handling at a lower level.
   90   var noDecode = options.decodeStrings === false;
   91   this.decodeStrings = !noDecode;
   92 
   93   // Crypto is kind of old and crusty.  Historically, its default string
   94   // encoding is 'binary' so we have to make this configurable.
   95   // Everything else in the universe uses 'utf8', though.
   96   this.defaultEncoding = options.defaultEncoding || 'utf8';
   97 
   98   // not an actual buffer we keep track of, but a measurement
   99   // of how much we're waiting to get pushed to some underlying
  100   // socket or file.
  101   this.length = 0;
  102 
  103   // a flag to see when we're in the middle of a write.
  104   this.writing = false;
  105 
  106   // when true all writes will be buffered until .uncork() call
  107   this.corked = 0;
  108 
  109   // a flag to be able to tell if the onwrite cb is called immediately,
  110   // or on a later tick.  We set this to true at first, because any
  111   // actions that shouldn't happen until "later" should generally also
  112   // not happen before the first write call.
  113   this.sync = true;
  114 
  115   // a flag to know if we're processing previously buffered items, which
  116   // may call the _write() callback in the same tick, so that we don't
  117   // end up in an overlapped onwrite situation.
  118   this.bufferProcessing = false;
  119 
  120   // the callback that's passed to _write(chunk,cb)
  121   this.onwrite = function (er) {
  122     onwrite(stream, er);
  123   };
  124 
  125   // the callback that the user supplies to write(chunk,encoding,cb)
  126   this.writecb = null;
  127 
  128   // the amount that is being written when _write is called.
  129   this.writelen = 0;
  130 
  131   this.bufferedRequest = null;
  132   this.lastBufferedRequest = null;
  133 
  134   // number of pending user-supplied write callbacks
  135   // this must be 0 before 'finish' can be emitted
  136   this.pendingcb = 0;
  137 
  138   // emit prefinish if the only thing we're waiting for is _write cbs
  139   // This is relevant for synchronous Transform streams
  140   this.prefinished = false;
  141 
  142   // True if the error was already emitted and should not be thrown again
  143   this.errorEmitted = false;
  144 
  145   // count buffered requests
  146   this.bufferedRequestCount = 0;
  147 
  148   // allocate the first CorkedRequest, there is always
  149   // one allocated and free to use, and we maintain at most two
  150   this.corkedRequestsFree = new CorkedRequest(this);
  151 }
  152 
  153 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
  154   var current = this.bufferedRequest;
  155   var out = [];
  156   while (current) {
  157     out.push(current);
  158     current = current.next;
  159   }
  160   return out;
  161 };
  162 
  163 (function () {
  164   try {
  165     Object.defineProperty(WritableState.prototype, 'buffer', {
  166       get: internalUtil.deprecate(function () {
  167         return this.getBuffer();
  168       }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
  169     });
  170   } catch (_) {}
  171 })();
  172 
  173 var Duplex;
  174 function Writable(options) {
  175   Duplex = Duplex || require('./_stream_duplex');
  176 
  177   // Writable ctor is applied to Duplexes, though they're not
  178   // instanceof Writable, they're instanceof Readable.
  179   if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
  180 
  181   this._writableState = new WritableState(options, this);
  182 
  183   // legacy.
  184   this.writable = true;
  185 
  186   if (options) {
  187     if (typeof options.write === 'function') this._write = options.write;
  188 
  189     if (typeof options.writev === 'function') this._writev = options.writev;
  190   }
  191 
  192   Stream.call(this);
  193 }
  194 
  195 // Otherwise people can pipe Writable streams, which is just wrong.
  196 Writable.prototype.pipe = function () {
  197   this.emit('error', new Error('Cannot pipe, not readable'));
  198 };
  199 
  200 function writeAfterEnd(stream, cb) {
  201   var er = new Error('write after end');
  202   // TODO: defer error events consistently everywhere, not just the cb
  203   stream.emit('error', er);
  204   processNextTick(cb, er);
  205 }
  206 
  207 // If we get something that is not a buffer, string, null, or undefined,
  208 // and we're not in objectMode, then that's an error.
  209 // Otherwise stream chunks are all considered to be of length=1, and the
  210 // watermarks determine how many objects to keep in the buffer, rather than
  211 // how many bytes or characters.
  212 function validChunk(stream, state, chunk, cb) {
  213   var valid = true;
  214   var er = false;
  215   // Always throw error if a null is written
  216   // if we are not in object mode then throw
  217   // if it is not a buffer, string, or undefined.
  218   if (chunk === null) {
  219     er = new TypeError('May not write null values to stream');
  220   } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
  221     er = new TypeError('Invalid non-string/buffer chunk');
  222   }
  223   if (er) {
  224     stream.emit('error', er);
  225     processNextTick(cb, er);
  226     valid = false;
  227   }
  228   return valid;
  229 }
  230 
  231 Writable.prototype.write = function (chunk, encoding, cb) {
  232   var state = this._writableState;
  233   var ret = false;
  234 
  235   if (typeof encoding === 'function') {
  236     cb = encoding;
  237     encoding = null;
  238   }
  239 
  240   if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
  241 
  242   if (typeof cb !== 'function') cb = nop;
  243 
  244   if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
  245     state.pendingcb++;
  246     ret = writeOrBuffer(this, state, chunk, encoding, cb);
  247   }
  248 
  249   return ret;
  250 };
  251 
  252 Writable.prototype.cork = function () {
  253   var state = this._writableState;
  254 
  255   state.corked++;
  256 };
  257 
  258 Writable.prototype.uncork = function () {
  259   var state = this._writableState;
  260 
  261   if (state.corked) {
  262     state.corked--;
  263 
  264     if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
  265   }
  266 };
  267 
  268 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  269   // node::ParseEncoding() requires lower case.
  270   if (typeof encoding === 'string') encoding = encoding.toLowerCase();
  271   if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
  272   this._writableState.defaultEncoding = encoding;
  273   return this;
  274 };
  275 
  276 function decodeChunk(state, chunk, encoding) {
  277   if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
  278     chunk = bufferShim.from(chunk, encoding);
  279   }
  280   return chunk;
  281 }
  282 
  283 // if we're already writing something, then just put this
  284 // in the queue, and wait our turn.  Otherwise, call _write
  285 // If we return false, then we need a drain event, so set that flag.
  286 function writeOrBuffer(stream, state, chunk, encoding, cb) {
  287   chunk = decodeChunk(state, chunk, encoding);
  288 
  289   if (Buffer.isBuffer(chunk)) encoding = 'buffer';
  290   var len = state.objectMode ? 1 : chunk.length;
  291 
  292   state.length += len;
  293 
  294   var ret = state.length < state.highWaterMark;
  295   // we must ensure that previous needDrain will not be reset to false.
  296   if (!ret) state.needDrain = true;
  297 
  298   if (state.writing || state.corked) {
  299     var last = state.lastBufferedRequest;
  300     state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
  301     if (last) {
  302       last.next = state.lastBufferedRequest;
  303     } else {
  304       state.bufferedRequest = state.lastBufferedRequest;
  305     }
  306     state.bufferedRequestCount += 1;
  307   } else {
  308     doWrite(stream, state, false, len, chunk, encoding, cb);
  309   }
  310 
  311   return ret;
  312 }
  313 
  314 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  315   state.writelen = len;
  316   state.writecb = cb;
  317   state.writing = true;
  318   state.sync = true;
  319   if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
  320   state.sync = false;
  321 }
  322 
  323 function onwriteError(stream, state, sync, er, cb) {
  324   --state.pendingcb;
  325   if (sync) processNextTick(cb, er);else cb(er);
  326 
  327   stream._writableState.errorEmitted = true;
  328   stream.emit('error', er);
  329 }
  330 
  331 function onwriteStateUpdate(state) {
  332   state.writing = false;
  333   state.writecb = null;
  334   state.length -= state.writelen;
  335   state.writelen = 0;
  336 }
  337 
  338 function onwrite(stream, er) {
  339   var state = stream._writableState;
  340   var sync = state.sync;
  341   var cb = state.writecb;
  342 
  343   onwriteStateUpdate(state);
  344 
  345   if (er) onwriteError(stream, state, sync, er, cb);else {
  346     // Check if we're actually ready to finish, but don't emit yet
  347     var finished = needFinish(state);
  348 
  349     if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
  350       clearBuffer(stream, state);
  351     }
  352 
  353     if (sync) {
  354       /*<replacement>*/
  355       asyncWrite(afterWrite, stream, state, finished, cb);
  356       /*</replacement>*/
  357     } else {
  358         afterWrite(stream, state, finished, cb);
  359       }
  360   }
  361 }
  362 
  363 function afterWrite(stream, state, finished, cb) {
  364   if (!finished) onwriteDrain(stream, state);
  365   state.pendingcb--;
  366   cb();
  367   finishMaybe(stream, state);
  368 }
  369 
  370 // Must force callback to be called on nextTick, so that we don't
  371 // emit 'drain' before the write() consumer gets the 'false' return
  372 // value, and has a chance to attach a 'drain' listener.
  373 function onwriteDrain(stream, state) {
  374   if (state.length === 0 && state.needDrain) {
  375     state.needDrain = false;
  376     stream.emit('drain');
  377   }
  378 }
  379 
  380 // if there's something in the buffer waiting, then process it
  381 function clearBuffer(stream, state) {
  382   state.bufferProcessing = true;
  383   var entry = state.bufferedRequest;
  384 
  385   if (stream._writev && entry && entry.next) {
  386     // Fast case, write everything using _writev()
  387     var l = state.bufferedRequestCount;
  388     var buffer = new Array(l);
  389     var holder = state.corkedRequestsFree;
  390     holder.entry = entry;
  391 
  392     var count = 0;
  393     while (entry) {
  394       buffer[count] = entry;
  395       entry = entry.next;
  396       count += 1;
  397     }
  398 
  399     doWrite(stream, state, true, state.length, buffer, '', holder.finish);
  400 
  401     // doWrite is almost always async, defer these to save a bit of time
  402     // as the hot path ends with doWrite
  403     state.pendingcb++;
  404     state.lastBufferedRequest = null;
  405     if (holder.next) {
  406       state.corkedRequestsFree = holder.next;
  407       holder.next = null;
  408     } else {
  409       state.corkedRequestsFree = new CorkedRequest(state);
  410     }
  411   } else {
  412     // Slow case, write chunks one-by-one
  413     while (entry) {
  414       var chunk = entry.chunk;
  415       var encoding = entry.encoding;
  416       var cb = entry.callback;
  417       var len = state.objectMode ? 1 : chunk.length;
  418 
  419       doWrite(stream, state, false, len, chunk, encoding, cb);
  420       entry = entry.next;
  421       // if we didn't call the onwrite immediately, then
  422       // it means that we need to wait until it does.
  423       // also, that means that the chunk and cb are currently
  424       // being processed, so move the buffer counter past them.
  425       if (state.writing) {
  426         break;
  427       }
  428     }
  429 
  430     if (entry === null) state.lastBufferedRequest = null;
  431   }
  432 
  433   state.bufferedRequestCount = 0;
  434   state.bufferedRequest = entry;
  435   state.bufferProcessing = false;
  436 }
  437 
  438 Writable.prototype._write = function (chunk, encoding, cb) {
  439   cb(new Error('not implemented'));
  440 };
  441 
  442 Writable.prototype._writev = null;
  443 
  444 Writable.prototype.end = function (chunk, encoding, cb) {
  445   var state = this._writableState;
  446 
  447   if (typeof chunk === 'function') {
  448     cb = chunk;
  449     chunk = null;
  450     encoding = null;
  451   } else if (typeof encoding === 'function') {
  452     cb = encoding;
  453     encoding = null;
  454   }
  455 
  456   if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
  457 
  458   // .end() fully uncorks
  459   if (state.corked) {
  460     state.corked = 1;
  461     this.uncork();
  462   }
  463 
  464   // ignore unnecessary end() calls.
  465   if (!state.ending && !state.finished) endWritable(this, state, cb);
  466 };
  467 
  468 function needFinish(state) {
  469   return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
  470 }
  471 
  472 function prefinish(stream, state) {
  473   if (!state.prefinished) {
  474     state.prefinished = true;
  475     stream.emit('prefinish');
  476   }
  477 }
  478 
  479 function finishMaybe(stream, state) {
  480   var need = needFinish(state);
  481   if (need) {
  482     if (state.pendingcb === 0) {
  483       prefinish(stream, state);
  484       state.finished = true;
  485       stream.emit('finish');
  486     } else {
  487       prefinish(stream, state);
  488     }
  489   }
  490   return need;
  491 }
  492 
  493 function endWritable(stream, state, cb) {
  494   state.ending = true;
  495   finishMaybe(stream, state);
  496   if (cb) {
  497     if (state.finished) processNextTick(cb);else stream.once('finish', cb);
  498   }
  499   state.ended = true;
  500   stream.writable = false;
  501 }
  502 
  503 // It seems a linked list but it is not
  504 // there will be only 2 of these for each stream
  505 function CorkedRequest(state) {
  506   var _this = this;
  507 
  508   this.next = null;
  509   this.entry = null;
  510 
  511   this.finish = function (err) {
  512     var entry = _this.entry;
  513     _this.entry = null;
  514     while (entry) {
  515       var cb = entry.callback;
  516       state.pendingcb--;
  517       cb(err);
  518       entry = entry.next;
  519     }
  520     if (state.corkedRequestsFree) {
  521       state.corkedRequestsFree.next = _this;
  522     } else {
  523       state.corkedRequestsFree = _this;
  524     }
  525   };
  526 }