"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/concat-stream/node_modules/readable-stream/lib/_stream_readable.js" (8 Mar 2017, 25638 Bytes) of archive /windows/misc/atom-windows.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) JavaScript syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 'use strict';
    2 
    3 module.exports = Readable;
    4 
    5 /*<replacement>*/
    6 var processNextTick = require('process-nextick-args');
    7 /*</replacement>*/
    8 
    9 /*<replacement>*/
   10 var isArray = require('isarray');
   11 /*</replacement>*/
   12 
   13 /*<replacement>*/
   14 var Buffer = require('buffer').Buffer;
   15 /*</replacement>*/
   16 
   17 Readable.ReadableState = ReadableState;
   18 
   19 var EE = require('events');
   20 
   21 /*<replacement>*/
   22 var EElistenerCount = function (emitter, type) {
   23   return emitter.listeners(type).length;
   24 };
   25 /*</replacement>*/
   26 
   27 /*<replacement>*/
   28 var Stream;
   29 (function () {
   30   try {
   31     Stream = require('st' + 'ream');
   32   } catch (_) {} finally {
   33     if (!Stream) Stream = require('events').EventEmitter;
   34   }
   35 })();
   36 /*</replacement>*/
   37 
   38 var Buffer = require('buffer').Buffer;
   39 
   40 /*<replacement>*/
   41 var util = require('core-util-is');
   42 util.inherits = require('inherits');
   43 /*</replacement>*/
   44 
   45 /*<replacement>*/
   46 var debugUtil = require('util');
   47 var debug = undefined;
   48 if (debugUtil && debugUtil.debuglog) {
   49   debug = debugUtil.debuglog('stream');
   50 } else {
   51   debug = function () {};
   52 }
   53 /*</replacement>*/
   54 
   55 var StringDecoder;
   56 
   57 util.inherits(Readable, Stream);
   58 
   59 var Duplex;
   60 function ReadableState(options, stream) {
   61   Duplex = Duplex || require('./_stream_duplex');
   62 
   63   options = options || {};
   64 
   65   // object stream flag. Used to make read(n) ignore n and to
   66   // make all the buffer merging and length checks go away
   67   this.objectMode = !!options.objectMode;
   68 
   69   if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
   70 
   71   // the point at which it stops calling _read() to fill the buffer
   72   // Note: 0 is a valid value, means "don't call _read preemptively ever"
   73   var hwm = options.highWaterMark;
   74   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
   75   this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
   76 
   77   // cast to ints.
   78   this.highWaterMark = ~ ~this.highWaterMark;
   79 
   80   this.buffer = [];
   81   this.length = 0;
   82   this.pipes = null;
   83   this.pipesCount = 0;
   84   this.flowing = null;
   85   this.ended = false;
   86   this.endEmitted = false;
   87   this.reading = false;
   88 
   89   // a flag to be able to tell if the onwrite cb is called immediately,
   90   // or on a later tick.  We set this to true at first, because any
   91   // actions that shouldn't happen until "later" should generally also
   92   // not happen before the first write call.
   93   this.sync = true;
   94 
   95   // whenever we return null, then we set a flag to say
   96   // that we're awaiting a 'readable' event emission.
   97   this.needReadable = false;
   98   this.emittedReadable = false;
   99   this.readableListening = false;
  100   this.resumeScheduled = false;
  101 
  102   // Crypto is kind of old and crusty.  Historically, its default string
  103   // encoding is 'binary' so we have to make this configurable.
  104   // Everything else in the universe uses 'utf8', though.
  105   this.defaultEncoding = options.defaultEncoding || 'utf8';
  106 
  107   // when piping, we only care about 'readable' events that happen
  108   // after read()ing all the bytes and not getting any pushback.
  109   this.ranOut = false;
  110 
  111   // the number of writers that are awaiting a drain event in .pipe()s
  112   this.awaitDrain = 0;
  113 
  114   // if true, a maybeReadMore has been scheduled
  115   this.readingMore = false;
  116 
  117   this.decoder = null;
  118   this.encoding = null;
  119   if (options.encoding) {
  120     if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
  121     this.decoder = new StringDecoder(options.encoding);
  122     this.encoding = options.encoding;
  123   }
  124 }
  125 
  126 var Duplex;
  127 function Readable(options) {
  128   Duplex = Duplex || require('./_stream_duplex');
  129 
  130   if (!(this instanceof Readable)) return new Readable(options);
  131 
  132   this._readableState = new ReadableState(options, this);
  133 
  134   // legacy
  135   this.readable = true;
  136 
  137   if (options && typeof options.read === 'function') this._read = options.read;
  138 
  139   Stream.call(this);
  140 }
  141 
  142 // Manually shove something into the read() buffer.
  143 // This returns true if the highWaterMark has not been hit yet,
  144 // similar to how Writable.write() returns true if you should
  145 // write() some more.
  146 Readable.prototype.push = function (chunk, encoding) {
  147   var state = this._readableState;
  148 
  149   if (!state.objectMode && typeof chunk === 'string') {
  150     encoding = encoding || state.defaultEncoding;
  151     if (encoding !== state.encoding) {
  152       chunk = new Buffer(chunk, encoding);
  153       encoding = '';
  154     }
  155   }
  156 
  157   return readableAddChunk(this, state, chunk, encoding, false);
  158 };
  159 
  160 // Unshift should *always* be something directly out of read()
  161 Readable.prototype.unshift = function (chunk) {
  162   var state = this._readableState;
  163   return readableAddChunk(this, state, chunk, '', true);
  164 };
  165 
  166 Readable.prototype.isPaused = function () {
  167   return this._readableState.flowing === false;
  168 };
  169 
  170 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  171   var er = chunkInvalid(state, chunk);
  172   if (er) {
  173     stream.emit('error', er);
  174   } else if (chunk === null) {
  175     state.reading = false;
  176     onEofChunk(stream, state);
  177   } else if (state.objectMode || chunk && chunk.length > 0) {
  178     if (state.ended && !addToFront) {
  179       var e = new Error('stream.push() after EOF');
  180       stream.emit('error', e);
  181     } else if (state.endEmitted && addToFront) {
  182       var e = new Error('stream.unshift() after end event');
  183       stream.emit('error', e);
  184     } else {
  185       var skipAdd;
  186       if (state.decoder && !addToFront && !encoding) {
  187         chunk = state.decoder.write(chunk);
  188         skipAdd = !state.objectMode && chunk.length === 0;
  189       }
  190 
  191       if (!addToFront) state.reading = false;
  192 
  193       // Don't add to the buffer if we've decoded to an empty string chunk and
  194       // we're not in object mode
  195       if (!skipAdd) {
  196         // if we want the data now, just emit it.
  197         if (state.flowing && state.length === 0 && !state.sync) {
  198           stream.emit('data', chunk);
  199           stream.read(0);
  200         } else {
  201           // update the buffer info.
  202           state.length += state.objectMode ? 1 : chunk.length;
  203           if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
  204 
  205           if (state.needReadable) emitReadable(stream);
  206         }
  207       }
  208 
  209       maybeReadMore(stream, state);
  210     }
  211   } else if (!addToFront) {
  212     state.reading = false;
  213   }
  214 
  215   return needMoreData(state);
  216 }
  217 
  218 // if it's past the high water mark, we can push in some more.
  219 // Also, if we have no data yet, we can stand some
  220 // more bytes.  This is to work around cases where hwm=0,
  221 // such as the repl.  Also, if the push() triggered a
  222 // readable event, and the user called read(largeNumber) such that
  223 // needReadable was set, then we ought to push more, so that another
  224 // 'readable' event will be triggered.
  225 function needMoreData(state) {
  226   return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
  227 }
  228 
  229 // backwards compatibility.
  230 Readable.prototype.setEncoding = function (enc) {
  231   if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
  232   this._readableState.decoder = new StringDecoder(enc);
  233   this._readableState.encoding = enc;
  234   return this;
  235 };
  236 
  237 // Don't raise the hwm > 8MB
  238 var MAX_HWM = 0x800000;
  239 function computeNewHighWaterMark(n) {
  240   if (n >= MAX_HWM) {
  241     n = MAX_HWM;
  242   } else {
  243     // Get the next highest power of 2
  244     n--;
  245     n |= n >>> 1;
  246     n |= n >>> 2;
  247     n |= n >>> 4;
  248     n |= n >>> 8;
  249     n |= n >>> 16;
  250     n++;
  251   }
  252   return n;
  253 }
  254 
  255 function howMuchToRead(n, state) {
  256   if (state.length === 0 && state.ended) return 0;
  257 
  258   if (state.objectMode) return n === 0 ? 0 : 1;
  259 
  260   if (n === null || isNaN(n)) {
  261     // only flow one buffer at a time
  262     if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length;
  263   }
  264 
  265   if (n <= 0) return 0;
  266 
  267   // If we're asking for more than the target buffer level,
  268   // then raise the water mark.  Bump up to the next highest
  269   // power of 2, to prevent increasing it excessively in tiny
  270   // amounts.
  271   if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
  272 
  273   // don't have that much.  return null, unless we've ended.
  274   if (n > state.length) {
  275     if (!state.ended) {
  276       state.needReadable = true;
  277       return 0;
  278     } else {
  279       return state.length;
  280     }
  281   }
  282 
  283   return n;
  284 }
  285 
  286 // you can override either this method, or the async _read(n) below.
  287 Readable.prototype.read = function (n) {
  288   debug('read', n);
  289   var state = this._readableState;
  290   var nOrig = n;
  291 
  292   if (typeof n !== 'number' || n > 0) state.emittedReadable = false;
  293 
  294   // if we're doing read(0) to trigger a readable event, but we
  295   // already have a bunch of data in the buffer, then just trigger
  296   // the 'readable' event and move on.
  297   if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
  298     debug('read: emitReadable', state.length, state.ended);
  299     if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
  300     return null;
  301   }
  302 
  303   n = howMuchToRead(n, state);
  304 
  305   // if we've ended, and we're now clear, then finish it up.
  306   if (n === 0 && state.ended) {
  307     if (state.length === 0) endReadable(this);
  308     return null;
  309   }
  310 
  311   // All the actual chunk generation logic needs to be
  312   // *below* the call to _read.  The reason is that in certain
  313   // synthetic stream cases, such as passthrough streams, _read
  314   // may be a completely synchronous operation which may change
  315   // the state of the read buffer, providing enough data when
  316   // before there was *not* enough.
  317   //
  318   // So, the steps are:
  319   // 1. Figure out what the state of things will be after we do
  320   // a read from the buffer.
  321   //
  322   // 2. If that resulting state will trigger a _read, then call _read.
  323   // Note that this may be asynchronous, or synchronous.  Yes, it is
  324   // deeply ugly to write APIs this way, but that still doesn't mean
  325   // that the Readable class should behave improperly, as streams are
  326   // designed to be sync/async agnostic.
  327   // Take note if the _read call is sync or async (ie, if the read call
  328   // has returned yet), so that we know whether or not it's safe to emit
  329   // 'readable' etc.
  330   //
  331   // 3. Actually pull the requested chunks out of the buffer and return.
  332 
  333   // if we need a readable event, then we need to do some reading.
  334   var doRead = state.needReadable;
  335   debug('need readable', doRead);
  336 
  337   // if we currently have less than the highWaterMark, then also read some
  338   if (state.length === 0 || state.length - n < state.highWaterMark) {
  339     doRead = true;
  340     debug('length less than watermark', doRead);
  341   }
  342 
  343   // however, if we've ended, then there's no point, and if we're already
  344   // reading, then it's unnecessary.
  345   if (state.ended || state.reading) {
  346     doRead = false;
  347     debug('reading or ended', doRead);
  348   }
  349 
  350   if (doRead) {
  351     debug('do read');
  352     state.reading = true;
  353     state.sync = true;
  354     // if the length is currently zero, then we *need* a readable event.
  355     if (state.length === 0) state.needReadable = true;
  356     // call internal read method
  357     this._read(state.highWaterMark);
  358     state.sync = false;
  359   }
  360 
  361   // If _read pushed data synchronously, then `reading` will be false,
  362   // and we need to re-evaluate how much data we can return to the user.
  363   if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
  364 
  365   var ret;
  366   if (n > 0) ret = fromList(n, state);else ret = null;
  367 
  368   if (ret === null) {
  369     state.needReadable = true;
  370     n = 0;
  371   }
  372 
  373   state.length -= n;
  374 
  375   // If we have nothing in the buffer, then we want to know
  376   // as soon as we *do* get something into the buffer.
  377   if (state.length === 0 && !state.ended) state.needReadable = true;
  378 
  379   // If we tried to read() past the EOF, then emit end on the next tick.
  380   if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
  381 
  382   if (ret !== null) this.emit('data', ret);
  383 
  384   return ret;
  385 };
  386 
  387 function chunkInvalid(state, chunk) {
  388   var er = null;
  389   if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
  390     er = new TypeError('Invalid non-string/buffer chunk');
  391   }
  392   return er;
  393 }
  394 
  395 function onEofChunk(stream, state) {
  396   if (state.ended) return;
  397   if (state.decoder) {
  398     var chunk = state.decoder.end();
  399     if (chunk && chunk.length) {
  400       state.buffer.push(chunk);
  401       state.length += state.objectMode ? 1 : chunk.length;
  402     }
  403   }
  404   state.ended = true;
  405 
  406   // emit 'readable' now to make sure it gets picked up.
  407   emitReadable(stream);
  408 }
  409 
  410 // Don't emit readable right away in sync mode, because this can trigger
  411 // another read() call => stack overflow.  This way, it might trigger
  412 // a nextTick recursion warning, but that's not so bad.
  413 function emitReadable(stream) {
  414   var state = stream._readableState;
  415   state.needReadable = false;
  416   if (!state.emittedReadable) {
  417     debug('emitReadable', state.flowing);
  418     state.emittedReadable = true;
  419     if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream);
  420   }
  421 }
  422 
  423 function emitReadable_(stream) {
  424   debug('emit readable');
  425   stream.emit('readable');
  426   flow(stream);
  427 }
  428 
  429 // at this point, the user has presumably seen the 'readable' event,
  430 // and called read() to consume some data.  that may have triggered
  431 // in turn another _read(n) call, in which case reading = true if
  432 // it's in progress.
  433 // However, if we're not ended, or reading, and the length < hwm,
  434 // then go ahead and try to read some more preemptively.
  435 function maybeReadMore(stream, state) {
  436   if (!state.readingMore) {
  437     state.readingMore = true;
  438     processNextTick(maybeReadMore_, stream, state);
  439   }
  440 }
  441 
  442 function maybeReadMore_(stream, state) {
  443   var len = state.length;
  444   while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
  445     debug('maybeReadMore read 0');
  446     stream.read(0);
  447     if (len === state.length)
  448       // didn't get any data, stop spinning.
  449       break;else len = state.length;
  450   }
  451   state.readingMore = false;
  452 }
  453 
  454 // abstract method.  to be overridden in specific implementation classes.
  455 // call cb(er, data) where data is <= n in length.
  456 // for virtual (non-string, non-buffer) streams, "length" is somewhat
  457 // arbitrary, and perhaps not very meaningful.
  458 Readable.prototype._read = function (n) {
  459   this.emit('error', new Error('not implemented'));
  460 };
  461 
  462 Readable.prototype.pipe = function (dest, pipeOpts) {
  463   var src = this;
  464   var state = this._readableState;
  465 
  466   switch (state.pipesCount) {
  467     case 0:
  468       state.pipes = dest;
  469       break;
  470     case 1:
  471       state.pipes = [state.pipes, dest];
  472       break;
  473     default:
  474       state.pipes.push(dest);
  475       break;
  476   }
  477   state.pipesCount += 1;
  478   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
  479 
  480   var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
  481 
  482   var endFn = doEnd ? onend : cleanup;
  483   if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn);
  484 
  485   dest.on('unpipe', onunpipe);
  486   function onunpipe(readable) {
  487     debug('onunpipe');
  488     if (readable === src) {
  489       cleanup();
  490     }
  491   }
  492 
  493   function onend() {
  494     debug('onend');
  495     dest.end();
  496   }
  497 
  498   // when the dest drains, it reduces the awaitDrain counter
  499   // on the source.  This would be more elegant with a .once()
  500   // handler in flow(), but adding and removing repeatedly is
  501   // too slow.
  502   var ondrain = pipeOnDrain(src);
  503   dest.on('drain', ondrain);
  504 
  505   var cleanedUp = false;
  506   function cleanup() {
  507     debug('cleanup');
  508     // cleanup event handlers once the pipe is broken
  509     dest.removeListener('close', onclose);
  510     dest.removeListener('finish', onfinish);
  511     dest.removeListener('drain', ondrain);
  512     dest.removeListener('error', onerror);
  513     dest.removeListener('unpipe', onunpipe);
  514     src.removeListener('end', onend);
  515     src.removeListener('end', cleanup);
  516     src.removeListener('data', ondata);
  517 
  518     cleanedUp = true;
  519 
  520     // if the reader is waiting for a drain event from this
  521     // specific writer, then it would cause it to never start
  522     // flowing again.
  523     // So, if this is awaiting a drain, then we just call it now.
  524     // If we don't know, then assume that we are waiting for one.
  525     if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
  526   }
  527 
  528   src.on('data', ondata);
  529   function ondata(chunk) {
  530     debug('ondata');
  531     var ret = dest.write(chunk);
  532     if (false === ret) {
  533       // If the user unpiped during `dest.write()`, it is possible
  534       // to get stuck in a permanently paused state if that write
  535       // also returned false.
  536       if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) {
  537         debug('false write response, pause', src._readableState.awaitDrain);
  538         src._readableState.awaitDrain++;
  539       }
  540       src.pause();
  541     }
  542   }
  543 
  544   // if the dest has an error, then stop piping into it.
  545   // however, don't suppress the throwing behavior for this.
  546   function onerror(er) {
  547     debug('onerror', er);
  548     unpipe();
  549     dest.removeListener('error', onerror);
  550     if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
  551   }
  552   // This is a brutally ugly hack to make sure that our error handler
  553   // is attached before any userland ones.  NEVER DO THIS.
  554   if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error];
  555 
  556   // Both close and finish should trigger unpipe, but only once.
  557   function onclose() {
  558     dest.removeListener('finish', onfinish);
  559     unpipe();
  560   }
  561   dest.once('close', onclose);
  562   function onfinish() {
  563     debug('onfinish');
  564     dest.removeListener('close', onclose);
  565     unpipe();
  566   }
  567   dest.once('finish', onfinish);
  568 
  569   function unpipe() {
  570     debug('unpipe');
  571     src.unpipe(dest);
  572   }
  573 
  574   // tell the dest that it's being piped to
  575   dest.emit('pipe', src);
  576 
  577   // start the flow if it hasn't been started already.
  578   if (!state.flowing) {
  579     debug('pipe resume');
  580     src.resume();
  581   }
  582 
  583   return dest;
  584 };
  585 
  586 function pipeOnDrain(src) {
  587   return function () {
  588     var state = src._readableState;
  589     debug('pipeOnDrain', state.awaitDrain);
  590     if (state.awaitDrain) state.awaitDrain--;
  591     if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
  592       state.flowing = true;
  593       flow(src);
  594     }
  595   };
  596 }
  597 
  598 Readable.prototype.unpipe = function (dest) {
  599   var state = this._readableState;
  600 
  601   // if we're not piping anywhere, then do nothing.
  602   if (state.pipesCount === 0) return this;
  603 
  604   // just one destination.  most common case.
  605   if (state.pipesCount === 1) {
  606     // passed in one, but it's not the right one.
  607     if (dest && dest !== state.pipes) return this;
  608 
  609     if (!dest) dest = state.pipes;
  610 
  611     // got a match.
  612     state.pipes = null;
  613     state.pipesCount = 0;
  614     state.flowing = false;
  615     if (dest) dest.emit('unpipe', this);
  616     return this;
  617   }
  618 
  619   // slow case. multiple pipe destinations.
  620 
  621   if (!dest) {
  622     // remove all.
  623     var dests = state.pipes;
  624     var len = state.pipesCount;
  625     state.pipes = null;
  626     state.pipesCount = 0;
  627     state.flowing = false;
  628 
  629     for (var _i = 0; _i < len; _i++) {
  630       dests[_i].emit('unpipe', this);
  631     }return this;
  632   }
  633 
  634   // try to find the right one.
  635   var i = indexOf(state.pipes, dest);
  636   if (i === -1) return this;
  637 
  638   state.pipes.splice(i, 1);
  639   state.pipesCount -= 1;
  640   if (state.pipesCount === 1) state.pipes = state.pipes[0];
  641 
  642   dest.emit('unpipe', this);
  643 
  644   return this;
  645 };
  646 
  647 // set up data events if they are asked for
  648 // Ensure readable listeners eventually get something
  649 Readable.prototype.on = function (ev, fn) {
  650   var res = Stream.prototype.on.call(this, ev, fn);
  651 
  652   // If listening to data, and it has not explicitly been paused,
  653   // then call resume to start the flow of data on the next tick.
  654   if (ev === 'data' && false !== this._readableState.flowing) {
  655     this.resume();
  656   }
  657 
  658   if (ev === 'readable' && !this._readableState.endEmitted) {
  659     var state = this._readableState;
  660     if (!state.readableListening) {
  661       state.readableListening = true;
  662       state.emittedReadable = false;
  663       state.needReadable = true;
  664       if (!state.reading) {
  665         processNextTick(nReadingNextTick, this);
  666       } else if (state.length) {
  667         emitReadable(this, state);
  668       }
  669     }
  670   }
  671 
  672   return res;
  673 };
  674 Readable.prototype.addListener = Readable.prototype.on;
  675 
  676 function nReadingNextTick(self) {
  677   debug('readable nexttick read 0');
  678   self.read(0);
  679 }
  680 
  681 // pause() and resume() are remnants of the legacy readable stream API
  682 // If the user uses them, then switch into old mode.
  683 Readable.prototype.resume = function () {
  684   var state = this._readableState;
  685   if (!state.flowing) {
  686     debug('resume');
  687     state.flowing = true;
  688     resume(this, state);
  689   }
  690   return this;
  691 };
  692 
  693 function resume(stream, state) {
  694   if (!state.resumeScheduled) {
  695     state.resumeScheduled = true;
  696     processNextTick(resume_, stream, state);
  697   }
  698 }
  699 
  700 function resume_(stream, state) {
  701   if (!state.reading) {
  702     debug('resume read 0');
  703     stream.read(0);
  704   }
  705 
  706   state.resumeScheduled = false;
  707   stream.emit('resume');
  708   flow(stream);
  709   if (state.flowing && !state.reading) stream.read(0);
  710 }
  711 
  712 Readable.prototype.pause = function () {
  713   debug('call pause flowing=%j', this._readableState.flowing);
  714   if (false !== this._readableState.flowing) {
  715     debug('pause');
  716     this._readableState.flowing = false;
  717     this.emit('pause');
  718   }
  719   return this;
  720 };
  721 
  722 function flow(stream) {
  723   var state = stream._readableState;
  724   debug('flow', state.flowing);
  725   if (state.flowing) {
  726     do {
  727       var chunk = stream.read();
  728     } while (null !== chunk && state.flowing);
  729   }
  730 }
  731 
  732 // wrap an old-style stream as the async data source.
  733 // This is *not* part of the readable stream interface.
  734 // It is an ugly unfortunate mess of history.
  735 Readable.prototype.wrap = function (stream) {
  736   var state = this._readableState;
  737   var paused = false;
  738 
  739   var self = this;
  740   stream.on('end', function () {
  741     debug('wrapped end');
  742     if (state.decoder && !state.ended) {
  743       var chunk = state.decoder.end();
  744       if (chunk && chunk.length) self.push(chunk);
  745     }
  746 
  747     self.push(null);
  748   });
  749 
  750   stream.on('data', function (chunk) {
  751     debug('wrapped data');
  752     if (state.decoder) chunk = state.decoder.write(chunk);
  753 
  754     // don't skip over falsy values in objectMode
  755     if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
  756 
  757     var ret = self.push(chunk);
  758     if (!ret) {
  759       paused = true;
  760       stream.pause();
  761     }
  762   });
  763 
  764   // proxy all the other methods.
  765   // important when wrapping filters and duplexes.
  766   for (var i in stream) {
  767     if (this[i] === undefined && typeof stream[i] === 'function') {
  768       this[i] = function (method) {
  769         return function () {
  770           return stream[method].apply(stream, arguments);
  771         };
  772       }(i);
  773     }
  774   }
  775 
  776   // proxy certain important events.
  777   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
  778   forEach(events, function (ev) {
  779     stream.on(ev, self.emit.bind(self, ev));
  780   });
  781 
  782   // when we try to consume some more bytes, simply unpause the
  783   // underlying stream.
  784   self._read = function (n) {
  785     debug('wrapped _read', n);
  786     if (paused) {
  787       paused = false;
  788       stream.resume();
  789     }
  790   };
  791 
  792   return self;
  793 };
  794 
  795 // exposed for testing purposes only.
  796 Readable._fromList = fromList;
  797 
  798 // Pluck off n bytes from an array of buffers.
  799 // Length is the combined lengths of all the buffers in the list.
  800 function fromList(n, state) {
  801   var list = state.buffer;
  802   var length = state.length;
  803   var stringMode = !!state.decoder;
  804   var objectMode = !!state.objectMode;
  805   var ret;
  806 
  807   // nothing in the list, definitely empty.
  808   if (list.length === 0) return null;
  809 
  810   if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) {
  811     // read it all, truncate the array.
  812     if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length);
  813     list.length = 0;
  814   } else {
  815     // read just some of it.
  816     if (n < list[0].length) {
  817       // just take a part of the first list item.
  818       // slice is the same for buffers and strings.
  819       var buf = list[0];
  820       ret = buf.slice(0, n);
  821       list[0] = buf.slice(n);
  822     } else if (n === list[0].length) {
  823       // first list is a perfect match
  824       ret = list.shift();
  825     } else {
  826       // complex case.
  827       // we have enough to cover it, but it spans past the first buffer.
  828       if (stringMode) ret = '';else ret = new Buffer(n);
  829 
  830       var c = 0;
  831       for (var i = 0, l = list.length; i < l && c < n; i++) {
  832         var buf = list[0];
  833         var cpy = Math.min(n - c, buf.length);
  834 
  835         if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy);
  836 
  837         if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift();
  838 
  839         c += cpy;
  840       }
  841     }
  842   }
  843 
  844   return ret;
  845 }
  846 
  847 function endReadable(stream) {
  848   var state = stream._readableState;
  849 
  850   // If we get here before consuming all the bytes, then that is a
  851   // bug in node.  Should never happen.
  852   if (state.length > 0) throw new Error('endReadable called on non-empty stream');
  853 
  854   if (!state.endEmitted) {
  855     state.ended = true;
  856     processNextTick(endReadableNT, state, stream);
  857   }
  858 }
  859 
  860 function endReadableNT(state, stream) {
  861   // Check that we didn't get one last unshift.
  862   if (!state.endEmitted && state.length === 0) {
  863     state.endEmitted = true;
  864     stream.readable = false;
  865     stream.emit('end');
  866   }
  867 }
  868 
  869 function forEach(xs, f) {
  870   for (var i = 0, l = xs.length; i < l; i++) {
  871     f(xs[i], i);
  872   }
  873 }
  874 
  875 function indexOf(xs, x) {
  876   for (var i = 0, l = xs.length; i < l; i++) {
  877     if (xs[i] === x) return i;
  878   }
  879   return -1;
  880 }