![]() |
VOOZH | about |
We’re so glad you’re here. You can expect all the best TNS content to arrive Monday through Friday to keep you on top of the news and at the top of your game.
Check your inbox for a confirmation email where you can adjust your preferences and even join additional groups.
Follow TNS on your favorite social media networks.
Become a TNS follower on LinkedIn.
Check out the latest featured and trending stories while you wait for your first TNS newsletter.
Readable, which is the Node.js base class for read streams. It can also be a simple call to the new Readable() constructor, if you want a custom stream without defining your own class. I’m sure plenty of you have used streams from the likes of HTTP res handlers to fs.createReadStream file streams. An implementation, however, needs to respect the rules for streams, namely that certain functions are overridden when the system calls them for stream flow situations. Let’s talk about what some of this looks like.
const {Readable} = require('stream')
// This data can also come from other streams :]
let dataToStream = [
'This is line 1\n'
, 'This is line 2\n'
, 'This is line 3\n'
]
class MyReadable extends Readable {
constructor(opts) {
super(opts)
}
_read() {
// The consumer is ready for more data
this.push(dataToStream.shift())
if (!dataToStream.length) {
this.push(null) // End the stream
}
}
_destroy() {
// Not necessary, but illustrates things to do on end
dataToStream = null
}
}
new MyReadable().pipe(process.stdout)
super(opts)or nothing will work._read is required and is called automatically when new data is wanted.push(<some data>) will cause the data to go into an internal buffer, and it will be consumed when something, like a piped writable stream, wants it.push(null) is required to properly end the read stream.
'end' event will be emitted after this.'close' event will also be emitted unless emitClose: false was set in the constructor._destroy is optional for cleanup things. Never override destroy; always use the underscored method for this and for _read.Readable inline:
const {Readable} = require('stream')
// This data can also come from other streams :]
let dataToStream = [
'This is line 1\n'
, 'This is line 2\n'
, 'This is line 3\n'
]
const myReadable = new Readable({
read() {
this.push(dataToStream.shift())
if (!dataToStream.length) {
this.push(null) // End the stream
}
}
, destroy() {
dataToStream = null
}
})
myReadable.pipe(process.stdout)
highWaterMark property, and the default is 16KB of byte data, or 16 objects if the stream is in object mode. When data is pushed through the readable stream, the push method may return false. If so, that means that the highWaterMark is close to, or has been, exceeded, and that is called backpressure.
If that happens, it’s up to the implementation to stop pushing data and wait for the _read call to come, signifying that the consumer is ready for more data, so push calls can resume. This is where a lot of folks fail to implement streams properly. Here are a couple of tips about pushing data through read streams:
_read to be called to push data as long as backpressure is respected. Data can continually be pushed until backpressure is reached. If the data size isn’t very large, it’s possible that backpressure will never be reached.'data' events and no pipe, then backpressure will certainly be reached if the data size exceeds the default buffer size.TailFile, which reads chunks from the underlying resource until backpressure is reached or all the data is read. Upon backpressure, the stream is stored and reading is resumed when _read is called.
async _readChunks(stream) {
for await (const chunk of stream) {
this[kStartPos] += chunk.length
if (!this.push(chunk)) {
this[kStream] = stream
this[kPollTimer] = null
return
}
}
// Chunks read successfully (no backpressure)
return
}
_read() {
if (this[kStream]) {
this._readChunks(this[kStream])
}
return
}
push can be called continuously’, but trust me, it’s a thing, even though the backpressure doc below always recommends waiting for _read. The fact is, depending on what you’re trying to implement, the code becomes less clear-cut, but as long as backpressure rules are followed and methods are overridden as required, then you’re on the right track!