"Fossies" - the Fresh Open Source Software Archive

Member "Atom/resources/app/apm/node_modules/bl/node_modules/readable-stream/lib/_stream_readable.js" (7 Feb 2017, 27120 Bytes) of archive /windows/misc/atom-windows.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) JavaScript source code syntax highlighting (style: standard) with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 // Copyright Joyent, Inc. and other Node contributors.
    2 //
    3 // Permission is hereby granted, free of charge, to any person obtaining a
    4 // copy of this software and associated documentation files (the
    5 // "Software"), to deal in the Software without restriction, including
    6 // without limitation the rights to use, copy, modify, merge, publish,
    7 // distribute, sublicense, and/or sell copies of the Software, and to permit
    8 // persons to whom the Software is furnished to do so, subject to the
    9 // following conditions:
   10 //
   11 // The above copyright notice and this permission notice shall be included
   12 // in all copies or substantial portions of the Software.
   13 //
   14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
   17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
   18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
   19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
   20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
   21 
   22 module.exports = Readable;
   23 
   24 /*<replacement>*/
   25 var isArray = require('isarray');
   26 /*</replacement>*/
   27 
   28 
   29 /*<replacement>*/
   30 var Buffer = require('buffer').Buffer;
   31 /*</replacement>*/
   32 
   33 Readable.ReadableState = ReadableState;
   34 
   35 var EE = require('events').EventEmitter;
   36 
   37 /*<replacement>*/
      // Fallback for EventEmitter implementations that do not provide the
      // static listenerCount(emitter, type) helper: it is emulated here by
      // taking the length of emitter.listeners(type).
   38 if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
   39   return emitter.listeners(type).length;
   40 };
   41 /*</replacement>*/
   42 
   43 var Stream = require('stream');
   44 
   45 /*<replacement>*/
   46 var util = require('core-util-is');
   47 util.inherits = require('inherits');
   48 /*</replacement>*/
   49 
   50 var StringDecoder;
   51 
   52 util.inherits(Readable, Stream);
   53 
   // Per-stream bookkeeping record used by Readable.
   //
   // options (all optional):
   //   highWaterMark   - buffer level below which more data is requested.
   //                     0 is honoured ("never call _read pre-emptively");
   //                     any other falsy value falls back to 16KB.  The
   //                     value is truncated to an integer.
   //   objectMode      - coerced to a boolean.
   //   defaultEncoding - falls back to 'utf8'.
   //   encoding        - when set, a StringDecoder for it is attached.
   // stream is accepted for signature compatibility and is not used here.
   function ReadableState(options, stream) {
     var opts = options || {};
     var FALLBACK_HWM = 16 * 1024;

     var requested = opts.highWaterMark;
     var honourRequested = Boolean(requested) || requested === 0;
     // ~~ casts the chosen value to an int.
     this.highWaterMark = ~~(honourRequested ? requested : FALLBACK_HWM);

     // queued chunks, and their combined size (entry count in objectMode)
     this.buffer = [];
     this.length = 0;

     // pipe destinations: null, a single dest, or an array of dests
     this.pipes = null;
     this.pipesCount = 0;

     this.flowing = false;
     this.ended = false;
     this.endEmitted = false;
     this.reading = false;

     // A stream that never has data and does push(null) right away could
     // emit 'end' before the consumer is listening, so endReadable() does
     // not schedule 'end' until read() has been called at least once.
     this.calledRead = false;

     // True while read() is inside its call to _read(); emitReadable()
     // defers to the next tick while it is set.  It starts out true so
     // nothing is emitted synchronously before the first read.
     this.sync = true;

     // needReadable is raised whenever read() returns null, meaning a
     // 'readable' event is awaited.
     this.needReadable = false;
     this.emittedReadable = false;
     this.readableListening = false;

     // In object mode read(n) ignores n, and buffer merging and byte-length
     // checks are skipped.
     this.objectMode = !!opts.objectMode;

     // Crypto historically defaults to 'binary', hence the option;
     // everything else uses 'utf8'.
     this.defaultEncoding = opts.defaultEncoding || 'utf8';

     // set by flow() when it ran out of data; pipeOnReadable() restarts it
     this.ranOut = false;

     // number of pipe destinations that still owe a 'drain' event
     this.awaitDrain = 0;

     // true while a maybeReadMore pass is scheduled
     this.readingMore = false;

     this.decoder = null;
     this.encoding = null;
     if (opts.encoding) {
       if (!StringDecoder) {
         StringDecoder = require('string_decoder/').StringDecoder;
       }
       this.decoder = new StringDecoder(opts.encoding);
       this.encoding = opts.encoding;
     }
   }
  121 
  // Readable stream constructor.  Works with or without `new`: a plain call
  // is forwarded to `new Readable(options)`.  Creates the per-stream
  // ReadableState, sets the legacy `readable` flag and runs the Stream
  // constructor on the instance.
  function Readable(options) {
    if (this instanceof Readable) {
      this._readableState = new ReadableState(options, this);

      // legacy
      this.readable = true;

      Stream.call(this);
      return;
    }

    return new Readable(options);
  }
  133 
  // Manually shove something into the read() buffer.
  //
  //   chunk    - Buffer, string, null/undefined (signals EOF), or any value
  //              when the stream is in objectMode.
  //   encoding - encoding of a string chunk; defaults to
  //              state.defaultEncoding.
  //
  // A string whose encoding differs from the stream's own encoding is
  // converted to a Buffer first (and its encoding reset to ''), so that
  // readableAddChunk() can run it through the decoder.
  //
  // Returns whatever readableAddChunk() returns, i.e. needMoreData(state):
  // true while more data may be pushed, similar to how Writable.write()
  // returns true if you should write() some more.
  Readable.prototype.push = function(chunk, encoding) {
    var state = this._readableState;

    if (typeof chunk === 'string' && !state.objectMode) {
      encoding = encoding || state.defaultEncoding;
      if (encoding !== state.encoding) {
        // `new Buffer(string, encoding)` is deprecated.  Use Buffer.from()
        // where it is a real Buffer factory; on old runtimes it is either
        // missing or merely inherited from Uint8Array, so fall back to the
        // constructor there.
        var hasBufferFrom = typeof Buffer.from === 'function' &&
                            Buffer.from !== Uint8Array.from;
        chunk = hasBufferFrom ? Buffer.from(chunk, encoding)
                              : new Buffer(chunk, encoding);
        encoding = '';
      }
    }

    return readableAddChunk(this, state, chunk, encoding, false);
  };
  151 
  // Put a chunk back at the FRONT of the read buffer.  Intended only for
  // data that came directly out of read().  Delegates to readableAddChunk()
  // with an empty encoding and addToFront = true, and returns its result.
  Readable.prototype.unshift = function(chunk) {
    return readableAddChunk(this, this._readableState, chunk, '', true);
  };
  157 
  // Shared implementation behind push() (addToFront = false) and unshift()
  // (addToFront = true).
  //
  // - an invalid chunk emits 'error' on the stream;
  // - null/undefined marks EOF (onEofChunk runs unless already ended);
  // - an empty chunk outside objectMode only clears `reading` (push only);
  // - anything else is decoded if needed and added to state.buffer.
  //
  // Always returns needMoreData(state).
  function readableAddChunk(stream, state, chunk, encoding, addToFront) {
    var problem = chunkInvalid(state, chunk);
    if (problem) {
      stream.emit('error', problem);
      return needMoreData(state);
    }

    if (chunk === null || chunk === undefined) {
      state.reading = false;
      if (!state.ended) {
        onEofChunk(stream, state);
      }
      return needMoreData(state);
    }

    var carriesData = state.objectMode || (chunk && chunk.length > 0);
    if (!carriesData) {
      if (!addToFront) {
        state.reading = false;
      }
      return needMoreData(state);
    }

    if (state.ended && !addToFront) {
      stream.emit('error', new Error('stream.push() after EOF'));
      return needMoreData(state);
    }

    if (state.endEmitted && addToFront) {
      stream.emit('error', new Error('stream.unshift() after end event'));
      return needMoreData(state);
    }

    // Only pushed chunks that still carry no encoding go through the decoder.
    var payload = chunk;
    if (state.decoder && !addToFront && !encoding) {
      payload = state.decoder.write(payload);
    }

    // update the buffer info.
    state.length += state.objectMode ? 1 : payload.length;
    if (addToFront) {
      state.buffer.unshift(payload);
    } else {
      state.reading = false;
      state.buffer.push(payload);
    }

    if (state.needReadable) {
      emitReadable(stream);
    }

    maybeReadMore(stream, state);

    return needMoreData(state);
  }
  197 
  198 
  199 
  // Decide whether the producer may push more data.
  //
  // Never once the stream has ended.  Otherwise yes when any of these hold:
  //  - a 'readable' event is still owed (needReadable), e.g. the user asked
  //    read() for more than was buffered;
  //  - the buffer is below the high water mark;
  //  - the buffer is empty (covers highWaterMark = 0, such as the repl).
  function needMoreData(state) {
    if (state.ended) {
      return false;
    }

    return state.needReadable ||
           state.length < state.highWaterMark ||
           state.length === 0;
  }
  213 
  // backwards compatibility.
  // Attach a StringDecoder for `enc` to the stream state and record the
  // encoding.  The string_decoder module is only loaded on first use.
  // Returns the stream itself so the call can be chained.
  Readable.prototype.setEncoding = function(enc) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder/').StringDecoder;
    this._readableState.decoder = new StringDecoder(enc);
    this._readableState.encoding = enc;
    return this;
  };
  221 
  // Don't raise the hwm above 8MB (0x800000 bytes).
  var MAX_HWM = 0x800000;

  // Return the smallest power of two that is >= n, capped at MAX_HWM.
  // Used by howMuchToRead() to grow the high water mark in coarse steps
  // instead of tiny increments.
  function roundUpToNextPowerOf2(n) {
    if (n >= MAX_HWM) {
      return MAX_HWM;
    }

    // Round fractional requests up first: the bit operations below truncate
    // to an integer, which would otherwise yield a value smaller than n.
    n = Math.ceil(n) - 1;
    // Smear the highest set bit into every lower position, then add one.
    for (var p = 1; p < 32; p <<= 1) n |= n >> p;
    return n + 1;
  }
  235 
  // Work out how much a read(n) call may take from the buffer right now.
  //
  //  - nothing once the stream has ended and the buffer is empty;
  //  - objectMode: 0 for read(0), otherwise one entry;
  //  - n null or NaN (e.g. undefined): the first buffered chunk while
  //    flowing, otherwise everything buffered;
  //  - n <= 0: nothing;
  //  - n above the high water mark raises the mark to the next power of 2;
  //  - n larger than what is buffered: 0 (and needReadable is raised)
  //    unless the stream has ended, in which case the remainder.
  function howMuchToRead(n, state) {
    var drainedAndDone = state.length === 0 && state.ended;
    if (drainedAndDone) {
      return 0;
    }

    if (state.objectMode) {
      return n === 0 ? 0 : 1;
    }

    if (n === null || isNaN(n)) {
      // only flow one buffer at a time
      var hasQueuedChunk = state.flowing && state.buffer.length;
      return hasQueuedChunk ? state.buffer[0].length : state.length;
    }

    if (n <= 0) {
      return 0;
    }

    // Asking for more than the target buffer level raises the water mark,
    // in power-of-2 steps to avoid growing it in tiny amounts.
    if (n > state.highWaterMark) {
      state.highWaterMark = roundUpToNextPowerOf2(n);
    }

    if (n > state.length) {
      // don't have that much.
      if (state.ended) {
        return state.length;
      }
      state.needReadable = true;
      return 0;
    }

    return n;
  }
  272 
  273 // you can override either this method, or the async _read(n) below.
      //
      // read(n): take data out of the internal buffer.
      //   n - requested amount; howMuchToRead() turns it into the amount that
      //       is actually taken.
      //   Returns the data produced by fromList(), or null when nothing is
      //   returned right now (needReadable is then raised so that a
      //   'readable' event follows once data arrives).
      // Side effects: sets calledRead, may invoke this._read(highWaterMark),
      // and calls endReadable() once an ended stream has been drained.
  274 Readable.prototype.read = function(n) {
  275   var state = this._readableState;
  276   state.calledRead = true;
        // keep the caller's value: n is overwritten below and has to be
        // recomputed from the original if _read() completes synchronously.
  277   var nOrig = n;
  278   var ret;
  279
  280   if (typeof n !== 'number' || n > 0)
  281     state.emittedReadable = false;
  282
  283   // if we're doing read(0) to trigger a readable event, but we
  284   // already have a bunch of data in the buffer, then just trigger
  285   // the 'readable' event and move on.
  286   if (n === 0 &&
  287       state.needReadable &&
  288       (state.length >= state.highWaterMark || state.ended)) {
  289     emitReadable(this);
  290     return null;
  291   }
  292
  293   n = howMuchToRead(n, state);
  294
  295   // if we've ended, and we're now clear, then finish it up.
  296   if (n === 0 && state.ended) {
  297     ret = null;
  298
  299     // In cases where the decoder did not receive enough data
  300     // to produce a full chunk, then immediately received an
  301     // EOF, state.buffer will contain [<Buffer >, <Buffer 00 ...>].
  302     // howMuchToRead will see this and coerce the amount to
  303     // read to zero (because it's looking at the length of the
  304     // first <Buffer > in state.buffer), and we'll end up here.
  305     //
  306     // This can only happen via state.decoder -- no other venue
  307     // exists for pushing a zero-length chunk into state.buffer
  308     // and triggering this behavior. In this case, we return our
  309     // remaining data and end the stream, if appropriate.
  310     if (state.length > 0 && state.decoder) {
            // n is 0 here, which makes fromList() return everything buffered.
  311       ret = fromList(n, state);
  312       state.length -= ret.length;
  313     }
  314
  315     if (state.length === 0)
  316       endReadable(this);
  317
  318     return ret;
  319   }
  320
  321   // All the actual chunk generation logic needs to be
  322   // *below* the call to _read.  The reason is that in certain
  323   // synthetic stream cases, such as passthrough streams, _read
  324   // may be a completely synchronous operation which may change
  325   // the state of the read buffer, providing enough data when
  326   // before there was *not* enough.
  327   //
  328   // So, the steps are:
  329   // 1. Figure out what the state of things will be after we do
  330   // a read from the buffer.
  331   //
  332   // 2. If that resulting state will trigger a _read, then call _read.
  333   // Note that this may be asynchronous, or synchronous.  Yes, it is
  334   // deeply ugly to write APIs this way, but that still doesn't mean
  335   // that the Readable class should behave improperly, as streams are
  336   // designed to be sync/async agnostic.
  337   // Take note if the _read call is sync or async (ie, if the read call
  338   // has returned yet), so that we know whether or not it's safe to emit
  339   // 'readable' etc.
  340   //
  341   // 3. Actually pull the requested chunks out of the buffer and return.
  342
  343   // if we need a readable event, then we need to do some reading.
  344   var doRead = state.needReadable;
  345
  346   // if we currently have less than the highWaterMark, then also read some
        // (the comparison is <=, so a buffer that would sit exactly at the
        // highWaterMark after this read also triggers a _read).
  347   if (state.length - n <= state.highWaterMark)
  348     doRead = true;
  349
  350   // however, if we've ended, then there's no point, and if we're already
  351   // reading, then it's unnecessary.
  352   if (state.ended || state.reading)
  353     doRead = false;
  354
  355   if (doRead) {
  356     state.reading = true;
          // `sync` stays true for the duration of the _read() call;
          // emitReadable() checks it and defers the event to the next tick.
  357     state.sync = true;
  358     // if the length is currently zero, then we *need* a readable event.
  359     if (state.length === 0)
  360       state.needReadable = true;
  361     // call internal read method
  362     this._read(state.highWaterMark);
  363     state.sync = false;
  364   }
  365
  366   // If _read called its callback synchronously, then `reading`
  367   // will be false, and we need to re-evaluate how much data we
  368   // can return to the user.
  369   if (doRead && !state.reading)
  370     n = howMuchToRead(nOrig, state);
  371
  372   if (n > 0)
  373     ret = fromList(n, state);
  374   else
  375     ret = null;
  376
  377   if (ret === null) {
  378     state.needReadable = true;
  379     n = 0;
  380   }
  381
  382   state.length -= n;
  383
  384   // If we have nothing in the buffer, then we want to know
  385   // as soon as we *do* get something into the buffer.
  386   if (state.length === 0 && !state.ended)
  387     state.needReadable = true;
  388
  389   // If we happened to read() exactly the remaining amount in the
  390   // buffer, and the EOF has been seen at this point, then make sure
  391   // that we emit 'end' on the very next tick.
  392   if (state.ended && !state.endEmitted && state.length === 0)
  393     endReadable(this);
  394
  395   return ret;
  396 };
  397 
  // Validate a chunk handed to push()/unshift().
  // Buffers, strings, null and undefined are always accepted; any other
  // value is accepted only in objectMode.  Returns a TypeError for a
  // rejected chunk and null otherwise (the error is returned, not thrown).
  function chunkInvalid(state, chunk) {
    var acceptable = Buffer.isBuffer(chunk) ||
                     typeof chunk === 'string' ||
                     chunk === null ||
                     chunk === undefined ||
                     state.objectMode;

    return acceptable ? null : new TypeError('Invalid non-string/buffer chunk');
  }
  409 
  410 
  // Handle the end-of-data marker.
  // Flushes whatever the decoder still holds into the buffer, marks the
  // state as ended, then either announces the leftover data with a
  // 'readable' event or, when nothing is buffered, starts ending the stream.
  function onEofChunk(stream, state) {
    var decoder = state.decoder;

    if (decoder && !state.ended) {
      var tail = decoder.end();
      if (tail && tail.length) {
        state.buffer.push(tail);
        state.length += state.objectMode ? 1 : tail.length;
      }
    }

    state.ended = true;

    // data left over: emit 'readable' now to make sure it gets picked up.
    if (state.length > 0) {
      emitReadable(stream);
    } else {
      endReadable(stream);
    }
  }
  428 
  // Emit 'readable' at most once until read() clears emittedReadable again.
  //
  // While state.sync is set the event is deferred to the next tick:
  // emitting right away can trigger another read() call and overflow the
  // stack.  Deferring might trigger a nextTick recursion warning instead,
  // but that's not so bad.
  function emitReadable(stream) {
    var state = stream._readableState;
    state.needReadable = false;

    if (state.emittedReadable) {
      return;
    }
    state.emittedReadable = true;

    if (!state.sync) {
      emitReadable_(stream);
      return;
    }

    process.nextTick(function() {
      emitReadable_(stream);
    });
  }
  446 
  // Fire the 'readable' event on the given stream, with no arguments.
  function emitReadable_(stream) {
    var eventName = 'readable';
    stream.emit(eventName);
  }
  450 
  451 
  // Schedule a pre-emptive refill pass (maybeReadMore_) on the next tick.
  //
  // By now the user has presumably seen 'readable' and called read(), which
  // may itself have started a _read().  The scheduled pass tops the buffer
  // up when the stream is neither ended nor reading and is still below the
  // high water mark.  state.readingMore makes sure only one pass is pending.
  function maybeReadMore(stream, state) {
    if (state.readingMore) {
      return;
    }

    state.readingMore = true;
    process.nextTick(function() {
      maybeReadMore_(stream, state);
    });
  }
  466 
  // Refill pass scheduled by maybeReadMore().
  // Keeps issuing read(0) while the stream is idle (not reading, not
  // flowing, not ended) and below the high water mark, and gives up as soon
  // as a read(0) leaves the buffered length unchanged.  Always clears
  // state.readingMore when done.
  function maybeReadMore_(stream, state) {
    var before = state.length;

    for (;;) {
      var wantsMore = !state.reading && !state.flowing && !state.ended &&
                      state.length < state.highWaterMark;
      if (!wantsMore) {
        break;
      }

      stream.read(0);

      // didn't get any data, stop spinning.
      if (state.length === before) {
        break;
      }
      before = state.length;
    }

    state.readingMore = false;
  }
  480 
  // Abstract method: concrete streams must replace it with their own data
  // source.  read() calls it with the current high water mark as n.
  // This default implementation only emits an 'error' event.
  Readable.prototype._read = function(n) {
    var notImplemented = new Error('not implemented');
    this.emit('error', notImplemented);
  };
  488 
  489 Readable.prototype.pipe = function(dest, pipeOpts) {
        // pipe(dest, pipeOpts): register `dest` as a destination of this
        // stream, wire up the bookkeeping listeners on both ends, and start
        // flow() on the next tick if the stream is not flowing yet.
        //   dest     - destination; this function calls on/once/
        //              removeListener/emit/end on it.
        //   pipeOpts - optional; { end: false } keeps dest.end() from being
        //              called when the source ends.
        //   Returns dest.
  490   var src = this;
  491   var state = this._readableState;
  492
        // state.pipes is null with no destination, the destination itself
        // with exactly one, and an array from the second one onwards.
  493   switch (state.pipesCount) {
  494     case 0:
  495       state.pipes = dest;
  496       break;
  497     case 1:
  498       state.pipes = [state.pipes, dest];
  499       break;
  500     default:
  501       state.pipes.push(dest);
  502       break;
  503   }
  504   state.pipesCount += 1;
  505
        // dest.end() is skipped when pipeOpts.end === false and for
        // process.stdout / process.stderr.
  506   var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
  507               dest !== process.stdout &&
  508               dest !== process.stderr;
  509
  510   var endFn = doEnd ? onend : cleanup;
        // 'end' was already emitted: run endFn on the next tick instead of
        // listening for the event.
  511   if (state.endEmitted)
  512     process.nextTick(endFn);
  513   else
  514     src.once('end', endFn);
  515
  516   dest.on('unpipe', onunpipe);
        // 'unpipe' events for other sources are ignored.
  517   function onunpipe(readable) {
  518     if (readable !== src) return;
  519     cleanup();
  520   }
  521
  522   function onend() {
  523     dest.end();
  524   }
  525
  526   // when the dest drains, it reduces the awaitDrain counter
  527   // on the source.  This would be more elegant with a .once()
  528   // handler in flow(), but adding and removing repeatedly is
  529   // too slow.
  530   var ondrain = pipeOnDrain(src);
  531   dest.on('drain', ondrain);
  532
  533   function cleanup() {
  534     // cleanup event handlers once the pipe is broken
  535     dest.removeListener('close', onclose);
  536     dest.removeListener('finish', onfinish);
  537     dest.removeListener('drain', ondrain);
  538     dest.removeListener('error', onerror);
  539     dest.removeListener('unpipe', onunpipe);
  540     src.removeListener('end', onend);
  541     src.removeListener('end', cleanup);
  542
  543     // if the reader is waiting for a drain event from this
  544     // specific writer, then it would cause it to never start
  545     // flowing again.
  546     // So, if this is awaiting a drain, then we just call it now.
  547     // If we don't know, then assume that we are waiting for one.
  548     if (!dest._writableState || dest._writableState.needDrain)
  549       ondrain();
  550   }
  551
  552   // if the dest has an error, then stop piping into it.
  553   // however, don't suppress the throwing behavior for this.
  554   function onerror(er) {
  555     unpipe();
  556     dest.removeListener('error', onerror);
          // with no other 'error' listener left, re-emit the error on dest.
  557     if (EE.listenerCount(dest, 'error') === 0)
  558       dest.emit('error', er);
  559   }
  560   // This is a brutally ugly hack to make sure that our error handler
  561   // is attached before any userland ones.  NEVER DO THIS.
        // It reaches into the emitter's private _events map and places
        // onerror at the front of the existing 'error' listeners.
  562   if (!dest._events || !dest._events.error)
  563     dest.on('error', onerror);
  564   else if (isArray(dest._events.error))
  565     dest._events.error.unshift(onerror);
  566   else
  567     dest._events.error = [onerror, dest._events.error];
  568
  569
  570
  571   // Both close and finish should trigger unpipe, but only once.
  572   function onclose() {
  573     dest.removeListener('finish', onfinish);
  574     unpipe();
  575   }
  576   dest.once('close', onclose);
  577   function onfinish() {
  578     dest.removeListener('close', onclose);
  579     unpipe();
  580   }
  581   dest.once('finish', onfinish);
  582
  583   function unpipe() {
  584     src.unpipe(dest);
  585   }
  586
  587   // tell the dest that it's being piped to
  588   dest.emit('pipe', src);
  589
  590   // start the flow if it hasn't been started already.
  591   if (!state.flowing) {
  592     // the handler that waits for readable events after all
  593     // the data gets sucked out in flow.
  594     // This would be easier to follow with a .once() handler
  595     // in flow(), but that is too slow.
  596     this.on('readable', pipeOnReadable);
  597
  598     state.flowing = true;
  599     process.nextTick(function() {
  600       flow(src);
  601     });
  602   }
  603
  604   return dest;
  605 };
  606 
  // Build the 'drain' listener that pipe() attaches to a destination.
  // Each invocation decrements the source's awaitDrain counter; when the
  // counter reaches exactly zero the flow() loop is restarted for `src`.
  function pipeOnDrain(src) {
    return function() {
      var state = src._readableState;
      state.awaitDrain -= 1;
      if (state.awaitDrain !== 0) {
        return;
      }
      flow(src);
    };
  }
  616 
  // Pump loop for piped streams: read chunks from `src` and write each one
  // to every pipe destination, also emitting it as a 'data' event.
  //
  // The loop stops when
  //  - a destination's write() returned false (awaitDrain > 0): return and
  //    wait for the 'drain' handler to restart the flow;
  //  - there are no destinations left: leave flowing mode, switching to
  //    old-mode data events if somebody listens for 'data';
  //  - read() returned null: remember that we ran out so the next
  //    'readable' event starts the loop again.
  function flow(src) {
    var state = src._readableState;
    var current;
    state.awaitDrain = 0;

    // Write the current chunk to one destination, counting refusals.
    function deliver(dest) {
      if (dest.write(current) === false) {
        state.awaitDrain += 1;
      }
    }

    for (;;) {
      if (!state.pipesCount) {
        break;
      }
      current = src.read();
      if (current === null) {
        break;
      }

      if (state.pipesCount === 1) {
        deliver(state.pipes);
      } else {
        forEach(state.pipes, deliver);
      }

      src.emit('data', current);

      // if anyone needs a drain, then we have to wait for that.
      if (state.awaitDrain > 0) {
        return;
      }
    }

    // Every destination was unpiped, either before entering this function
    // or inside the loop.  NB: This is a pretty rare edge case.
    if (state.pipesCount === 0) {
      state.flowing = false;

      // if there were data event listeners added, then switch to old mode.
      if (EE.listenerCount(src, 'data') > 0) {
        emitDataEvents(src);
      }
      return;
    }

    // no one needed a drain, so we just ran out of data;
    // the next readable event starts it over again.
    state.ranOut = true;
  }
  660 
  // 'readable' listener installed by pipe().  Restarts flow() only if the
  // previous flow() pass stopped because it ran out of data.
  function pipeOnReadable() {
    var state = this._readableState;
    if (!state.ranOut) {
      return;
    }

    state.ranOut = false;
    flow(this);
  }
  667 
  668 
  // Detach pipe destinations.
  //   dest - the destination to remove; when omitted, every destination is
  //          removed.
  // Each removed destination receives an 'unpipe' event with this stream as
  // argument.  When the last destination goes (or all are removed at once)
  // the pipe 'readable' handler is dropped and flowing is switched off.
  // Always returns this.
  Readable.prototype.unpipe = function(dest) {
    var state = this._readableState;
    var count = state.pipesCount;

    // if we're not piping anywhere, then do nothing.
    if (count === 0) {
      return this;
    }

    // just one destination.  most common case.
    if (count === 1) {
      // passed in one, but it's not the right one.
      if (dest && dest !== state.pipes) {
        return this;
      }

      var only = dest || state.pipes;

      state.pipes = null;
      state.pipesCount = 0;
      this.removeListener('readable', pipeOnReadable);
      state.flowing = false;
      if (only) {
        only.emit('unpipe', this);
      }
      return this;
    }

    // slow case. multiple pipe destinations.
    if (!dest) {
      // remove all.
      var all = state.pipes;
      state.pipes = null;
      state.pipesCount = 0;
      this.removeListener('readable', pipeOnReadable);
      state.flowing = false;

      for (var k = 0; k < count; k++) {
        all[k].emit('unpipe', this);
      }
      return this;
    }

    // try to find the right one.
    var position = indexOf(state.pipes, dest);
    if (position === -1) {
      return this;
    }

    state.pipes.splice(position, 1);
    state.pipesCount -= 1;
    // back down to one destination: store it directly, not in an array.
    if (state.pipesCount === 1) {
      state.pipes = state.pipes[0];
    }

    dest.emit('unpipe', this);

    return this;
  };
  725 
  // Listener registration with two stream-specific extras:
  //  - adding a 'data' listener while not flowing switches the stream into
  //    old-mode data events;
  //  - the first 'readable' listener (on a stream that is still readable)
  //    makes sure the listener eventually gets an event: a read(0) is
  //    issued unless a read is already in progress, in which case
  //    'readable' is emitted if data is already buffered.
  // Returns whatever Stream.prototype.on returned.
  Readable.prototype.on = function(ev, fn) {
    var res = Stream.prototype.on.call(this, ev, fn);

    if (ev === 'data' && !this._readableState.flowing) {
      emitDataEvents(this);
    }

    if (ev !== 'readable' || !this.readable) {
      return res;
    }

    var state = this._readableState;
    if (state.readableListening) {
      return res;
    }

    state.readableListening = true;
    state.emittedReadable = false;
    state.needReadable = true;

    if (!state.reading) {
      this.read(0);
    } else if (state.length) {
      emitReadable(this, state);
    }

    return res;
  };
  Readable.prototype.addListener = Readable.prototype.on;
  751 
  // pause() and resume() are remnants of the legacy readable stream API.
  // Calling resume() switches the stream into old mode (data events), then
  // issues a read(0) and emits 'resume'.
  Readable.prototype.resume = function() {
    var self = this;
    emitDataEvents(self);
    self.read(0);
    self.emit('resume');
  };
  759 
  // Legacy API: switch the stream into old mode in the paused state, then
  // emit 'pause'.
  Readable.prototype.pause = function() {
    var startPaused = true;
    emitDataEvents(this, startPaused);
    this.emit('pause');
  };
  764 
  // Convert `stream` into an old-style stream that pushes 'data' events.
  //
  //   startPaused - when truthy the stream starts out paused.
  //
  // Throws if the stream is currently flowing through pipe().  Otherwise it
  // replaces pipe/on/addListener on the instance with the plain Stream
  // versions, installs a 'readable' handler that drains read() into 'data'
  // events while not paused, gives the instance its own pause()/resume(),
  // and finally emits 'readable' once to get things started.
  function emitDataEvents(stream, startPaused) {
    if (stream._readableState.flowing) {
      // https://github.com/isaacs/readable-stream/issues/16
      throw new Error('Cannot switch to old mode now.');
    }

    var isPaused = startPaused || false;
    // true while a 'readable' event was seen whose data is not drained yet
    var hasPendingData = false;

    // convert to an old-style stream.
    var legacyOn = Stream.prototype.on;
    stream.readable = true;
    stream.pipe = Stream.prototype.pipe;
    stream.addListener = legacyOn;
    stream.on = legacyOn;

    stream.on('readable', function() {
      hasPendingData = true;

      var chunk;
      while (!isPaused) {
        chunk = stream.read();
        if (chunk === null) {
          break;
        }
        stream.emit('data', chunk);
      }

      // read() came back empty: ask for another 'readable' later.
      if (chunk === null) {
        hasPendingData = false;
        stream._readableState.needReadable = true;
      }
    });

    stream.pause = function() {
      isPaused = true;
      this.emit('pause');
    };

    stream.resume = function() {
      isPaused = false;
      if (hasPendingData) {
        process.nextTick(function() {
          stream.emit('readable');
        });
      } else {
        this.read(0);
      }
      this.emit('resume');
    };

    // now make it start, just in case it hadn't already.
    stream.emit('readable');
  }
  813 
  // Use an old-style (push, 'data'-event based) stream as the data source
  // of this Readable.  This is *not* part of the readable stream interface;
  // it exists for historical reasons only.
  //
  //  - 'data' chunks of the old stream are pushed into this stream; when
  //    push() reports back-pressure the old stream is paused;
  //  - 'end' flushes the decoder (if any) and pushes the EOF marker;
  //  - functions of the old stream that this object does not have are
  //    proxied;
  //  - 'error', 'close', 'destroy', 'pause' and 'resume' are re-emitted;
  //  - _read() resumes the old stream if it had been paused here.
  //
  // Returns this.
  Readable.prototype.wrap = function(stream) {
    var self = this;
    var state = self._readableState;
    var paused = false;

    stream.on('end', function() {
      if (state.decoder && !state.ended) {
        var rest = state.decoder.end();
        if (rest && rest.length) {
          self.push(rest);
        }
      }

      self.push(null);
    });

    stream.on('data', function(chunk) {
      if (state.decoder) {
        chunk = state.decoder.write(chunk);
      }

      if (state.objectMode) {
        // don't skip over falsy values in objectMode
        if (chunk === null || chunk === undefined) {
          return;
        }
      } else if (!chunk || !chunk.length) {
        return;
      }

      if (!self.push(chunk)) {
        paused = true;
        stream.pause();
      }
    });

    // proxy all the other methods.
    // important when wrapping filters and duplexes.
    function proxyFor(method) {
      return function() {
        return stream[method].apply(stream, arguments);
      };
    }
    for (var key in stream) {
      if (typeof stream[key] !== 'function' ||
          typeof self[key] !== 'undefined') {
        continue;
      }
      self[key] = proxyFor(key);
    }

    // proxy certain important events.
    var forwarded = ['error', 'close', 'destroy', 'pause', 'resume'];
    for (var idx = 0, total = forwarded.length; idx < total; idx++) {
      stream.on(forwarded[idx], self.emit.bind(self, forwarded[idx]));
    }

    // when we try to consume some more bytes, simply unpause the
    // underlying stream.
    self._read = function(n) {
      if (!paused) {
        return;
      }
      paused = false;
      stream.resume();
    };

    return self;
  };
  878 
  879 
  880 
  881 // exposed for testing purposes only.
  882 Readable._fromList = fromList;
  883 
  // Pluck off n bytes (characters when a decoder is set) from the front of
  // state.buffer.
  //
  //   n     - amount wanted; a falsy n, or n >= state.length, takes
  //           everything.  Ignored in objectMode, where exactly one entry
  //           is shifted off the list.
  //   state - reads buffer, length (the combined length of all entries),
  //           decoder and objectMode.
  //
  // Returns the extracted Buffer/string/object, or null when the list is
  // empty or state.length is 0.  state.buffer is mutated in place;
  // state.length is NOT adjusted here -- that is left to the caller.
  function fromList(n, state) {
    var list = state.buffer;
    var length = state.length;
    var stringMode = !!state.decoder;
    var objectMode = !!state.objectMode;

    // nothing in the list, definitely empty.
    if (list.length === 0)
      return null;

    if (length === 0)
      return null;

    if (objectMode)
      return list.shift();

    if (!n || n >= length) {
      // read it all, truncate the array.
      var everything = stringMode ? list.join('') : Buffer.concat(list, length);
      list.length = 0;
      return everything;
    }

    // read just some of it.
    var head = list[0];
    if (n < head.length) {
      // just take a part of the first list item.
      // slice is the same for buffers and strings.
      list[0] = head.slice(n);
      return head.slice(0, n);
    }

    if (n === head.length) {
      // first list item is a perfect match
      return list.shift();
    }

    // complex case.
    // we have enough to cover it, but it spans past the first buffer.
    var ret;
    if (stringMode) {
      ret = '';
    } else {
      // `new Buffer(size)` is deprecated; prefer Buffer.allocUnsafe() where
      // it exists.  Uninitialized memory is fine here because the loop
      // below overwrites all n bytes (n < state.length at this point).
      ret = typeof Buffer.allocUnsafe === 'function' ? Buffer.allocUnsafe(n)
                                                     : new Buffer(n);
    }

    var copied = 0;
    while (list.length > 0 && copied < n) {
      var buf = list[0];
      var take = Math.min(n - copied, buf.length);

      if (stringMode)
        ret += buf.slice(0, take);
      else
        buf.copy(ret, copied, 0, take);

      // keep the unread remainder of a partially consumed entry
      if (take < buf.length)
        list[0] = buf.slice(take);
      else
        list.shift();

      copied += take;
    }

    return ret;
  }
  949 
  // Finish the stream: schedule the 'end' event for the next tick.
  //
  // Throws when data is still buffered -- reaching this point before all
  // bytes are consumed should never happen.  Nothing is scheduled if 'end'
  // was already emitted or read() has never been called.  On the next tick
  // 'end' is emitted (and `readable` cleared) only if no data was unshifted
  // in the meantime.
  function endReadable(stream) {
    var state = stream._readableState;

    if (state.length > 0) {
      throw new Error('endReadable called on non-empty stream');
    }

    if (state.endEmitted || !state.calledRead) {
      return;
    }

    state.ended = true;
    process.nextTick(function() {
      // Check that we didn't get one last unshift.
      if (state.endEmitted || state.length !== 0) {
        return;
      }
      state.endEmitted = true;
      stream.readable = false;
      stream.emit('end');
    });
  }
  970 
  // Minimal Array#forEach replacement: calls f(item, index) for each index
  // below the length the list had when iteration started.
  function forEach (xs, f) {
    var count = xs.length;
    var at = 0;
    while (at < count) {
      f(xs[at], at);
      at += 1;
    }
  }
  976 
  // Minimal Array#indexOf replacement: index of the first element that is
  // strictly equal (===) to x, or -1 when there is none.
  function indexOf (xs, x) {
    var count = xs.length;
    var at = 0;
    while (at < count) {
      if (xs[at] === x) {
        return at;
      }
      at += 1;
    }
    return -1;
  }
  982 }