"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/readable-stream/lib/_stream_readable.js" (8 Mar 2017, 25959 Bytes) of archive /windows/misc/atom-windows.zip:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Javascript source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file.

    1 // Copyright Joyent, Inc. and other Node contributors.
    2 //
    3 // Permission is hereby granted, free of charge, to any person obtaining a
    4 // copy of this software and associated documentation files (the
    5 // "Software"), to deal in the Software without restriction, including
    6 // without limitation the rights to use, copy, modify, merge, publish,
    7 // distribute, sublicense, and/or sell copies of the Software, and to permit
    8 // persons to whom the Software is furnished to do so, subject to the
    9 // following conditions:
   10 //
   11 // The above copyright notice and this permission notice shall be included
   12 // in all copies or substantial portions of the Software.
   13 //
   14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
   17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
   18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
   19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
   20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
   21 
   22 module.exports = Readable;
   23 
   24 /*<replacement>*/
   25 var isArray = require('isarray');
   26 /*</replacement>*/
   27 
   28 
   29 /*<replacement>*/
   30 var Buffer = require('buffer').Buffer;
   31 /*</replacement>*/
   32 
   33 Readable.ReadableState = ReadableState;
   34 
   35 var EE = require('events').EventEmitter;
   36 
   37 /*<replacement>*/
   38 if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
   39   return emitter.listeners(type).length;
   40 };
   41 /*</replacement>*/
   42 
   43 var Stream = require('stream');
   44 
   45 /*<replacement>*/
   46 var util = require('core-util-is');
   47 util.inherits = require('inherits');
   48 /*</replacement>*/
   49 
   50 var StringDecoder;
   51 
   52 
   53 /*<replacement>*/
   54 var debug = require('util');
   55 if (debug && debug.debuglog) {
   56   debug = debug.debuglog('stream');
   57 } else {
   58   debug = function () {};
   59 }
   60 /*</replacement>*/
   61 
   62 
   63 util.inherits(Readable, Stream);
   64 
   65 function ReadableState(options, stream) {
   66   var Duplex = require('./_stream_duplex');
   67 
   68   options = options || {};
   69 
   70   // the point at which it stops calling _read() to fill the buffer
   71   // Note: 0 is a valid value, means "don't call _read preemptively ever"
   72   var hwm = options.highWaterMark;
   73   var defaultHwm = options.objectMode ? 16 : 16 * 1024;
   74   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
   75 
   76   // cast to ints.
   77   this.highWaterMark = ~~this.highWaterMark;
   78 
   79   this.buffer = [];
   80   this.length = 0;
   81   this.pipes = null;
   82   this.pipesCount = 0;
   83   this.flowing = null;
   84   this.ended = false;
   85   this.endEmitted = false;
   86   this.reading = false;
   87 
   88   // a flag to be able to tell if the onwrite cb is called immediately,
   89   // or on a later tick.  We set this to true at first, because any
   90   // actions that shouldn't happen until "later" should generally also
   91   // not happen before the first write call.
   92   this.sync = true;
   93 
   94   // whenever we return null, then we set a flag to say
   95   // that we're awaiting a 'readable' event emission.
   96   this.needReadable = false;
   97   this.emittedReadable = false;
   98   this.readableListening = false;
   99 
  100 
  101   // object stream flag. Used to make read(n) ignore n and to
  102   // make all the buffer merging and length checks go away
  103   this.objectMode = !!options.objectMode;
  104 
  105   if (stream instanceof Duplex)
  106     this.objectMode = this.objectMode || !!options.readableObjectMode;
  107 
  108   // Crypto is kind of old and crusty.  Historically, its default string
  109   // encoding is 'binary' so we have to make this configurable.
  110   // Everything else in the universe uses 'utf8', though.
  111   this.defaultEncoding = options.defaultEncoding || 'utf8';
  112 
  113   // when piping, we only care about 'readable' events that happen
  114   // after read()ing all the bytes and not getting any pushback.
  115   this.ranOut = false;
  116 
  117   // the number of writers that are awaiting a drain event in .pipe()s
  118   this.awaitDrain = 0;
  119 
  120   // if true, a maybeReadMore has been scheduled
  121   this.readingMore = false;
  122 
  123   this.decoder = null;
  124   this.encoding = null;
  125   if (options.encoding) {
  126     if (!StringDecoder)
  127       StringDecoder = require('string_decoder/').StringDecoder;
  128     this.decoder = new StringDecoder(options.encoding);
  129     this.encoding = options.encoding;
  130   }
  131 }
  132 
  133 function Readable(options) {
  134   var Duplex = require('./_stream_duplex');
  135 
  136   if (!(this instanceof Readable))
  137     return new Readable(options);
  138 
  139   this._readableState = new ReadableState(options, this);
  140 
  141   // legacy
  142   this.readable = true;
  143 
  144   Stream.call(this);
  145 }
  146 
  147 // Manually shove something into the read() buffer.
  148 // This returns true if the highWaterMark has not been hit yet,
  149 // similar to how Writable.write() returns true if you should
  150 // write() some more.
  151 Readable.prototype.push = function(chunk, encoding) {
  152   var state = this._readableState;
  153 
  154   if (util.isString(chunk) && !state.objectMode) {
  155     encoding = encoding || state.defaultEncoding;
  156     if (encoding !== state.encoding) {
  157       chunk = new Buffer(chunk, encoding);
  158       encoding = '';
  159     }
  160   }
  161 
  162   return readableAddChunk(this, state, chunk, encoding, false);
  163 };
  164 
  165 // Unshift should *always* be something directly out of read()
  166 Readable.prototype.unshift = function(chunk) {
  167   var state = this._readableState;
  168   return readableAddChunk(this, state, chunk, '', true);
  169 };
  170 
  171 function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  172   var er = chunkInvalid(state, chunk);
  173   if (er) {
  174     stream.emit('error', er);
  175   } else if (util.isNullOrUndefined(chunk)) {
  176     state.reading = false;
  177     if (!state.ended)
  178       onEofChunk(stream, state);
  179   } else if (state.objectMode || chunk && chunk.length > 0) {
  180     if (state.ended && !addToFront) {
  181       var e = new Error('stream.push() after EOF');
  182       stream.emit('error', e);
  183     } else if (state.endEmitted && addToFront) {
  184       var e = new Error('stream.unshift() after end event');
  185       stream.emit('error', e);
  186     } else {
  187       if (state.decoder && !addToFront && !encoding)
  188         chunk = state.decoder.write(chunk);
  189 
  190       if (!addToFront)
  191         state.reading = false;
  192 
  193       // if we want the data now, just emit it.
  194       if (state.flowing && state.length === 0 && !state.sync) {
  195         stream.emit('data', chunk);
  196         stream.read(0);
  197       } else {
  198         // update the buffer info.
  199         state.length += state.objectMode ? 1 : chunk.length;
  200         if (addToFront)
  201           state.buffer.unshift(chunk);
  202         else
  203           state.buffer.push(chunk);
  204 
  205         if (state.needReadable)
  206           emitReadable(stream);
  207       }
  208 
  209       maybeReadMore(stream, state);
  210     }
  211   } else if (!addToFront) {
  212     state.reading = false;
  213   }
  214 
  215   return needMoreData(state);
  216 }
  217 
  218 
  219 
  220 // if it's past the high water mark, we can push in some more.
  221 // Also, if we have no data yet, we can stand some
  222 // more bytes.  This is to work around cases where hwm=0,
  223 // such as the repl.  Also, if the push() triggered a
  224 // readable event, and the user called read(largeNumber) such that
  225 // needReadable was set, then we ought to push more, so that another
  226 // 'readable' event will be triggered.
  227 function needMoreData(state) {
  228   return !state.ended &&
  229          (state.needReadable ||
  230           state.length < state.highWaterMark ||
  231           state.length === 0);
  232 }
  233 
  234 // backwards compatibility.
  235 Readable.prototype.setEncoding = function(enc) {
  236   if (!StringDecoder)
  237     StringDecoder = require('string_decoder/').StringDecoder;
  238   this._readableState.decoder = new StringDecoder(enc);
  239   this._readableState.encoding = enc;
  240   return this;
  241 };
  242 
  243 // Don't raise the hwm > 128MB
  244 var MAX_HWM = 0x800000;
  245 function roundUpToNextPowerOf2(n) {
  246   if (n >= MAX_HWM) {
  247     n = MAX_HWM;
  248   } else {
  249     // Get the next highest power of 2
  250     n--;
  251     for (var p = 1; p < 32; p <<= 1) n |= n >> p;
  252     n++;
  253   }
  254   return n;
  255 }
  256 
  257 function howMuchToRead(n, state) {
  258   if (state.length === 0 && state.ended)
  259     return 0;
  260 
  261   if (state.objectMode)
  262     return n === 0 ? 0 : 1;
  263 
  264   if (isNaN(n) || util.isNull(n)) {
  265     // only flow one buffer at a time
  266     if (state.flowing && state.buffer.length)
  267       return state.buffer[0].length;
  268     else
  269       return state.length;
  270   }
  271 
  272   if (n <= 0)
  273     return 0;
  274 
  275   // If we're asking for more than the target buffer level,
  276   // then raise the water mark.  Bump up to the next highest
  277   // power of 2, to prevent increasing it excessively in tiny
  278   // amounts.
  279   if (n > state.highWaterMark)
  280     state.highWaterMark = roundUpToNextPowerOf2(n);
  281 
  282   // don't have that much.  return null, unless we've ended.
  283   if (n > state.length) {
  284     if (!state.ended) {
  285       state.needReadable = true;
  286       return 0;
  287     } else
  288       return state.length;
  289   }
  290 
  291   return n;
  292 }
  293 
  294 // you can override either this method, or the async _read(n) below.
  295 Readable.prototype.read = function(n) {
  296   debug('read', n);
  297   var state = this._readableState;
  298   var nOrig = n;
  299 
  300   if (!util.isNumber(n) || n > 0)
  301     state.emittedReadable = false;
  302 
  303   // if we're doing read(0) to trigger a readable event, but we
  304   // already have a bunch of data in the buffer, then just trigger
  305   // the 'readable' event and move on.
  306   if (n === 0 &&
  307       state.needReadable &&
  308       (state.length >= state.highWaterMark || state.ended)) {
  309     debug('read: emitReadable', state.length, state.ended);
  310     if (state.length === 0 && state.ended)
  311       endReadable(this);
  312     else
  313       emitReadable(this);
  314     return null;
  315   }
  316 
  317   n = howMuchToRead(n, state);
  318 
  319   // if we've ended, and we're now clear, then finish it up.
  320   if (n === 0 && state.ended) {
  321     if (state.length === 0)
  322       endReadable(this);
  323     return null;
  324   }
  325 
  326   // All the actual chunk generation logic needs to be
  327   // *below* the call to _read.  The reason is that in certain
  328   // synthetic stream cases, such as passthrough streams, _read
  329   // may be a completely synchronous operation which may change
  330   // the state of the read buffer, providing enough data when
  331   // before there was *not* enough.
  332   //
  333   // So, the steps are:
  334   // 1. Figure out what the state of things will be after we do
  335   // a read from the buffer.
  336   //
  337   // 2. If that resulting state will trigger a _read, then call _read.
  338   // Note that this may be asynchronous, or synchronous.  Yes, it is
  339   // deeply ugly to write APIs this way, but that still doesn't mean
  340   // that the Readable class should behave improperly, as streams are
  341   // designed to be sync/async agnostic.
  342   // Take note if the _read call is sync or async (ie, if the read call
  343   // has returned yet), so that we know whether or not it's safe to emit
  344   // 'readable' etc.
  345   //
  346   // 3. Actually pull the requested chunks out of the buffer and return.
  347 
  348   // if we need a readable event, then we need to do some reading.
  349   var doRead = state.needReadable;
  350   debug('need readable', doRead);
  351 
  352   // if we currently have less than the highWaterMark, then also read some
  353   if (state.length === 0 || state.length - n < state.highWaterMark) {
  354     doRead = true;
  355     debug('length less than watermark', doRead);
  356   }
  357 
  358   // however, if we've ended, then there's no point, and if we're already
  359   // reading, then it's unnecessary.
  360   if (state.ended || state.reading) {
  361     doRead = false;
  362     debug('reading or ended', doRead);
  363   }
  364 
  365   if (doRead) {
  366     debug('do read');
  367     state.reading = true;
  368     state.sync = true;
  369     // if the length is currently zero, then we *need* a readable event.
  370     if (state.length === 0)
  371       state.needReadable = true;
  372     // call internal read method
  373     this._read(state.highWaterMark);
  374     state.sync = false;
  375   }
  376 
  377   // If _read pushed data synchronously, then `reading` will be false,
  378   // and we need to re-evaluate how much data we can return to the user.
  379   if (doRead && !state.reading)
  380     n = howMuchToRead(nOrig, state);
  381 
  382   var ret;
  383   if (n > 0)
  384     ret = fromList(n, state);
  385   else
  386     ret = null;
  387 
  388   if (util.isNull(ret)) {
  389     state.needReadable = true;
  390     n = 0;
  391   }
  392 
  393   state.length -= n;
  394 
  395   // If we have nothing in the buffer, then we want to know
  396   // as soon as we *do* get something into the buffer.
  397   if (state.length === 0 && !state.ended)
  398     state.needReadable = true;
  399 
  400   // If we tried to read() past the EOF, then emit end on the next tick.
  401   if (nOrig !== n && state.ended && state.length === 0)
  402     endReadable(this);
  403 
  404   if (!util.isNull(ret))
  405     this.emit('data', ret);
  406 
  407   return ret;
  408 };
  409 
  410 function chunkInvalid(state, chunk) {
  411   var er = null;
  412   if (!util.isBuffer(chunk) &&
  413       !util.isString(chunk) &&
  414       !util.isNullOrUndefined(chunk) &&
  415       !state.objectMode) {
  416     er = new TypeError('Invalid non-string/buffer chunk');
  417   }
  418   return er;
  419 }
  420 
  421 
  422 function onEofChunk(stream, state) {
  423   if (state.decoder && !state.ended) {
  424     var chunk = state.decoder.end();
  425     if (chunk && chunk.length) {
  426       state.buffer.push(chunk);
  427       state.length += state.objectMode ? 1 : chunk.length;
  428     }
  429   }
  430   state.ended = true;
  431 
  432   // emit 'readable' now to make sure it gets picked up.
  433   emitReadable(stream);
  434 }
  435 
  436 // Don't emit readable right away in sync mode, because this can trigger
  437 // another read() call => stack overflow.  This way, it might trigger
  438 // a nextTick recursion warning, but that's not so bad.
  439 function emitReadable(stream) {
  440   var state = stream._readableState;
  441   state.needReadable = false;
  442   if (!state.emittedReadable) {
  443     debug('emitReadable', state.flowing);
  444     state.emittedReadable = true;
  445     if (state.sync)
  446       process.nextTick(function() {
  447         emitReadable_(stream);
  448       });
  449     else
  450       emitReadable_(stream);
  451   }
  452 }
  453 
  454 function emitReadable_(stream) {
  455   debug('emit readable');
  456   stream.emit('readable');
  457   flow(stream);
  458 }
  459 
  460 
  461 // at this point, the user has presumably seen the 'readable' event,
  462 // and called read() to consume some data.  that may have triggered
  463 // in turn another _read(n) call, in which case reading = true if
  464 // it's in progress.
  465 // However, if we're not ended, or reading, and the length < hwm,
  466 // then go ahead and try to read some more preemptively.
  467 function maybeReadMore(stream, state) {
  468   if (!state.readingMore) {
  469     state.readingMore = true;
  470     process.nextTick(function() {
  471       maybeReadMore_(stream, state);
  472     });
  473   }
  474 }
  475 
  476 function maybeReadMore_(stream, state) {
  477   var len = state.length;
  478   while (!state.reading && !state.flowing && !state.ended &&
  479          state.length < state.highWaterMark) {
  480     debug('maybeReadMore read 0');
  481     stream.read(0);
  482     if (len === state.length)
  483       // didn't get any data, stop spinning.
  484       break;
  485     else
  486       len = state.length;
  487   }
  488   state.readingMore = false;
  489 }
  490 
  491 // abstract method.  to be overridden in specific implementation classes.
  492 // call cb(er, data) where data is <= n in length.
  493 // for virtual (non-string, non-buffer) streams, "length" is somewhat
  494 // arbitrary, and perhaps not very meaningful.
  495 Readable.prototype._read = function(n) {
  496   this.emit('error', new Error('not implemented'));
  497 };
  498 
  499 Readable.prototype.pipe = function(dest, pipeOpts) {
  500   var src = this;
  501   var state = this._readableState;
  502 
  503   switch (state.pipesCount) {
  504     case 0:
  505       state.pipes = dest;
  506       break;
  507     case 1:
  508       state.pipes = [state.pipes, dest];
  509       break;
  510     default:
  511       state.pipes.push(dest);
  512       break;
  513   }
  514   state.pipesCount += 1;
  515   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
  516 
  517   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
  518               dest !== process.stdout &&
  519               dest !== process.stderr;
  520 
  521   var endFn = doEnd ? onend : cleanup;
  522   if (state.endEmitted)
  523     process.nextTick(endFn);
  524   else
  525     src.once('end', endFn);
  526 
  527   dest.on('unpipe', onunpipe);
  528   function onunpipe(readable) {
  529     debug('onunpipe');
  530     if (readable === src) {
  531       cleanup();
  532     }
  533   }
  534 
  535   function onend() {
  536     debug('onend');
  537     dest.end();
  538   }
  539 
  540   // when the dest drains, it reduces the awaitDrain counter
  541   // on the source.  This would be more elegant with a .once()
  542   // handler in flow(), but adding and removing repeatedly is
  543   // too slow.
  544   var ondrain = pipeOnDrain(src);
  545   dest.on('drain', ondrain);
  546 
  547   function cleanup() {
  548     debug('cleanup');
  549     // cleanup event handlers once the pipe is broken
  550     dest.removeListener('close', onclose);
  551     dest.removeListener('finish', onfinish);
  552     dest.removeListener('drain', ondrain);
  553     dest.removeListener('error', onerror);
  554     dest.removeListener('unpipe', onunpipe);
  555     src.removeListener('end', onend);
  556     src.removeListener('end', cleanup);
  557     src.removeListener('data', ondata);
  558 
  559     // if the reader is waiting for a drain event from this
  560     // specific writer, then it would cause it to never start
  561     // flowing again.
  562     // So, if this is awaiting a drain, then we just call it now.
  563     // If we don't know, then assume that we are waiting for one.
  564     if (state.awaitDrain &&
  565         (!dest._writableState || dest._writableState.needDrain))
  566       ondrain();
  567   }
  568 
  569   src.on('data', ondata);
  570   function ondata(chunk) {
  571     debug('ondata');
  572     var ret = dest.write(chunk);
  573     if (false === ret) {
  574       debug('false write response, pause',
  575             src._readableState.awaitDrain);
  576       src._readableState.awaitDrain++;
  577       src.pause();
  578     }
  579   }
  580 
  581   // if the dest has an error, then stop piping into it.
  582   // however, don't suppress the throwing behavior for this.
  583   function onerror(er) {
  584     debug('onerror', er);
  585     unpipe();
  586     dest.removeListener('error', onerror);
  587     if (EE.listenerCount(dest, 'error') === 0)
  588       dest.emit('error', er);
  589   }
  590   // This is a brutally ugly hack to make sure that our error handler
  591   // is attached before any userland ones.  NEVER DO THIS.
  592   if (!dest._events || !dest._events.error)
  593     dest.on('error', onerror);
  594   else if (isArray(dest._events.error))
  595     dest._events.error.unshift(onerror);
  596   else
  597     dest._events.error = [onerror, dest._events.error];
  598 
  599 
  600 
  601   // Both close and finish should trigger unpipe, but only once.
  602   function onclose() {
  603     dest.removeListener('finish', onfinish);
  604     unpipe();
  605   }
  606   dest.once('close', onclose);
  607   function onfinish() {
  608     debug('onfinish');
  609     dest.removeListener('close', onclose);
  610     unpipe();
  611   }
  612   dest.once('finish', onfinish);
  613 
  614   function unpipe() {
  615     debug('unpipe');
  616     src.unpipe(dest);
  617   }
  618 
  619   // tell the dest that it's being piped to
  620   dest.emit('pipe', src);
  621 
  622   // start the flow if it hasn't been started already.
  623   if (!state.flowing) {
  624     debug('pipe resume');
  625     src.resume();
  626   }
  627 
  628   return dest;
  629 };
  630 
  631 function pipeOnDrain(src) {
  632   return function() {
  633     var state = src._readableState;
  634     debug('pipeOnDrain', state.awaitDrain);
  635     if (state.awaitDrain)
  636       state.awaitDrain--;
  637     if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
  638       state.flowing = true;
  639       flow(src);
  640     }
  641   };
  642 }
  643 
  644 
  645 Readable.prototype.unpipe = function(dest) {
  646   var state = this._readableState;
  647 
  648   // if we're not piping anywhere, then do nothing.
  649   if (state.pipesCount === 0)
  650     return this;
  651 
  652   // just one destination.  most common case.
  653   if (state.pipesCount === 1) {
  654     // passed in one, but it's not the right one.
  655     if (dest && dest !== state.pipes)
  656       return this;
  657 
  658     if (!dest)
  659       dest = state.pipes;
  660 
  661     // got a match.
  662     state.pipes = null;
  663     state.pipesCount = 0;
  664     state.flowing = false;
  665     if (dest)
  666       dest.emit('unpipe', this);
  667     return this;
  668   }
  669 
  670   // slow case. multiple pipe destinations.
  671 
  672   if (!dest) {
  673     // remove all.
  674     var dests = state.pipes;
  675     var len = state.pipesCount;
  676     state.pipes = null;
  677     state.pipesCount = 0;
  678     state.flowing = false;
  679 
  680     for (var i = 0; i < len; i++)
  681       dests[i].emit('unpipe', this);
  682     return this;
  683   }
  684 
  685   // try to find the right one.
  686   var i = indexOf(state.pipes, dest);
  687   if (i === -1)
  688     return this;
  689 
  690   state.pipes.splice(i, 1);
  691   state.pipesCount -= 1;
  692   if (state.pipesCount === 1)
  693     state.pipes = state.pipes[0];
  694 
  695   dest.emit('unpipe', this);
  696 
  697   return this;
  698 };
  699 
  700 // set up data events if they are asked for
  701 // Ensure readable listeners eventually get something
  702 Readable.prototype.on = function(ev, fn) {
  703   var res = Stream.prototype.on.call(this, ev, fn);
  704 
  705   // If listening to data, and it has not explicitly been paused,
  706   // then call resume to start the flow of data on the next tick.
  707   if (ev === 'data' && false !== this._readableState.flowing) {
  708     this.resume();
  709   }
  710 
  711   if (ev === 'readable' && this.readable) {
  712     var state = this._readableState;
  713     if (!state.readableListening) {
  714       state.readableListening = true;
  715       state.emittedReadable = false;
  716       state.needReadable = true;
  717       if (!state.reading) {
  718         var self = this;
  719         process.nextTick(function() {
  720           debug('readable nexttick read 0');
  721           self.read(0);
  722         });
  723       } else if (state.length) {
  724         emitReadable(this, state);
  725       }
  726     }
  727   }
  728 
  729   return res;
  730 };
  731 Readable.prototype.addListener = Readable.prototype.on;
  732 
  733 // pause() and resume() are remnants of the legacy readable stream API
  734 // If the user uses them, then switch into old mode.
  735 Readable.prototype.resume = function() {
  736   var state = this._readableState;
  737   if (!state.flowing) {
  738     debug('resume');
  739     state.flowing = true;
  740     if (!state.reading) {
  741       debug('resume read 0');
  742       this.read(0);
  743     }
  744     resume(this, state);
  745   }
  746   return this;
  747 };
  748 
  749 function resume(stream, state) {
  750   if (!state.resumeScheduled) {
  751     state.resumeScheduled = true;
  752     process.nextTick(function() {
  753       resume_(stream, state);
  754     });
  755   }
  756 }
  757 
  758 function resume_(stream, state) {
  759   state.resumeScheduled = false;
  760   stream.emit('resume');
  761   flow(stream);
  762   if (state.flowing && !state.reading)
  763     stream.read(0);
  764 }
  765 
  766 Readable.prototype.pause = function() {
  767   debug('call pause flowing=%j', this._readableState.flowing);
  768   if (false !== this._readableState.flowing) {
  769     debug('pause');
  770     this._readableState.flowing = false;
  771     this.emit('pause');
  772   }
  773   return this;
  774 };
  775 
  776 function flow(stream) {
  777   var state = stream._readableState;
  778   debug('flow', state.flowing);
  779   if (state.flowing) {
  780     do {
  781       var chunk = stream.read();
  782     } while (null !== chunk && state.flowing);
  783   }
  784 }
  785 
  786 // wrap an old-style stream as the async data source.
  787 // This is *not* part of the readable stream interface.
  788 // It is an ugly unfortunate mess of history.
  789 Readable.prototype.wrap = function(stream) {
  790   var state = this._readableState;
  791   var paused = false;
  792 
  793   var self = this;
  794   stream.on('end', function() {
  795     debug('wrapped end');
  796     if (state.decoder && !state.ended) {
  797       var chunk = state.decoder.end();
  798       if (chunk && chunk.length)
  799         self.push(chunk);
  800     }
  801 
  802     self.push(null);
  803   });
  804 
  805   stream.on('data', function(chunk) {
  806     debug('wrapped data');
  807     if (state.decoder)
  808       chunk = state.decoder.write(chunk);
  809     if (!chunk || !state.objectMode && !chunk.length)
  810       return;
  811 
  812     var ret = self.push(chunk);
  813     if (!ret) {
  814       paused = true;
  815       stream.pause();
  816     }
  817   });
  818 
  819   // proxy all the other methods.
  820   // important when wrapping filters and duplexes.
  821   for (var i in stream) {
  822     if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
  823       this[i] = function(method) { return function() {
  824         return stream[method].apply(stream, arguments);
  825       }}(i);
  826     }
  827   }
  828 
  829   // proxy certain important events.
  830   var events = ['error', 'close', 'destroy', 'pause', 'resume'];
  831   forEach(events, function(ev) {
  832     stream.on(ev, self.emit.bind(self, ev));
  833   });
  834 
  835   // when we try to consume some more bytes, simply unpause the
  836   // underlying stream.
  837   self._read = function(n) {
  838     debug('wrapped _read', n);
  839     if (paused) {
  840       paused = false;
  841       stream.resume();
  842     }
  843   };
  844 
  845   return self;
  846 };
  847 
  848 
  849 
  850 // exposed for testing purposes only.
  851 Readable._fromList = fromList;
  852 
  853 // Pluck off n bytes from an array of buffers.
  854 // Length is the combined lengths of all the buffers in the list.
  855 function fromList(n, state) {
  856   var list = state.buffer;
  857   var length = state.length;
  858   var stringMode = !!state.decoder;
  859   var objectMode = !!state.objectMode;
  860   var ret;
  861 
  862   // nothing in the list, definitely empty.
  863   if (list.length === 0)
  864     return null;
  865 
  866   if (length === 0)
  867     ret = null;
  868   else if (objectMode)
  869     ret = list.shift();
  870   else if (!n || n >= length) {
  871     // read it all, truncate the array.
  872     if (stringMode)
  873       ret = list.join('');
  874     else
  875       ret = Buffer.concat(list, length);
  876     list.length = 0;
  877   } else {
  878     // read just some of it.
  879     if (n < list[0].length) {
  880       // just take a part of the first list item.
  881       // slice is the same for buffers and strings.
  882       var buf = list[0];
  883       ret = buf.slice(0, n);
  884       list[0] = buf.slice(n);
  885     } else if (n === list[0].length) {
  886       // first list is a perfect match
  887       ret = list.shift();
  888     } else {
  889       // complex case.
  890       // we have enough to cover it, but it spans past the first buffer.
  891       if (stringMode)
  892         ret = '';
  893       else
  894         ret = new Buffer(n);
  895 
  896       var c = 0;
  897       for (var i = 0, l = list.length; i < l && c < n; i++) {
  898         var buf = list[0];
  899         var cpy = Math.min(n - c, buf.length);
  900 
  901         if (stringMode)
  902           ret += buf.slice(0, cpy);
  903         else
  904           buf.copy(ret, c, 0, cpy);
  905 
  906         if (cpy < buf.length)
  907           list[0] = buf.slice(cpy);
  908         else
  909           list.shift();
  910 
  911         c += cpy;
  912       }
  913     }
  914   }
  915 
  916   return ret;
  917 }
  918 
  919 function endReadable(stream) {
  920   var state = stream._readableState;
  921 
  922   // If we get here before consuming all the bytes, then that is a
  923   // bug in node.  Should never happen.
  924   if (state.length > 0)
  925     throw new Error('endReadable called on non-empty stream');
  926 
  927   if (!state.endEmitted) {
  928     state.ended = true;
  929     process.nextTick(function() {
  930       // Check that we didn't get one last unshift.
  931       if (!state.endEmitted && state.length === 0) {
  932         state.endEmitted = true;
  933         stream.readable = false;
  934         stream.emit('end');
  935       }
  936     });
  937   }
  938 }
  939 
  940 function forEach (xs, f) {
  941   for (var i = 0, l = xs.length; i < l; i++) {
  942     f(xs[i], i);
  943   }
  944 }
  945 
  946 function indexOf (xs, x) {
  947   for (var i = 0, l = xs.length; i < l; i++) {
  948     if (xs[i] === x) return i;
  949   }
  950   return -1;
  951 }