Node.js v0.11.11 マニュアル & ドキュメンテーション
Table of Contents
- Stream
- API for Stream Consumers
- API for Stream Implementors
- Streams: Under the Hood
Stream#
Stability: 2 - Unstable
ストリームは Node の様々なオブジェクトで実装される抽象的なインタフェースです。 例えば HTTP サーバへのリクエストは 標準出力と同様にストリームです。 ストリームは読み込み可能、書き込み可能、またはその両方です。 全てのストリームは EventEmitter のインスタンスです。
Stream のベースクラスは require('stream')
でロードすることができます。
Readable ストリーム、Writable ストリーム、Duplex ストリーム、
Transform ストリームのベースクラスが提供されます。
このドキュメントは 3 つのセクションに分かれています。 最初に、プログラムでストリームを利用するために知っておく必要がある API について説明します。 もし独自のストリーミング API を実装しないのであれば、 そこで終わりにすることができます。
2番目のセクションでは、独自のストリームを実装する場合に必要となる API について説明します。 この API はそれが簡単にできるように設計されています。
3番目のセクションは、理解することなく変更してはならない 内部的なメカニズムや関数群を含めて、ストリームがどのように動作するかについて より詳しく説明します。
API for Stream Consumers#
ストリームは、Readable、Writable、またはその両方 (Duplex) のいずれかになることができます。
全てのストリームは EventEmitter ですが、Readable、Writable、または Duplex のいずれであるかによって、独自のメソッドやプロパティを持ちます。
もしストリームが Readable とWritable の両方であるなら、 それは以下の全てのメソッドとイベントを実装します。 Duplex または Transform ストリームの実装は多少異なる場合がありますが、 この API によって詳細に説明されます。
プログラムの中でストリームからのデータを消費するために、 ストリームのインターフェースを実装する必要はありません。 もしプログラムの中でストリーミングインターフェースを実装 する なら、 以下の ストリーム実装者向けの API を参照してください。
ほとんど全ての Node プログラムは、どんなに単純であっても、 何らかの方法でストリームを利用します。 これはストリームを利用する Node プログラムの例です:
var http = require('http');
var server = http.createServer(function (req, res) {
// req is an http.IncomingMessage, which is a Readable Stream
// res is an http.ServerResponse, which is a Writable Stream
var body = '';
// we want to get the data as utf8 strings
// If you don't set an encoding, then you'll get Buffer objects
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added
req.on('data', function (chunk) {
body += chunk;
})
// the end event tells you that you have entire body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('error: ' + er.message);
}
// write back something interesting to the user:
res.write(typeof data);
res.end();
})
})
server.listen(1337);
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
Class: stream.Readable#
Readable ストリームのインターフェースは、あなたが読み込むデータの抽象的な 発生源 です。言い換えると、データは Readable ストリームから 出て きます。
Readable ストリームは、あなたがデータを受け取る準備ができたと指示するまでは、 データの生成を開始しません。
Readable ストリームは二つの "モード": flowing モード と
paused モード を持っています。
flowing モードに入ると、データは下層のシステムから読み込まれると、
可能な限り素早くあなたのプログラムに届けられます。
paused モードでは、データの断片を取り出すために、明示的に
stream.read()
を呼び出す必要があります。
ストリームは paused モードから始まります。
注意: もし 'data'
イベントハンドラが割り当てられてなく、
pipe()
の出力先もなく、そしてストリームが flowing モードに
切り替わると、データは失われます。
以下のいずれかで flowing に切り替えることができます。
- データを監視するために
'data'
イベント ハンドラを追加する。 - 明示的にデータのフローを開始するために
resume()
を呼び出す。 - Writable にデータを送るために
pipe()
を呼び出す。
以下のいずれかで paused モードに戻すことができます。
- パイプの出力先がなければ、
pause()
メソッドを呼び出します。 - パイプの出力先があるなら、
'data'
イベント のハンドラを削除し、unpipe()
メソッドを呼び出して全てのパイプ出力先を削除します。
互換性の理由から、'data'
イベントのハンドラを削除してもストリームは
自動的には中断 しない ことに注意してください。
同様に、パイプの出力先があると、それらの出力先が空になるとより多くのデータを
要求するため、pause()
を呼び出してもストリームが中断した まま であることは
保証されません。
Readable ストリームを含む例:
- クライアントの http レスポンス
- サーバの http リクエスト
- fs の ReadStream
- zlib のストリーム
- crypto のストリーム
- tcp のソケット
- child_process の標準出力と標準エラー出力
- process.stdin
Event: 'readable'#
ストリームからデータの断片を読み込むことが可能となった時、
'readable'
イベントが生成されます。
あるケースでは、'readable'
イベントを監視することは下層のシステムからデータを内部バッファへ読み込む原因となります (それがまだ行われていなかった場合)。
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
// there is some data to read now
})
内部バッファが空になると、データが利用可能になった時に
'readable'
イベントは再び生成されます。
Event: 'data'#
chunk
{Buffer | String} データの断片。
'data'
イベントのリスナをストリームに追加すると、明示的に中断されるまで
ストリームは flowing モードに切り替わります。
データは利用可能になるとすぐにあなたのハンドラに渡されます。
ストリームから出てくる全てのデータをできるだけ素早く欲しいのなら、 これが最善の方法です。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
})
Event: 'end'#
このイベントは、提供するデータがもう無くなった場合に生成されます。
'end'
イベントはデータが完全に消費されるまでは 生成されない
ことに注意してください。
それは flowing モードに切り替えることによって、または終わりに達するまで
read()
を繰り返し呼び出すことによって達成することができます。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
})
readable.on('end', function() {
console.log('there will be no more data.');
});
Event: 'close'#
下層のリソース (例えば背後のファイル記述子) がクローズされた時に生成されます。 全てのストリームがこのイベントを発生するわけではありません。
Event: 'error'#
データの受信でエラーがあると生成されます。
readable.read([size])#
size
{Number} どれだけのデータを読み込むか指定するオプションの引数。- Return {String | Buffer | null}
read()
メソッドは内部バッファからデータを取り出して返します。
もし利用可能なデータが無ければ、null
を返します。
size
引数を指定すると、その長さ (バイト数または文字数) のデータを返します。
もし size
で指定された長さのデータが揃っていない場合は null
を返します。
size
引数を指定しなかった場合は、内部バッファにある全てのデータが返されます。
このメソッドは paused モードの場合に限って呼び出されるべきです。 flowing モードでは、内部バッファが空になるまで このメソッドは自動的に呼び出されます。
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('got %d bytes of data', chunk.length);
}
});
もしこのメソッドがデータの断片を返すと、それは 'data'
イベント の
生成も引き起こします。
readable.setEncoding(encoding)#
encoding
{String} 使用するエンコーディング。
この関数を呼び出すと、ストリームは Buffer オブジェクトの代わりに
指定されたエンコーディングによる文字列を返すようになります。
例えば、readable.setEncoding('utf8')
とすると、得られるデータは
UTF-8 のデータとして解釈され、文字列が返されます。
readable.setEncoding('hex')
とすると、データは 16 進フォーマットの
文字列にエンコードされます。
これは、Buffer を直接取得して単純に buf.toString(encoding)
を呼び出した場合は潜在的にめちゃくちゃになるのとは異なり、
マルチバイト文字を正しく扱います。
データを文字列として読み込みたければ、常にこのメソッドを使用してください。
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
})
readable.resume()#
- Return:
this
このメソッドは Readable ストリームが 'data'
イベントの生成を
再開するようにします。
このメソッドはストリームを flowing モードに切り替えます。
もしストリームからのデータを消費する必要が なく、しかし 'end'
イベントを
受け取る必要が ある なら、readable.resume()
を呼び出してデータの
フローを開くことができます。
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('got to the end, but did not read anything');
})
readable.pause()#
- Return:
this
このメソッドはストリームを flowing モードに切り替えて、
'data'
イベントの生成を中断します。
利用可能になったデータは内部バッファの中に残ったままとなります。
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
readable.pause();
console.log('there will be no more data for 1 second');
setTimeout(function() {
console.log('now data will start flowing again');
readable.resume();
}, 1000);
})
readable.pipe(destination, [options])#
destination
{Writable Stream} データの書き込み先。options
{Object} パイプオプションend
{Boolean} 読み込み元が終了すると書き込み先を終了します。 デフォルトはtrue
このメソッドは Readable ストリームから全てのデータを引き出し、 与えられた行き先に書き込みます。 高速な Readable ストリームによって出力先が圧迫されないように、 自動的にフロー制御を行います。
複数の出力先を安全に連結することができます。
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
この関数は出力先となるストリーム返すので、このようにパイプのチェーンを 組み立てることができます。
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
Unix の cat
コマンドをエミュレートする例:
process.stdin.pipe(process.stdout);
デフォルトでは、出力先の end()
は入力元のストリームで
'end'
が生成された時に呼び出されます。そのため、destination
はもう書き込み可能ではなくなります。
{end: false }
を options
として渡すことにより、出力先ストリームを
オープンしたままにしておくことができます。
これは writer
をオープンしたままにすることにより、最後に
"Goodbye"
と書き込むことができます。
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
process.stderr
および process.stdout
は、オプションの指定に関係なく、
プロセスが終了するまで決してクローズされないことに注意してください。
readable.unpipe([destination])#
destination
{Writable Stream} オプションのパイプを解除するストリーム
このメソッドは以前の pipe()
呼び出しで設定されたフックを取り除きます。
destination
が指定されなかった場合は、全てのパイプが取り除かれます。
destination
が指定されたものの、それがパイプされていなかった場合、
これは何もしません。
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
console.log('stop writing to file.txt');
readable.unpipe(writable);
console.log('manually close the file stream');
writable.end();
}, 1000);
readable.unshift(chunk)#
chunk
{Buffer | String} 読み込みキューの先頭に戻されるデータの断片
これはストリームがパーサによって消費されるケースにおいて有用です。 それはソースから楽観的に取り出したデータを「消費しなかった」ことにして、 ストリームが他のところにデータを渡せるようにする場合に必要です。
stream.unshift(chunk)
を頻繁に呼び出さなくてはならないとしたら、
代わりに Transform ストリームを実装することを検討してください
(後述する ストリーム実装者向けの API を参照してください)。
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
readable.wrap(stream)#
stream
{Stream} 「古いスタイル」の Readable ストリーム
v0.10 より前のバージョンの Node には、今日の全ストリーム API を実装していない ストリームがありました (より詳細は後述する「互換性」を参照してください)。
もし、'data'
イベントを生成し、アドバイスだけを行う pause()
メソッドを持つ、古い Node ライブラリを使っているなら、
wrap()
メソッドは古いストリームをデータソースとして使用する
Readable ストリームを作成します。
この関数を呼び出す必要は滅多にありませんが、これは古い Node プログラム及びライブラリと相互作用するための利便性のために存在します。
例:
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);
myReader.on('readable', function() {
myReader.read(); // etc.
});
Class: stream.Writable#
Writable ストリームのインターフェースは、あなたがデータを書き込む抽象的な 行き先 です。
Writable ストリームを含む例:
- クライアントの http リクエスト
- サーバの http レスポンス
- fs の WriteStream
- zlib のストリーム
- crypto のストリーム
- tcp のソケット
- child_process の標準入力
- process.stdout, process.stderr
writable.write(chunk, [encoding], [callback])#
chunk
{String | Buffer} 書き込まれるデータencoding
{String} もしchunk
が文字列なら、そのエンコーディングcallback
{Function} データが掃き出された時に呼び出されるコールバック- Returns: {Boolean} データが完全に処理された場合は
true
。
このメソッドはデータを下層のシステムに書き込み、データが完全に処理されると 与えられたコールバックを一度だけ呼び出します。
戻り値は書き込みをすぐに続けていいかどうかを示します。
もしデータが内部にバッファリングされなければならないなら false
を返します。
そうでなければ true
を返します。
この戻り値は完全にアドバイス的です。
もしこれが false
を返しても、あなたは書き込みを続けることが「できます」。
しかしながら、書き込まれたデータはメモリにバッファリングされるため、
これを過剰にしないことが最善です。
代わりに、より多くのデータを書く前に 'drain'
イベントを待機してください。
Event: 'drain'#
write(chunk, encoding, callback)
の呼び出しが false
を返した場合、
より多くのデータをいつストリームに書き始めるのが適切かを
'drain'
イベントによって示します。
// Write the data to the supplied writable stream 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write);
}
}
}
writable.cork()#
全ての書き込みをバッファリングするように強制します。
バッファリングされたデータは .uncork()
または .end()
のいずれかが
呼び出されるとフラッシュされます。
writable.uncork()#
.cork()
が呼び出されてからバッファリングされたデータをフラッシュします。
writable.end([chunk], [encoding], [callback])#
chunk
{String | Buffer} オプションの書き込まれるデータencoding
{String} もしchunk
が文字列なら、そのエンコーディングcallback
{Function} ストリームが終了時に呼び出される、 オプションのコールバック
これ以上データをストリームに書き込まない場合に呼び出してください。
コールバックが与えられた場合、それは 'finish'
イベントのリスナとして
アタッチされます。
end()
を呼び出した後で write()
を呼び出すとエラーになります。
// write 'hello, ' and then end with 'world!'
http.createServer(function (req, res) {
res.write('hello, ');
res.end('world!');
// writing more now is not allowed!
});
Event: 'finish'#
end()
メソッドが呼び出されて、全てのデータが下層のシステムに
掃き出されると、このイベントが生成されます。
var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
console.error('all writes are now complete.');
});
Event: 'pipe'#
src
{Readable Stream} この Writable ストリームにつながれた 入力元のストリーム
これは、Readable ストリームの pipe()
メソッドが呼び出されて、
この Writable ストリームが出力先として加えられた時に生成されます。
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
console.error('something is piping into the writer');
assert.equal(src, reader);
});
reader.pipe(writer);
Event: 'unpipe'#
これは、Readable ストリームで unpipe()
メソッドが呼び出され、
この Writable ストリームが出力先から取り除かれた時に生成されます。
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
console.error('something has stopped piping into the writer');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
Event: 'error'#
データの書き込みまたはデータのパイプ中にエラーがあると生成されます。
Class: stream.Duplex#
Duplex ストリームは Readable と Writable 両方のインターフェースを 実装したストリームです。使い方は上記を参照してください。
Duplex ストリームを含む例:
Class: stream.Transform#
Transform ストリームは、入力から何らかの方法で出力が計算される Duplex ストリームです。 それらは Readable と Writable 両方のインターフェースを実装します。 使い方は上記を参照してください。
Transform ストリームを含む例:
API for Stream Implementors#
どのストリームを実装する場合でも、パターンは同じです:
- それぞれの親クラスを拡張して、独自のサブクラスを作成する
(特に
util.inherits
メソッドはそのために役立ちます)。 - 内部のメカニズムがきちんとセットアップされることを確実にするために、 サブクラスのコンストラクタの中から親クラスのコンストラクタを呼び出す。
- 以下で詳述される、いくつかの特別なメソッドを実装する。
拡張するクラスと実装するメソッドは、あなたが書こうとしているストリームの種類に 依存します。
ユースケース |
クラス |
実装するメソッド |
---|---|---|
読み込みのみ |
||
書き込みのみ |
||
読み込みと書き込み |
||
書き込まれたデータを変換し、その結果を読み込む |
あなたの実装コードの中では、決して ストリーム利用者のための API で説明されたメソッドを呼び出さないことがとても重要です。 そうでなければ、あなたのストリーミングインターフェースを利用するプログラムに 有害な副作用を引き起こす原因となり得ます。
Class: stream.Readable#
stream.Readable
は抽象クラスで、下層の実装として _read(size)
メソッドを実装することで拡張されるように設計されています。
プログラムの中で Readable ストリームを利用する方法については、 前述の ストリーム利用者のための API を参照してください。 この後に続くのは、あなたのプログラムの中で Readable ストリームを 実装する方法の説明です。
Example: A Counting Stream#
これは Readable ストリームの基本的な例です。 それは 1 から 1,000,000 までの数を昇順で生成し、そして終了します。
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000000;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
if (i > this._max)
this.push(null);
else {
var str = '' + i;
var buf = new Buffer(str, 'ascii');
this.push(buf);
}
};
Example: SimpleProtocol v1 (Sub-optimal)#
これは前に説明した parseHeader
関数とよく似ていますが、
独自のストリームとして実装されています。
また、この実装は入ってくるデータを文字列に変換しないことに注意してください。
しかしながら、これは Transform ストリームを使うことでよりうまく実装できます。 後述のよりよい実装を参照してください。
// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under the Transform section.
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(SimpleProtocol, Readable);
function SimpleProtocol(source, options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(source, options);
Readable.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
// source is a readable stream, such as a socket or file
this._source = source;
var self = this;
source.on('end', function() {
self.push(null);
});
// give it a kick whenever the source is readable
// read(0) will not consume any bytes
source.on('readable', function() {
self.read(0);
});
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._read = function(n) {
if (!this._inBody) {
var chunk = this._source.read();
// if the source doesn't have data, we don't have data yet.
if (chunk === null)
return this.push('');
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
this.push('');
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// now, because we got some extra data, unshift the rest
// back into the read queue so that our consumer will see it.
var b = chunk.slice(split);
this.unshift(b);
// and let them know that we are done parsing the header.
this.emit('header', this.header);
}
} else {
// from there on, just provide the data to our consumer.
// careful not to push(null), since that would indicate EOF.
var chunk = this._source.read();
if (chunk) this.push(chunk);
}
};
// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
new stream.Readable([options])#
options
{Object} (任意)highWaterMark
{Number} 下層のリソースから読み込むのを中断するまで 内部バッファに貯めておくバイト数の最大値。 デフォルトは 16kb、あるいはobjectMode
では 16。encoding
{String} 指定されるとバッファは指定のエンコーディングで デコードされます。デフォルトはnull
。objectMode
{Boolean} このストリームがオブジェクトストリームとして 振る舞うべきかどうか。これはstream.read(n)
がサイズ n のバッファではなく 一つの値を返すことを意味します。デフォルトはfalse
。
Readable
クラスを拡張するクラスでは、バッファリングの設定を確実に
初期化することができるように、必ずコンストラクタを呼び出してください。
readable._read(size)#
size
{Number} 非同期に読み込むバイト数
注意: この関数を実装してください、しかし直接呼び出さないでください。
この関数は直接呼び出すべきではありません。 これはサブクラスで実装されるべきであり、Readable クラスの内部から 呼び出されるべきです。
全ての Readable ストリームは、下層のリソースからデータを
取得するために _read()
メソッドを提供しなければなりません。
このメソッドはこれを定義するクラス内部のものであり、ユーザプログラムから 直接呼び出されるべきものではないため、アンダースコアの接頭辞を持ちます。 しかしながら、あなたの拡張クラスではこのメソッドをオーバーライドすることが 求められています。
データが利用可能になれば、readable.push(chunk)
を呼び出すことで
それを読み込みキューに追加します。
push()
が false を返した場合は、読み込みを止めるべきです。
_read()
が再び呼び出された時が、さらに多くのデータを追加を開始すべき時です。
size
引数はアドバイス的です。
"read()" が一回の呼び出しでデータを返す実装では、
どれだけのデータを取得すべきか知るためにこれを使うことができます。
TCPやTLSなど、それに関連しない実装ではこの引数は無視され、
利用可能になったデータをシンプルに提供するかもしれません。
たとえば stream.push(chunk)
が呼び出されるより前に、
size
バイトが利用可能になるまで「待つ」必要はありません。
readable.push(chunk, [encoding])#
chunk
{Buffer | null | String} 読み込みキューにプッシュされる、 データのチャンクencoding
{String} 文字列チャンクのエンコーディング。'utf8'
や'ascii'
など、Buffer の正しいエンコーディングの必要があります。- return {Boolean} さらにプッシュしてもいいかどうか
注意: この関数は Readable の実装から呼び出されるべきものであり、 Readable ストリームの利用者が呼び出すべきではありません。
少なくとも一回は push(chunk)
が呼び出されないと、_read()
関数が
再び呼び出されることはありません。
Readable
クラスは、read()
メソッドが呼び出されることで
後から取り出されるデータを、'readable'
イベントの生成時に
読み込みキューに入れておくことによって機能します。
push()
メソッドはいくつかのデータを明示的に読み込みキューに挿入します。
もし null
と共に呼び出されると、それはデータが終了した (EOF) ことを伝えます。
この API は可能な限り柔軟に設計されています。 例えば、ある種の中断/再開メカニズムとデータのコールバックを持つ、 より低水準のデータソースをラップするかもしれません。 それらのケースでは、このように低水準のソースオブジェクトを ラップすることができます。
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
util.inherits(SourceWrapper, Readable);
function SourceWrapper(options) {
Readable.call(this, options);
this._source = getLowlevelSourceObject();
var self = this;
// Every time there's data, we push it into the internal buffer.
this._source.ondata = function(chunk) {
// if push() returns false, then we need to stop reading from source
if (!self.push(chunk))
self._source.readStop();
};
// When the source ends, we push the EOF-signalling `null` chunk
this._source.onend = function() {
self.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
this._source.readStart();
};
Class: stream.Writable#
stream.Writable
は抽象クラスで、下層の実装として
_write(chunk, encoding, callback)
メソッドを実装することで
拡張されるように設計されています。
プログラムの中で Writable ストリームを利用する方法については、 前述の ストリーム利用者のための API を参照してください。 この後に続くのは、あなたのプログラムの中で Writable ストリームを 実装する方法の説明です。
new stream.Writable([options])#
options
{Object} (任意)
Writable
クラスを拡張するクラスでは、バッファリングの設定を確実に
初期化することができるように、必ずコンストラクタを呼び出してください。
writable._write(chunk, encoding, callback)#
chunk
{Buffer | Array} 書き込まれるデータ。decodeStrings
オプションがfalse
に設定されない限り常にバッファです。encoding
{String} チャンクが文字列の場合のエンコーディング方式。 チャンクがバッファの場合は無視されます。decodeStrings
オプションが明示的にfalse
に設定されない限り、 チャンクは 常に バッファであるべき事に注意してください。callback
{Function} チャンクを提供する処理が終了した時に、 (任意のエラー引数と共に) この関数を呼び出してください。
全ての Writable ストリームは、下層のリソースにデータを
送るために _write()
メソッドを提供しなければなりません。
注意: この関数は直接呼び出してはいけません。 これはサブクラスで実装されるべきであり、Writable クラスの内部からのみ 呼び出されるべきです。
コールバックは出力が成功して完了したか、エラーが発生したかを伝えるために、
標準的な callback(error)
パターンを使って呼び出します。
コンストラクタオプションの decodeStrings
フラグがセットされると、
chunk
を Buffer ではなく文字列にし、encoding
でその文字列の
種類を示すことができます。
これは、実装が文字列データのエンコーディングを最適化できるようにするためです。
decodeStrings
オプションを明示的に false
に設定しない場合、
endocing
引数は安全に無視することができます。
そして chunk
は常に Buffer であると見なせます。
このメソッドはこれを定義するクラス内部のものであり、ユーザプログラムから 直接呼び出されるべきものではないため、アンダースコアの接頭辞を持ちます。 しかしながら、あなたの拡張クラスではこのメソッドをオーバーライドすることが 求められています。
writable._writev(chunks, callback)#
chunks
{Array} 書き込まれるチャンクの配列。 それぞれのチャンクは 以下のフォーマット:{ chunk: ..., encoding: ... }
。callback
{Function} 与えられたチャンクの処理が完了すると、この関数が (オプションのエラー引数を伴って) 呼び出されます。
注意: この関数は直接呼び出しては いけません**。 これはおそらくサブクラスによって実装され、 Writable クラス内部のメソッドによってのみ呼び出されます。
この関数の実装は完全に任意です。ほとんどのケースでは必要ありません。 もし実装されると、書き込みキューにバッファリングされた全ての断片と共に 呼び出されます。
Class: stream.Duplex#
"duplex" ストリームは、TCP ソケットコネクションのように Readable であり Writable でもあるストリームの一種です。
stream.Duplex
は、Readable および Writable ストリームクラスと同様、
下層の実装である _read(size)
および
_write(chunk, encoding, callback)
メソッドによって拡張されるように
設計された抽象クラスであることに注意してください。
JavaScript は複数のプロトタイプ継承を持つことができないため、
このクラスは Readable からプロトタイプを継承したうえで、
Writable から寄生的な方法 (プロトタイプメンバーのコピー) を行います。
低水準の _read(size)
および _write(chunk, encoding, callback)
を実装することは、Duplex クラスを拡張するユーザの責務です。
new stream.Duplex(options)#
options
{Object} Writable および Readable のコンストラクタに渡されます。 以下のフィールドを持つこともできます:allowHalfOpen
{Boolean} デフォルトはtrue
。 もしfalse
に設定された場合、読み込み側が閉じられると 自動的に書き込み側も閉じられます。
Duplex
クラスを拡張するクラスでは、バッファリングの設定を確実に
初期化することができるように、必ずコンストラクタを呼び出してください。
Class: stream.Transform#
"Transform" ストリームは、zlib ストリームや crypto ストリームのように、 入力が何らかの方法で出力の元となっているような Duplex ストリームです。
出力は、入力と同じサイズ、同じ数のチャンク、同時に到着することを 要求されません。 たとえば、Hash ストリームは入力が終了すると一つだけのチャンクを出力します。 zlib ストリームは、入力より小さいか、またはより大きい出力を生成します。
_read()
および _write()
メソッドの代わりに、Transform クラスでは
_transform()
メソッドを実装しなければなりません。
また、任意で _flush()
メソッドを実装することもできます (後述)。
new stream.Transform([options])#
options
{Object} Writable および Readable のコンストラクタに渡されます。
Transform
クラスを拡張するクラスでは、バッファリングの設定を確実に
初期化することができるように、必ずコンストラクタを呼び出してください。
transform._transform(chunk, encoding, callback)#
chunk
{Buffer | Array} 書き込まれるデータ。decodeStrings
オプションがfalse
に設定されない限り常にバッファです。encoding
{String} チャンクが文字列の場合のエンコーディング方式 (チャンクがバッファの場合は無視されます)。callback
{Function} チャンクを提供する処理が終了した時に、 (任意のエラー引数と共に) この関数を呼び出してください。
注意: この関数は直接呼び出してはいけません。 これはサブクラスで実装されるべきであり、Transform クラスの内部からのみ 呼び出されるべきです。
全ての Transform ストリームの実装は、入力を受け取って出力を提供するために
_transform()
メソッドを提供しなければなりません。
書き込まれるバイトを処理し、読み込み可能なインタフェースに渡すなど、
Transform クラスでしなければならないことは全て _transform()
で行わなければなりません。非同期 I/O、何かの処理、その他。
この入力チャンクからの出力を生成するために、transform.push(outputChunk)
を 0 回以上呼び出してください。
それはこのチャンクの結果としてどれだけのデータを出力したいのかに依存します。
現在のチャンクの処理が完全に終了した場合のみ、コールバック関数を呼び出します。 特定の入力チャンクからの結果として、出力があるかもしれないし、 無いかもしれないことに注意してください。
このメソッドはこれを定義するクラス内部のものであり、ユーザプログラムから 直接呼び出されるべきものではないため、アンダースコアの接頭辞を持ちます。 しかしながら、あなたの拡張クラスではこのメソッドをオーバーライドすることが 求められています。
transform._flush(callback)#
callback
{Function} 与えられたチャンクの処理が終了した場合に、 (任意のエラー引数と共に) この関数を呼び出してください。
注意: この関数は直接呼び出してはいけません。 これはサブクラスで実装されるかもしれず、Transform クラスの内部からのみ 呼び出されるべきです。
場合によっては、変換操作はストリームの終端でより多くのデータを
生成する必要があります。
たとえば、Zlib
圧縮ストリームは出力を最適に圧縮できるように、
いくつかの内部状態を持ちます。
一方、終端ではデータが完全なものになるように、
残されたものに最善を尽くす必要があります。
この場合、最後の最後 (書き込まれた全てのデータが消費された後、
ただし読み込み側の終了を知らせる 'end'
が生成される前) に呼び出される
_flush()
メソッドを実装することができます。
_transform()
と同様、transform.push(chunk)
を何度 (0 回以上) でも
適切に呼び出し、フラッシュ操作が完了した時に callback
を呼び出します。
このメソッドはこれを定義するクラス内部のものであり、ユーザプログラムから 直接呼び出されるべきものではないため、アンダースコアの接頭辞を持ちます。 しかしながら、あなたの拡張クラスではこのメソッドをオーバーライドすることが 求められています。
Example: SimpleProtocol
parser v2#
前述した単純なプロトコルパーサの例は、より高水準な Transform ストリームクラスを
使うことで、さらにシンプルに実装することができます。
前述の parseHeader
および SimpleProtocol v1
とよく似た例です。
この例では、入力を引数で与えるのではなく、Node のストームにおける より慣用的なアプローチとしてパーサにパイプで送られます。
var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);
function SimpleProtocol(options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options);
Transform.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// and let them know that we are done parsing the header.
this.emit('header', this.header);
// now, because we got some extra data, emit this first.
this.push(chunk.slice(split));
}
} else {
// from there on, just provide the data to our consumer as-is.
this.push(chunk);
}
done();
};
// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
Class: stream.PassThrough#
これは Transform ストリームの取るに足らない実装で、 入力したバイト列を出力に単純に渡すだけです。 これの主な目的はサンプル及びテストですが、新しい種類のストリームのための ビルディングブロックとして、何かと便利となるユースケースが時折存在します。
Streams: Under the Hood#
Buffering#
Readable 及び Writable ストリームはそれぞれ、_writableState.buffer
または
_readableState.buffer
と呼ばれる内部オブジェクトにデータを
バッファリングします。
バッファリングされるデータの量は、コンストラクタに渡される highWaterMark
オプションに依存します。
Readable ストリームにおけるバッファリングは、実装が stream.push(chunk)
を呼び出した時に起こります。
ストリームの利用者が stream.read()
を呼び出さないと、
データはそれが消費されるまで内部キューに留まります。
Writable ストリームにおけるバッファリングは、利用者が stream.write(chunk)
を繰り返し呼び出すと、write()
が false
を返した場合でも起こります。
ストリーム、特に pipe()
メソッドの目的は、データのバッファリングを
許容できるレベルに制限することです。そのため、様々な速度の入力元と出力先で、
利用可能なメモリを圧迫しません。
stream.read(0)
#
実際にデータを消費することなく、下層の Readable ストリームのメカニズムを
リフレッシュするきっかけが欲しくなるケースがあります。
そのケースでは、常に null
を返す stream.read(0)
を呼び出すことができます。
内部バッファが highWaterMark
を下回っていて、
ストリームが現在読み込み中でなければ、read(0)
の呼び出しは低水準の
_read()
を呼び出すきっかけとなります。
これをする必要はほとんどありません。 しかしながら Node の内部、特に Readable ストリームクラスの内部で、 これが使われているケースを見ることができるでしょう。
stream.push('')
#
ゼロバイトの長さの文字列またはバッファをプッシュすると、
(オブジェクトモードの場合を除き) 面白い副作用が起こります。
それは stream.push()
を呼び出すので、reading
プロセスを終了します。
しかしながら、それは読み込みバッファにどんなデータも加え ない ので、
ユーザが消費するものは何もありません。
ごくまれに、今は提供するデータが無い場合があります。しかし、stream.read(0)
を呼び出すことにより、ストリームの利用者 (あるいは、もしかするとあなたの
コードの一部) は再びチェックすべきなのがいつかを知ることができます。
このケースでは、stream.push('')
を呼び出すことが できます 。
現在の所、この機能の唯一のユースケースは v0.12 で廃止予定の
tls.CryptoStream の中にあります。
もし stream.push('')
を使わなければならないことになったら、それはおそらく
何かが恐ろしく間違っていることを示すので、他の方法を検討してください。
Compatibility with Older Node Versions#
v0.10 より前のバージョンの Node では、Readable ストリームのインタフェースは よりシンプルでしたが、強力ではなく使いやすくもありませんでした。
read()
メソッドが呼び出されるのを待つのではなく、'data'
イベントがすぐに生成され始めます。 もしデータを処理する方法を決定するためにいくらかの I/O をする 必要がある場合、データが失われないようにするためには チャンクを何らかのバッファに保存しなければなりませんでした。pause()
は保証というよりはむしろ助言でした。 それはストリームが中断された状態であったとしても、'data'
イベントを受け取る準備が必要だということを意味します。
Node v0.10 から、上記で説明した Readable クラスが追加されました。
古い Node プログラムとの後方互換性のために、Readable ストリームは
'data'
イベントのハンドラが加えられた場合や、
resume()
メソッドが読み出されると、「flowing モード」に切り替わります。
その結果として、新しい read()
メソッドや 'readable'
イベントを
使用していなくても、もう 'data'
イベントのチャンクが失われることを
心配する必要はありません。
ほとんどのプログラムはこれまで通りに機能するでしょう。 しかしながら、以下の条件でエッジケースが存在します。
'data'
イベント イベントハンドラが登録されていない。resume()
メソッドが呼び出されていない。- ストリームはどの書き込みストリームへもパイプされていない。
例えば、以下のコードを考えてみてください:
// WARNING! BROKEN!
net.createServer(function(socket) {
// we add an 'end' method, but never consume the data
socket.on('end', function() {
// It will never get here.
socket.end('I got your message (but didnt read it)\n');
});
}).listen(1337);
v0.10 より前の Node では、入ってきたデータは単純に破棄されていました。 しかしながら、Node v0.10 以降では、ソケットは中断したままとなります。
この状況の回避策は、データの流れを開始するために resume()
メソッドを呼び出すことです。
// Workaround
net.createServer(function(socket) {
socket.on('end', function() {
socket.end('I got your message (but didnt read it)\n');
});
// start the flow of data, discarding it.
socket.resume();
}).listen(1337);
新しい Readable ストリームを flowing モードに切り替えられることに加えて、
wrap()
メソッドを使って v0.10 より前のスタイルのストリームを
Readable クラスでラップすることもできます。
Object Mode#
通常、ストリームは文字列またはバッファのみを扱います。
オブジェクトモード のストリームは、文字列及びバッファ以外の 一般的なJavaScriptの値を扱うことができます。
オブジェクトモードの Readable ストリームは、stream.read(size)
のサイズ引数が
いくつであるかに関わらず、常に一つの項目を返します。
オブジェクトモードの Writable ストリームは、stream.write(data, encoding)
の encoding
引数を常に無視します。
特別な値 null
は、オブジェクトモードのストリームにおいても
特別な値を持ちます。
すなわち、オブジェクトモードの Readable ストリームでは、stream.read()
の戻り値 null
はもうデータが無いことを、stream.push(null)
はストリームデータの終端を示します (EOF
)。
Node のコアライブラリにはオブジェクトモードのストリームは存在しません。 このパターンはユーザランドのライブラリでのみ使われます。
ストリームのサブクラスはコストラクタの options
オブジェクトで objectMode
を設定すべきです。
objectMode
をストリームの途中で設定することは安全ではありません。
State Objects#
Readable ストリームは _readableState
と呼ばれるメンバを持っています。
Writable ストリームは _writableState
と呼ばれるメンバを持っています。
Duplex ストリームは両方を持っています。
通常、これらのオブジェクトはサブクラスで変更すべきではありません。
しかしながら、もし Duplex または Transform ストリームの読み込み側が
objectMode
で、書き込み側が objectMode
ではない場合、コンストラクタで
適切なステートオブジェクトにフラグを明示的に設定することになるかもしれません。
var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);
// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream(options) {
if (!(this instanceof JSONParseStream))
return new JSONParseStream(options);
Transform.call(this, options);
this._writableState.objectMode = false;
this._readableState.objectMode = true;
this._buffer = '';
this._decoder = new StringDecoder('utf8');
}
JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
this._buffer += this._decoder.write(chunk);
// split on newlines
var lines = this._buffer.split(/\r?\n/);
// keep the last partial line buffered
this._buffer = lines.pop();
for (var l = 0; l < lines.length; l++) {
var line = lines[l];
try {
var obj = JSON.parse(line);
} catch (er) {
this.emit('error', er);
return;
}
// push the parsed object out to the readable consumer
this.push(obj);
}
cb();
};
JSONParseStream.prototype._flush = function(cb) {
// Just handle any leftover
var rem = this._buffer.trim();
if (rem) {
try {
var obj = JSON.parse(rem);
} catch (er) {
this.emit('error', er);
return;
}
// push the parsed object out to the readable consumer
this.push(obj);
}
cb();
};
ステートオブジェクトは、デバッグで役に立つストリームの状態を 情報として持ちます。それを見ることは安全ですが、しかしコンストラクタで設定した オプションフラグを変更することは安全では ありません。