"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/npm/node_modules/readable-stream/lib/_stream_readable.js" (7 Feb 2017, 26288 Bytes) of archive /windows/misc/atom-windows.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) JavaScript syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 'use strict';
    2 
    3 module.exports = Readable;
    4 
    5 /*<replacement>*/
    6 var processNextTick = require('process-nextick-args');
    7 /*</replacement>*/
    8 
    9 /*<replacement>*/
   10 var isArray = require('isarray');
   11 /*</replacement>*/
   12 
   13 Readable.ReadableState = ReadableState;
   14 
   15 /*<replacement>*/
   16 var EE = require('events').EventEmitter;
   17 
   18 var EElistenerCount = function (emitter, type) {
        // Number of handlers currently registered on `emitter` for event `type`.
   19   return emitter.listeners(type).length;
   20 };
   21 /*</replacement>*/
   22 
   23 /*<replacement>*/
   24 var Stream;
   25 (function () {
   26   try {
   27     Stream = require('st' + 'ream');
   28   } catch (_) {} finally {
   29     if (!Stream) Stream = require('events').EventEmitter;
   30   }
   31 })();
   32 /*</replacement>*/
   33 
   34 var Buffer = require('buffer').Buffer;
   35 /*<replacement>*/
   36 var bufferShim = require('buffer-shims');
   37 /*</replacement>*/
   38 
   39 /*<replacement>*/
   40 var util = require('core-util-is');
   41 util.inherits = require('inherits');
   42 /*</replacement>*/
   43 
   44 /*<replacement>*/
   45 var debugUtil = require('util');
   46 var debug = void 0;
   47 if (debugUtil && debugUtil.debuglog) {
   48   debug = debugUtil.debuglog('stream');
   49 } else {
   50   debug = function () {};
   51 }
   52 /*</replacement>*/
   53 
   54 var StringDecoder;
   55 
   56 util.inherits(Readable, Stream);
   57 
   58 var hasPrependListener = typeof EE.prototype.prependListener === 'function';
   59 
   60 function prependListener(emitter, event, fn) {
        // Registers `fn` on `emitter` for `event` so that it is placed ahead of
        // handlers that are already attached. When EventEmitter.prototype has a
        // prependListener() method (see hasPrependListener) that is used and
        // its return value is returned; otherwise the fallback below runs and
        // nothing is returned.
   61   if (hasPrependListener) return emitter.prependListener(event, fn);
   62 
   63   // This is a brutally ugly hack to make sure that our error handler
   64   // is attached before any userland ones.  NEVER DO THIS. This is here
   65   // only because this code needs to continue to work with older versions
   66   // of Node.js that do not include the prependListener() method. The goal
   67   // is to eventually remove this hack.
        // Three cases, based on the emitter's internal `_events` table:
        //   - no handler stored for `event` yet -> plain emitter.on()
        //   - an array of handlers              -> unshift `fn` onto the front
        //   - a single stored handler           -> replace with [fn, existing]
   68   if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
   69 }
   70 
   71 var Duplex;
   72 function ReadableState(options, stream) {
        // Constructor for the bookkeeping object stored on a Readable as
        // `_readableState`.
        //   options - optional settings object; the fields read here are
        //             objectMode, readableObjectMode, highWaterMark,
        //             defaultEncoding and encoding.
        //   stream  - the owning stream; only used for the Duplex check below.
        // The Duplex module is required lazily, on first construction.
   73   Duplex = Duplex || require('./_stream_duplex');
   74 
   75   options = options || {};
   76 
   77   // object stream flag. Used to make read(n) ignore n and to
   78   // make all the buffer merging and length checks go away
   79   this.objectMode = !!options.objectMode;
   80 
        // For Duplex instances, `readableObjectMode` can switch on object mode
        // as well.
   81   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
   82 
   83   // the point at which it stops calling _read() to fill the buffer
   84   // Note: 0 is a valid value, means "don't call _read preemptively ever"
        // Default is 16 (objects) in object mode, otherwise 16 * 1024.
   85   var hwm = options.highWaterMark;
   86   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
   87   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
   88 
   89   // cast to ints (double bitwise NOT truncates to a 32-bit integer).
   90   this.highWaterMark = ~ ~this.highWaterMark;
   91 
        // `buffer` holds the queued chunks; `length` is the number of queued
        // objects in object mode, otherwise the sum of the chunks' lengths.
   92   this.buffer = [];
   93   this.length = 0;
        // `pipes` is null, a single destination, or an array of destinations,
        // depending on `pipesCount` (see pipe() / unpipe()).
   94   this.pipes = null;
   95   this.pipesCount = 0;
        // `flowing` is tri-state: null (initial), true (resumed) or
        // false (explicitly paused / unpiped).
   96   this.flowing = null;
   97   this.ended = false;
   98   this.endEmitted = false;
   99   this.reading = false;
  100 
  101   // a flag to be able to tell whether we are still inside a synchronous
  102   // _read() call: read() sets it to true right before calling _read()
  103   // and back to false right after.  It starts out true, so that
  104   // emitReadable() defers to the next tick and pushed chunks are
        // buffered rather than emitted directly until a read() has completed.
  105   this.sync = true;
  106 
  107   // whenever we return null, then we set a flag to say
  108   // that we're awaiting a 'readable' event emission.
  109   this.needReadable = false;
  110   this.emittedReadable = false;
  111   this.readableListening = false;
  112   this.resumeScheduled = false;
  113 
  114   // Crypto is kind of old and crusty.  Historically, its default string
  115   // encoding is 'binary' so we have to make this configurable.
  116   // Everything else in the universe uses 'utf8', though.
  117   this.defaultEncoding = options.defaultEncoding || 'utf8';
  118 
  119   // when piping, we only care about 'readable' events that happen
  120   // after read()ing all the bytes and not getting any pushback.
  121   this.ranOut = false;
  122 
  123   // the number of writers that are awaiting a drain event in .pipe()s
  124   this.awaitDrain = 0;
  125 
  126   // if true, a maybeReadMore has been scheduled
  127   this.readingMore = false;
  128 
        // When an `encoding` option is given, a StringDecoder is created for it
        // (the string_decoder module is required lazily).
  129   this.decoder = null;
  130   this.encoding = null;
  131   if (options.encoding) {
  132     if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
  133     this.decoder = new StringDecoder(options.encoding);
  134     this.encoding = options.encoding;
  135   }
  136 }
  137 
  138 var Duplex;
  139 function Readable(options) {
        // Readable stream constructor (this module's export).
        //   options - optional; forwarded to ReadableState. If `options.read`
        //             is a function it is installed as this instance's _read.
        // The Duplex module is required lazily, on first construction.
  140   Duplex = Duplex || require('./_stream_duplex');
  141 
        // Allow calling without `new`.
  142   if (!(this instanceof Readable)) return new Readable(options);
  143 
  144   this._readableState = new ReadableState(options, this);
  145 
  146   // legacy
  147   this.readable = true;
  148 
  149   if (options && typeof options.read === 'function') this._read = options.read;
  150 
        // Run the parent (Stream) constructor on this instance.
  151   Stream.call(this);
  152 }
  153 
  154 // Manually shove something into the read() buffer.
  155 // This returns true if the highWaterMark has not been hit yet,
  156 // similar to how Writable.write() returns true if you should
  157 // write() some more.
      // Pushing null signals EOF (handled in readableAddChunk). The returned
      // value is whatever readableAddChunk() returns, i.e. needMoreData(state).
  158 Readable.prototype.push = function (chunk, encoding) {
  159   var state = this._readableState;
  160 
        // Outside object mode, a string chunk whose encoding (falling back to
        // state.defaultEncoding) differs from the stream's own encoding is
        // converted to a Buffer here, and `encoding` is cleared so that
        // readableAddChunk() will run it through the decoder, if one is set.
  161   if (!state.objectMode && typeof chunk === 'string') {
  162     encoding = encoding || state.defaultEncoding;
  163     if (encoding !== state.encoding) {
  164       chunk = bufferShim.from(chunk, encoding);
  165       encoding = '';
  166     }
  167   }
  168 
  169   return readableAddChunk(this, state, chunk, encoding, false);
  170 };
  171 
  172 // Unshift should *always* be something directly out of read()
      // Puts `chunk` back at the front of the buffer (addToFront = true, empty
      // encoding) and returns readableAddChunk()'s result.
  173 Readable.prototype.unshift = function (chunk) {
  174   var state = this._readableState;
  175   return readableAddChunk(this, state, chunk, '', true);
  176 };
  177 
  178 Readable.prototype.isPaused = function () {
        // True only when flowing is strictly `false`; the initial `null`
        // state does not count as paused.
  179   return this._readableState.flowing === false;
  180 };
  181 
  182 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
        // Shared implementation of push() (addToFront = false) and unshift()
        // (addToFront = true).
        //   stream     - the Readable receiving the chunk
        //   state      - stream._readableState
        //   chunk      - data to add; null means EOF
        //   encoding   - falsy when the chunk still has to go through the
        //                decoder
        //   addToFront - true to put the chunk at the head of the buffer
        // Returns needMoreData(state). Problems are reported by emitting
        // 'error' on the stream; nothing is thrown here.
  183   var er = chunkInvalid(state, chunk);
  184   if (er) {
  185     stream.emit('error', er);
  186   } else if (chunk === null) {
          // EOF marker.
  187     state.reading = false;
  188     onEofChunk(stream, state);
  189   } else if (state.objectMode || chunk && chunk.length > 0) {
          // Real data: anything in object mode, otherwise a non-empty chunk.
  190     if (state.ended && !addToFront) {
  191       var e = new Error('stream.push() after EOF');
  192       stream.emit('error', e);
  193     } else if (state.endEmitted && addToFront) {
  194       var _e = new Error('stream.unshift() after end event');
  195       stream.emit('error', _e);
  196     } else {
  197       var skipAdd;
            // Decode pushed chunks when a decoder is set and no encoding was
            // passed along; the decoder may return an empty string.
  198       if (state.decoder && !addToFront && !encoding) {
  199         chunk = state.decoder.write(chunk);
  200         skipAdd = !state.objectMode && chunk.length === 0;
  201       }
  202 
  203       if (!addToFront) state.reading = false;
  204 
  205       // Don't add to the buffer if we've decoded to an empty string chunk and
  206       // we're not in object mode
  207       if (!skipAdd) {
  208         // if we want the data now, just emit it.
              // (flowing, nothing buffered, and not inside a sync _read() call)
  209         if (state.flowing && state.length === 0 && !state.sync) {
  210           stream.emit('data', chunk);
  211           stream.read(0);
  212         } else {
  213           // update the buffer info.
  214           state.length += state.objectMode ? 1 : chunk.length;
  215           if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
  216 
  217           if (state.needReadable) emitReadable(stream);
  218         }
  219       }
  220 
  221       maybeReadMore(stream, state);
  222     }
  223   } else if (!addToFront) {
          // Empty (or undefined) chunk outside object mode: nothing to store,
          // but a push still marks the pending read as finished.
  224     state.reading = false;
  225   }
  226 
  227   return needMoreData(state);
  228 }
  229 
  230 // if we're still below the high water mark, we can push in some more.
  231 // Also, if we have no data yet, we can stand some
  232 // more bytes.  This is to work around cases where hwm=0,
  233 // such as the repl.  Also, if the push() triggered a
  234 // readable event, and the user called read(largeNumber) such that
  235 // needReadable was set, then we ought to push more, so that another
  236 // 'readable' event will be triggered.
      // Always false once the stream has ended.
  237 function needMoreData(state) {
  238   return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
  239 }
  240 
  241 // backwards compatibility.
      // Installs a new StringDecoder for `enc` on the stream state (replacing
      // any previous decoder), records the encoding, and returns `this`.
      // The string_decoder module is required lazily.
  242 Readable.prototype.setEncoding = function (enc) {
  243   if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
  244   this._readableState.decoder = new StringDecoder(enc);
  245   this._readableState.encoding = enc;
  246   return this;
  247 };
  248 
  249 // Don't raise the hwm > 8MB
  250 var MAX_HWM = 0x800000;
      // Returns the high water mark to use for a read of size `n`: MAX_HWM when
      // n >= MAX_HWM, otherwise n rounded up to a power of two (an exact power
      // of two is returned unchanged).
  251 function computeNewHighWaterMark(n) {
  252   if (n >= MAX_HWM) {
  253     n = MAX_HWM;
  254   } else {
  255     // Get the next highest power of 2
          // (decrement, smear the top set bit into all lower bits, increment)
  256     n--;
  257     n |= n >>> 1;
  258     n |= n >>> 2;
  259     n |= n >>> 4;
  260     n |= n >>> 8;
  261     n |= n >>> 16;
  262     n++;
  263   }
  264   return n;
  265 }
  266 
  267 function howMuchToRead(n, state) {
        // Works out how much a read(n) call may take from the buffer right now.
        //   n     - the size argument given to read(); may be undefined/null/NaN
        //   state - the stream's ReadableState
        // Returns a count (bytes/characters, or 0/1 in object mode).
        // Side effects: may raise state.highWaterMark and may set
        // state.needReadable.
  268   if (state.length === 0 && state.ended) return 0;
  269 
        // Object mode hands out one object per read, except for read(0).
  270   if (state.objectMode) return n === 0 ? 0 : 1;
  271 
        // No usable size given (isNaN also covers undefined).
  272   if (n === null || isNaN(n)) {
  273     // only flow one buffer at a time
  274     if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length;
  275   }
  276 
  277   if (n <= 0) return 0;
  278 
  279   // If we're asking for more than the target buffer level,
  280   // then raise the water mark.  Bump up to the next highest
  281   // power of 2, to prevent increasing it excessively in tiny
  282   // amounts.
  283   if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
  284 
  285   // don't have that much.  return 0 and ask for a 'readable' event,
        // unless we've ended, in which case return whatever is left.
  286   if (n > state.length) {
  287     if (!state.ended) {
  288       state.needReadable = true;
  289       return 0;
  290     } else {
  291       return state.length;
  292     }
  293   }
  294 
  295   return n;
  296 }
  297 
  298 // you can override either this method, or the async _read(n) below.
      // Pulls data out of the internal buffer.
      //   n - optional amount to read; read(0) takes nothing but can still
      //       trigger a _read() call or a 'readable' / 'end' emission.
      // Returns the chunk produced by fromList(), or null when nothing can be
      // returned right now. A non-null result is also emitted as 'data'.
  299 Readable.prototype.read = function (n) {
  300   debug('read', n);
  301   var state = this._readableState;
  302   var nOrig = n;
  303 
        // Any call with a non-number or a positive n re-arms 'readable'.
  304   if (typeof n !== 'number' || n > 0) state.emittedReadable = false;
  305 
  306   // if we're doing read(0) to trigger a readable event, but we
  307   // already have a bunch of data in the buffer, then just trigger
  308   // the 'readable' event and move on.
  309   if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
  310     debug('read: emitReadable', state.length, state.ended);
  311     if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
  312     return null;
  313   }
  314 
  315   n = howMuchToRead(n, state);
  316 
  317   // if we've ended, and we're now clear, then finish it up.
  318   if (n === 0 && state.ended) {
  319     if (state.length === 0) endReadable(this);
  320     return null;
  321   }
  322 
  323   // All the actual chunk generation logic needs to be
  324   // *below* the call to _read.  The reason is that in certain
  325   // synthetic stream cases, such as passthrough streams, _read
  326   // may be a completely synchronous operation which may change
  327   // the state of the read buffer, providing enough data when
  328   // before there was *not* enough.
  329   //
  330   // So, the steps are:
  331   // 1. Figure out what the state of things will be after we do
  332   // a read from the buffer.
  333   //
  334   // 2. If that resulting state will trigger a _read, then call _read.
  335   // Note that this may be asynchronous, or synchronous.  Yes, it is
  336   // deeply ugly to write APIs this way, but that still doesn't mean
  337   // that the Readable class should behave improperly, as streams are
  338   // designed to be sync/async agnostic.
  339   // Take note if the _read call is sync or async (ie, if the read call
  340   // has returned yet), so that we know whether or not it's safe to emit
  341   // 'readable' etc.
  342   //
  343   // 3. Actually pull the requested chunks out of the buffer and return.
  344 
  345   // if we need a readable event, then we need to do some reading.
  346   var doRead = state.needReadable;
  347   debug('need readable', doRead);
  348 
  349   // if we currently have less than the highWaterMark, then also read some
  350   if (state.length === 0 || state.length - n < state.highWaterMark) {
  351     doRead = true;
  352     debug('length less than watermark', doRead);
  353   }
  354 
  355   // however, if we've ended, then there's no point, and if we're already
  356   // reading, then it's unnecessary.
  357   if (state.ended || state.reading) {
  358     doRead = false;
  359     debug('reading or ended', doRead);
  360   }
  361 
  362   if (doRead) {
  363     debug('do read');
  364     state.reading = true;
          // `sync` stays true for exactly the duration of the _read() call.
  365     state.sync = true;
  366     // if the length is currently zero, then we *need* a readable event.
  367     if (state.length === 0) state.needReadable = true;
  368     // call internal read method
  369     this._read(state.highWaterMark);
  370     state.sync = false;
  371   }
  372 
  373   // If _read pushed data synchronously, then `reading` will be false,
  374   // and we need to re-evaluate how much data we can return to the user.
  375   if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
  376 
  377   var ret;
  378   if (n > 0) ret = fromList(n, state);else ret = null;
  379 
        // Nothing to hand out: remember that a 'readable' event is wanted and
        // make sure the length bookkeeping below subtracts nothing.
  380   if (ret === null) {
  381     state.needReadable = true;
  382     n = 0;
  383   }
  384 
  385   state.length -= n;
  386 
  387   // If we have nothing in the buffer, then we want to know
  388   // as soon as we *do* get something into the buffer.
  389   if (state.length === 0 && !state.ended) state.needReadable = true;
  390 
  391   // If we tried to read() past the EOF, then emit end on the next tick.
  392   if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
  393 
  394   if (ret !== null) this.emit('data', ret);
  395 
  396   return ret;
  397 };
  398 
  399 function chunkInvalid(state, chunk) {
        // Validates a chunk handed to push()/unshift(). Returns a TypeError
        // (it is not thrown) when the stream is not in object mode and the
        // chunk is neither a Buffer, a string, null nor undefined; returns
        // null otherwise.
  400   var er = null;
  401   if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
  402     er = new TypeError('Invalid non-string/buffer chunk');
  403   }
  404   return er;
  405 }
  406 
  407 function onEofChunk(stream, state) {
        // Handles the EOF marker (push(null)): flushes whatever the decoder
        // still holds into the buffer, marks the state as ended and requests a
        // 'readable' emission. Does nothing if the stream has already ended.
  408   if (state.ended) return;
  409   if (state.decoder) {
  410     var chunk = state.decoder.end();
  411     if (chunk && chunk.length) {
  412       state.buffer.push(chunk);
  413       state.length += state.objectMode ? 1 : chunk.length;
  414     }
  415   }
  416   state.ended = true;
  417 
  418   // emit 'readable' now to make sure it gets picked up.
  419   emitReadable(stream);
  420 }
  421 
  422 // Don't emit readable right away in sync mode, because this can trigger
  423 // another read() call => stack overflow.  This way, it might trigger
  424 // a nextTick recursion warning, but that's not so bad.
      // Clears needReadable and, unless 'readable' has already been emitted
      // since it was last re-armed (emittedReadable), emits it: deferred via
      // processNextTick while state.sync is true, immediately otherwise.
  425 function emitReadable(stream) {
  426   var state = stream._readableState;
  427   state.needReadable = false;
  428   if (!state.emittedReadable) {
  429     debug('emitReadable', state.flowing);
  430     state.emittedReadable = true;
  431     if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream);
  432   }
  433 }
  434 
  435 function emitReadable_(stream) {
        // Performs the actual 'readable' emission, then lets flow() drain the
        // buffer (flow() only does work when the stream is in flowing mode).
  436   debug('emit readable');
  437   stream.emit('readable');
  438   flow(stream);
  439 }
  440 
  441 // at this point, the user has presumably seen the 'readable' event,
  442 // and called read() to consume some data.  that may have triggered
  443 // in turn another _read(n) call, in which case reading = true if
  444 // it's in progress.
  445 // However, if we're not ended, or reading, and the length < hwm,
  446 // then go ahead and try to read some more preemptively.
      // The work itself happens in maybeReadMore_ on the next tick; the
      // readingMore flag ensures only one such call is queued at a time.
  447 function maybeReadMore(stream, state) {
  448   if (!state.readingMore) {
  449     state.readingMore = true;
  450     processNextTick(maybeReadMore_, stream, state);
  451   }
  452 }
  453 
  454 function maybeReadMore_(stream, state) {
        // Keeps issuing read(0) while the stream is idle (not reading), not
        // flowing, not ended and below the high water mark. Stops as soon as a
        // read(0) leaves the buffered length unchanged, then clears the
        // readingMore flag so another pass can be scheduled.
  455   var len = state.length;
  456   while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
  457     debug('maybeReadMore read 0');
  458     stream.read(0);
  459     if (len === state.length)
  460       // didn't get any data, stop spinning.
  461       break;else len = state.length;
  462   }
  463   state.readingMore = false;
  464 }
  465 
  466 // abstract method.  to be overridden in specific implementation classes.
  467 // This default implementation only emits an 'error' event carrying
  468 // Error('not implemented').  read() calls it with the current
  469 // highWaterMark as `n`.
      // NOTE(review): an earlier version of this comment said to "call
      // cb(er, data)"; the signature here takes no callback. Implementations
      // presumably supply data through push() - confirm against the stream
      // documentation. For virtual (non-string, non-buffer) streams, "length"
      // is somewhat arbitrary, and perhaps not very meaningful.
  470 Readable.prototype._read = function (n) {
  471   this.emit('error', new Error('not implemented'));
  472 };
  473 
  474 Readable.prototype.pipe = function (dest, pipeOpts) {
        // Connects this readable to the destination `dest`: each 'data' chunk
        // is written to dest, and the source is paused when dest.write()
        // returns false until dest emits 'drain'.
        //   dest     - destination; this code calls write(), end(), on(),
        //              once(), removeListener() and emit() on it
        //   pipeOpts - optional; `end: false` keeps dest open when the source
        //              ends
        // Returns `dest`.
  475   var src = this;
  476   var state = this._readableState;
  477 
        // state.pipes is a single dest for one pipe and an array for several.
  478   switch (state.pipesCount) {
  479     case 0:
  480       state.pipes = dest;
  481       break;
  482     case 1:
  483       state.pipes = [state.pipes, dest];
  484       break;
  485     default:
  486       state.pipes.push(dest);
  487       break;
  488   }
  489   state.pipesCount += 1;
  490   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
  491 
        // dest.end() is called on source end unless opted out via
        // pipeOpts.end === false or dest is process.stdout / process.stderr.
  492   var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
  493 
        // If 'end' was already emitted, run the end handler on the next tick
        // instead of waiting for an event that will not come again.
  494   var endFn = doEnd ? onend : cleanup;
  495   if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn);
  496 
  497   dest.on('unpipe', onunpipe);
  498   function onunpipe(readable) {
  499     debug('onunpipe');
          // Only react when it is this source that was unpiped from dest.
  500     if (readable === src) {
  501       cleanup();
  502     }
  503   }
  504 
  505   function onend() {
  506     debug('onend');
  507     dest.end();
  508   }
  509 
  510   // when the dest drains, it reduces the awaitDrain counter
  511   // on the source.  This would be more elegant with a .once()
  512   // handler in flow(), but adding and removing repeatedly is
  513   // too slow.
  514   var ondrain = pipeOnDrain(src);
  515   dest.on('drain', ondrain);
  516 
  517   var cleanedUp = false;
  518   function cleanup() {
  519     debug('cleanup');
  520     // cleanup event handlers once the pipe is broken
  521     dest.removeListener('close', onclose);
  522     dest.removeListener('finish', onfinish);
  523     dest.removeListener('drain', ondrain);
  524     dest.removeListener('error', onerror);
  525     dest.removeListener('unpipe', onunpipe);
  526     src.removeListener('end', onend);
  527     src.removeListener('end', cleanup);
  528     src.removeListener('data', ondata);
  529 
  530     cleanedUp = true;
  531 
  532     // if the reader is waiting for a drain event from this
  533     // specific writer, then it would cause it to never start
  534     // flowing again.
  535     // So, if this is awaiting a drain, then we just call it now.
  536     // If we don't know, then assume that we are waiting for one.
  537     if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
  538   }
  539 
  540   src.on('data', ondata);
  541   function ondata(chunk) {
  542     debug('ondata');
  543     var ret = dest.write(chunk);
  544     if (false === ret) {
  545       // If the user unpiped during `dest.write()`, it is possible
  546       // to get stuck in a permanently paused state if that write
  547       // also returned false.
  548       // => Check whether `dest` is still a piping destination.
  549       if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
  550         debug('false write response, pause', src._readableState.awaitDrain);
  551         src._readableState.awaitDrain++;
  552       }
  553       src.pause();
  554     }
  555   }
  556 
  557   // if the dest has an error, then stop piping into it.
  558   // however, don't suppress the throwing behavior for this.
        // (if no other 'error' listener remains on dest after removing ours,
        // the error is re-emitted on dest)
  559   function onerror(er) {
  560     debug('onerror', er);
  561     unpipe();
  562     dest.removeListener('error', onerror);
  563     if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
  564   }
  565 
  566   // Make sure our error handler is attached before userland ones.
  567   prependListener(dest, 'error', onerror);
  568 
  569   // Both close and finish should trigger unpipe, but only once.
        // (each handler removes the other before unpiping)
  570   function onclose() {
  571     dest.removeListener('finish', onfinish);
  572     unpipe();
  573   }
  574   dest.once('close', onclose);
  575   function onfinish() {
  576     debug('onfinish');
  577     dest.removeListener('close', onclose);
  578     unpipe();
  579   }
  580   dest.once('finish', onfinish);
  581 
  582   function unpipe() {
  583     debug('unpipe');
  584     src.unpipe(dest);
  585   }
  586 
  587   // tell the dest that it's being piped to
  588   dest.emit('pipe', src);
  589 
  590   // start the flow if it hasn't been started already.
  591   if (!state.flowing) {
  592     debug('pipe resume');
  593     src.resume();
  594   }
  595 
  596   return dest;
  597 };
  598 
  599 function pipeOnDrain(src) {
        // Builds the 'drain' handler that pipe() attaches to a destination.
        // Each call of the returned function decrements src's awaitDrain
        // counter (never below zero); once it is zero and src still has 'data'
        // listeners, the source is switched back to flowing and flow() is run.
  600   return function () {
  601     var state = src._readableState;
  602     debug('pipeOnDrain', state.awaitDrain);
  603     if (state.awaitDrain) state.awaitDrain--;
  604     if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
  605       state.flowing = true;
  606       flow(src);
  607     }
  608   };
  609 }
  610 
  611 Readable.prototype.unpipe = function (dest) {
        // Detaches `dest` from this stream, or every destination when `dest`
        // is omitted. Each detached destination gets an 'unpipe' event with
        // this stream as argument. Returns `this`; a `dest` that is not
        // currently piped to is ignored.
  612   var state = this._readableState;
  613 
  614   // if we're not piping anywhere, then do nothing.
  615   if (state.pipesCount === 0) return this;
  616 
  617   // just one destination.  most common case.
  618   if (state.pipesCount === 1) {
  619     // passed in one, but it's not the right one.
  620     if (dest && dest !== state.pipes) return this;
  621 
  622     if (!dest) dest = state.pipes;
  623 
  624     // got a match.
          // Removing the last destination also leaves the stream paused.
  625     state.pipes = null;
  626     state.pipesCount = 0;
  627     state.flowing = false;
  628     if (dest) dest.emit('unpipe', this);
  629     return this;
  630   }
  631 
  632   // slow case. multiple pipe destinations.
  633 
  634   if (!dest) {
  635     // remove all.
  636     var dests = state.pipes;
  637     var len = state.pipesCount;
  638     state.pipes = null;
  639     state.pipesCount = 0;
  640     state.flowing = false;
  641 
  642     for (var _i = 0; _i < len; _i++) {
  643       dests[_i].emit('unpipe', this);
  644     }return this;
  645   }
  646 
  647   // try to find the right one.
  648   var i = indexOf(state.pipes, dest);
  649   if (i === -1) return this;
  650 
        // Remove it; with a single destination left, state.pipes goes back to
        // holding that destination directly instead of an array. `flowing` is
        // not touched on this path.
  651   state.pipes.splice(i, 1);
  652   state.pipesCount -= 1;
  653   if (state.pipesCount === 1) state.pipes = state.pipes[0];
  654 
  655   dest.emit('unpipe', this);
  656 
  657   return this;
  658 };
  659 
  660 // set up data events if they are asked for
  661 // Ensure readable listeners eventually get something
      // Registers the listener through Stream.prototype.on and returns that
      // call's result; 'data' and 'readable' get the extra handling below.
  662 Readable.prototype.on = function (ev, fn) {
  663   var res = Stream.prototype.on.call(this, ev, fn);
  664 
  665   // If listening to data, and it has not explicitly been paused,
  666   // then call resume to start the flow of data on the next tick.
  667   if (ev === 'data' && false !== this._readableState.flowing) {
  668     this.resume();
  669   }
  670 
        // Only the first 'readable' listener (before 'end' has been emitted)
        // triggers this setup.
  671   if (ev === 'readable' && !this._readableState.endEmitted) {
  672     var state = this._readableState;
  673     if (!state.readableListening) {
  674       state.readableListening = true;
  675       state.emittedReadable = false;
  676       state.needReadable = true;
            // Not reading yet: schedule a read(0) for the next tick.
            // Already reading with data buffered: announce it right away.
  677       if (!state.reading) {
  678         processNextTick(nReadingNextTick, this);
  679       } else if (state.length) {
              // NOTE: emitReadable() declares only a `stream` parameter, so
              // the second argument passed here is ignored.
  680         emitReadable(this, state);
  681       }
  682     }
  683   }
  684 
  685   return res;
  686 };
      // addListener is the same function object as on.
  687 Readable.prototype.addListener = Readable.prototype.on;
  688 
  689 function nReadingNextTick(self) {
        // Next-tick callback scheduled by on('readable', ...): issues a read(0)
        // on the stream.
  690   debug('readable nexttick read 0');
  691   self.read(0);
  692 }
  693 
  694 // pause() and resume() are remnants of the legacy readable stream API
  695 // If the user uses them, then switch into old mode.
      // Sets flowing to true and schedules the actual restart (see resume()
      // and resume_() below) unless the stream is already flowing.
      // Returns `this`.
  696 Readable.prototype.resume = function () {
  697   var state = this._readableState;
  698   if (!state.flowing) {
  699     debug('resume');
  700     state.flowing = true;
  701     resume(this, state);
  702   }
  703   return this;
  704 };
  705 
  706 function resume(stream, state) {
        // Queues resume_ for the next tick; the resumeScheduled flag keeps
        // repeated calls from queuing it more than once.
  707   if (!state.resumeScheduled) {
  708     state.resumeScheduled = true;
  709     processNextTick(resume_, stream, state);
  710   }
  711 }
  712 
  713 function resume_(stream, state) {
        // Next-tick half of resume(): issues a read(0) if no read is in
        // progress, clears resumeScheduled, emits 'resume' and drains the
        // buffer via flow().
  714   if (!state.reading) {
  715     debug('resume read 0');
  716     stream.read(0);
  717   }
  718 
  719   state.resumeScheduled = false;
  720   stream.emit('resume');
  721   flow(stream);
        // Still flowing and idle after draining: issue one more read(0).
  722   if (state.flowing && !state.reading) stream.read(0);
  723 }
  724 
  725 Readable.prototype.pause = function () {
        // Sets flowing to false and emits 'pause', unless the stream is
        // already explicitly paused (flowing === false). Returns `this`.
  726   debug('call pause flowing=%j', this._readableState.flowing);
  727   if (false !== this._readableState.flowing) {
  728     debug('pause');
  729     this._readableState.flowing = false;
  730     this.emit('pause');
  731   }
  732   return this;
  733 };
  734 
  735 function flow(stream) {
        // In flowing mode, calls read() repeatedly until it returns null or the
        // stream stops flowing. The chunks are not used here; read() itself
        // emits 'data' for every non-null result.
  736   var state = stream._readableState;
  737   debug('flow', state.flowing);
  738   if (state.flowing) {
  739     do {
  740       var chunk = stream.read();
  741     } while (null !== chunk && state.flowing);
  742   }
  743 }
  744 
  745 // wrap an old-style stream as the async data source.
  746 // This is *not* part of the readable stream interface.
  747 // It is an ugly unfortunate mess of history.
      // `stream` must emit 'data' and 'end' and provide pause()/resume().
      // Its data is fed into this Readable through push(). Returns `this`.
  748 Readable.prototype.wrap = function (stream) {
  749   var state = this._readableState;
  750   var paused = false;
  751 
  752   var self = this;
  753   stream.on('end', function () {
  754     debug('wrapped end');
          // Flush whatever the decoder still holds before signalling EOF.
  755     if (state.decoder && !state.ended) {
  756       var chunk = state.decoder.end();
  757       if (chunk && chunk.length) self.push(chunk);
  758     }
  759 
  760     self.push(null);
  761   });
  762 
  763   stream.on('data', function (chunk) {
  764     debug('wrapped data');
  765     if (state.decoder) chunk = state.decoder.write(chunk);
  766 
  767     // don't skip over falsy values in objectMode
          // (object mode drops only null/undefined; otherwise empty or falsy
          // chunks are dropped)
  768     if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
  769 
          // push() returning false means "stop sending": pause the source
          // until _read() is called again.
  770     var ret = self.push(chunk);
  771     if (!ret) {
  772       paused = true;
  773       stream.pause();
  774     }
  775   });
  776 
  777   // proxy all the other methods.
  778   // important when wrapping filters and duplexes.
        // (only functions whose name is not already defined on this object;
        // the inner function captures the method name per iteration)
  779   for (var i in stream) {
  780     if (this[i] === undefined && typeof stream[i] === 'function') {
  781       this[i] = function (method) {
  782         return function () {
  783           return stream[method].apply(stream, arguments);
  784         };
  785       }(i);
  786     }
  787   }
  788 
  789   // proxy certain important events.
  790   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
  791   forEach(events, function (ev) {
  792     stream.on(ev, self.emit.bind(self, ev));
  793   });
  794 
  795   // when we try to consume some more bytes, simply unpause the
  796   // underlying stream.
  797   self._read = function (n) {
  798     debug('wrapped _read', n);
  799     if (paused) {
  800       paused = false;
  801       stream.resume();
  802     }
  803   };
  804 
  805   return self;
  806 };
  807 
  808 // exposed for testing purposes only.
  809 Readable._fromList = fromList;
  810 
  811 // Pluck off n bytes from an array of buffers.
  812 // Length is the combined lengths of all the buffers in the list.
      //   n     - amount requested; a falsy n or n >= state.length takes
      //           everything
      //   state - object with `buffer` (the chunk list), `length`, `decoder`
      //           and `objectMode`
      // Returns the extracted chunk (a string when a decoder is set, one
      // object in object mode, otherwise a buffer), or null when nothing is
      // buffered. The list is modified in place; state.length is not updated
      // here.
  813 function fromList(n, state) {
  814   var list = state.buffer;
  815   var length = state.length;
  816   var stringMode = !!state.decoder;
  817   var objectMode = !!state.objectMode;
  818   var ret;
  819 
  820   // nothing in the list, definitely empty.
  821   if (list.length === 0) return null;
  822 
  823   if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) {
  824     // read it all, truncate the array.
  825     if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length);
  826     list.length = 0;
  827   } else {
  828     // read just some of it.
  829     if (n < list[0].length) {
  830       // just take a part of the first list item.
  831       // slice is the same for buffers and strings.
  832       var buf = list[0];
  833       ret = buf.slice(0, n);
  834       list[0] = buf.slice(n);
  835     } else if (n === list[0].length) {
  836       // first list is a perfect match
  837       ret = list.shift();
  838     } else {
  839       // complex case.
  840       // we have enough to cover it, but it spans past the first buffer.
  841       if (stringMode) ret = '';else ret = bufferShim.allocUnsafe(n);
  842 
            // `c` counts what has been copied so far. Each pass works on the
            // current head of the list, which is either trimmed or shifted
            // off at the end of the pass.
  843       var c = 0;
  844       for (var i = 0, l = list.length; i < l && c < n; i++) {
  845         var _buf = list[0];
  846         var cpy = Math.min(n - c, _buf.length);
  847 
  848         if (stringMode) ret += _buf.slice(0, cpy);else _buf.copy(ret, c, 0, cpy);
  849 
  850         if (cpy < _buf.length) list[0] = _buf.slice(cpy);else list.shift();
  851 
  852         c += cpy;
  853       }
  854     }
  855   }
  856 
  857   return ret;
  858 }
  859 
  860 function endReadable(stream) {
        // Marks the stream as ended and schedules the 'end' emission
        // (endReadableNT) for the next tick, unless 'end' was already emitted.
        // Throws if data is still buffered.
  861   var state = stream._readableState;
  862 
  863   // If we get here before consuming all the bytes, then that is a
  864   // bug in node.  Should never happen.
  865   if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
  866 
  867   if (!state.endEmitted) {
  868     state.ended = true;
  869     processNextTick(endReadableNT, state, stream);
  870   }
  871 }
  872 
  873 function endReadableNT(state, stream) {
        // Next-tick half of endReadable(): sets endEmitted, clears the legacy
        // `readable` flag and emits 'end' - at most once, and only if the
        // buffer is still empty.
  874   // Check that we didn't get one last unshift.
  875   if (!state.endEmitted && state.length === 0) {
  876     state.endEmitted = true;
  877     stream.readable = false;
  878     stream.emit('end');
  879   }
  880 }
  881 
  882 function forEach(xs, f) {
        // Calls f(element, index) for each element of the array-like `xs`, in
        // order. The length is read once, before the loop starts.
  883   for (var i = 0, l = xs.length; i < l; i++) {
  884     f(xs[i], i);
  885   }
  886 }
  887 
  888 function indexOf(xs, x) {
        // Returns the index of the first element of `xs` that is strictly
        // equal (===) to `x`, or -1 when there is none.
  889   for (var i = 0, l = xs.length; i < l; i++) {
  890     if (xs[i] === x) return i;
  891   }
  892   return -1;
  893 }