When the server streams data faster than the client can process it, excess data will queue up in the client's memory. This issue is called backpressure, and it can lead to memory overflow errors, or data loss when the client's memory reaches capacity.
In this recipe, you create an API endpoint that:
- Simulates backpressure by generating data faster than a stream can read it
- Handles backpressure by pushing data into a stream as it's needed, rather than as it's ready
Jump to the full example to see the full recipe.
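To make the problem concrete before building the recipe, the sketch below shows a push-style stream that enqueues values as soon as they are ready, without checking whether anything is reading them. The helper name `createEagerStream` and the `desiredSizes` array are illustrative assumptions, not part of the recipe. `controller.desiredSize` is the standard Streams API property that reports how much room is left in the stream's queue.

```typescript
// Hypothetical helper (not part of the recipe): eagerly pushes `count`
// integers into a stream without checking whether a consumer is ready.
function createEagerStream(count: number): {
  stream: ReadableStream<number>;
  desiredSizes: (number | null)[];
} {
  const desiredSizes: (number | null)[] = [];
  const stream = new ReadableStream<number>({
    // start runs once, when the stream is constructed
    start(controller) {
      for (let i = 1; i <= count; i++) {
        controller.enqueue(i);
        // desiredSize = highWaterMark - queued chunks (default highWaterMark: 1)
        desiredSizes.push(controller.desiredSize);
      }
    },
  });
  return { stream, desiredSizes };
}

const { desiredSizes } = createEagerStream(5);
console.log(desiredSizes); // [0, -1, -2, -3, -4]
```

A `desiredSize` of zero or less means the queue is already full, so every further chunk is held in memory until something reads it. The rest of the recipe avoids this by producing values only when the stream asks for them.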
First, create a data source. In this case, it's a generator function that yields a new integer indefinitely:
// A generator that will yield positive integers
async function* integers() {
  let i = 1;
  while (true) {
    console.log(`yielding ${i}`);
    yield i++;
    await sleep(100);
  }
}
// Add a custom sleep function to create
// a delay that simulates how slow some
// Function responses are.
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
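Because `integers()` never finishes, it never exercises the case where a generator runs out of values. As a minimal sketch for experimenting with that case, the bounded variant below yields a fixed number of integers and then returns. The names `integersUpTo` and `collect` are assumptions made for this illustration only.

```typescript
// Hypothetical bounded variant of integers(): yields 1..limit, then finishes
async function* integersUpTo(
  limit: number,
): AsyncGenerator<number, void, unknown> {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

// Collects every value a generator produces until it reports done
async function collect(
  source: AsyncGenerator<number, void, unknown>,
): Promise<number[]> {
  const values: number[] = [];
  for await (const value of source) {
    values.push(value);
  }
  return values;
}

collect(integersUpTo(3)).then((values) => console.log(values)); // [1, 2, 3]
```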
Next, create a function that wraps the generator in a ReadableStream. Using the pull handler, you can prevent the generator from adding new data to the stream when no more data is being requested:
// Wraps a generator into a ReadableStream
function createStream(iterator: AsyncGenerator<number, void, unknown>) {
  return new ReadableStream({
    // The stream calls pull whenever its internal
    // queue has room for more data.
    async pull(controller) {
      const { value, done } = await iterator.next();
      // done == true when the generator will yield
      // no more new values. If that's the case,
      // close the stream.
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
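To see how the pull handler limits how far the stream runs ahead of its consumer, the sketch below counts pull calls for a given queue size. `createCountingStream`, `pullDemo`, and the `stats` object are hypothetical names for this illustration. `CountQueuingStrategy` and its `highWaterMark` option are part of the standard Streams API.

```typescript
// Hypothetical helper: a pull-based stream that records how often the
// stream asked for data. `highWaterMark` caps how many chunks are queued.
function createCountingStream(highWaterMark: number) {
  const stats = { pulls: 0 };
  const stream = new ReadableStream<number>(
    {
      pull(controller) {
        stats.pulls++;
        controller.enqueue(stats.pulls);
      },
    },
    new CountQueuingStrategy({ highWaterMark }),
  );
  return { stream, stats };
}

async function pullDemo(): Promise<{ before: number; after: number }> {
  const { stream, stats } = createCountingStream(3);
  // Give the stream a moment to fill its queue
  await new Promise((resolve) => setTimeout(resolve, 50));
  const before = stats.pulls;
  // Reading one value frees one slot, so the stream pulls once more
  const reader = stream.getReader();
  await reader.read();
  await new Promise((resolve) => setTimeout(resolve, 50));
  return { before, after: stats.pulls };
}

pullDemo().then(({ before, after }) => {
  console.log(`pulls before reading: ${before}, after one read: ${after}`);
});
```

With a `highWaterMark` of 3, the stream should pull three times to fill its queue and then stop, and pull a fourth time only after a value is read. The recipe's `createStream` uses the default `highWaterMark` of 1, so it buffers at most one value ahead of the reader.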
Finally, iterate through a loop and read data from the stream. If the stream pushed values as soon as they were ready, it would continue taking values from integers() indefinitely, filling up memory. Because the pull handler only runs when the stream needs more data, the stream stops taking values from the generator once you have iterated loopCount times, apart from one value that it buffers ahead:
// Demonstrate handling backpressure
async function backpressureDemo() {
  // Set up a stream of integers
  const stream = createStream(integers());
  // Read values from the stream
  const reader = stream.getReader();
  const loopCount = 5;
  // Read as much data as you want
  for (let i = 0; i < loopCount; i++) {
    // Get the next value from the stream
    const { value } = await reader.read();
    console.log(`Stream value: ${value}`);
    await sleep(1000);
  }
}
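After the loop above finishes, the stream stays open and the generator remains suspended. One possible way to release it, sketched below, is to add a `cancel` handler to the stream and call `reader.cancel()` when you are done reading. `createCancellableStream` and `cancelDemo` are hypothetical names. The `cancel` handler and `reader.cancel()` are standard Streams API features.

```typescript
// Hypothetical variant of createStream that also handles cancellation
function createCancellableStream<T>(
  iterator: AsyncGenerator<T, void, unknown>,
): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) {
        controller.close();
      } else {
        controller.enqueue(result.value);
      }
    },
    // Runs when the consumer calls reader.cancel() or stream.cancel()
    async cancel() {
      // Finish the generator so any finally blocks in it can run
      await iterator.return(undefined);
    },
  });
}

async function cancelDemo(): Promise<{ values: number[]; cleanedUp: boolean }> {
  let cleanedUp = false;
  // Illustrative source that records when it has been cleaned up
  async function* source(): AsyncGenerator<number, void, unknown> {
    try {
      let i = 1;
      while (true) yield i++;
    } finally {
      cleanedUp = true;
    }
  }
  const reader = createCancellableStream(source()).getReader();
  const values: number[] = [];
  for (let i = 0; i < 3; i++) {
    const { value } = await reader.read();
    if (value !== undefined) values.push(value);
  }
  // Tell the stream no more data is needed
  await reader.cancel();
  return { values, cleanedUp };
}

cancelDemo().then((result) => console.log(result));
```

Calling `iterator.return()` is one design choice among several. It suits generators that hold resources such as open connections, because it lets their `finally` blocks run.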
The final file, including the route handler function, will look like this:
// A generator that will yield positive integers
async function* integers() {
  let i = 1;
  while (true) {
    console.log(`yielding ${i}`);
    yield i++;
    await sleep(100);
  }
}

// Add a custom sleep function to create
// a delay that simulates how slow some
// Function responses are.
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Wraps a generator into a ReadableStream
function createStream(iterator: AsyncGenerator<number, void, unknown>) {
  return new ReadableStream({
    // The stream calls pull whenever its internal
    // queue has room for more data.
    async pull(controller) {
      const { value, done } = await iterator.next();
      // done == true when the generator will yield
      // no more new values. If that's the case,
      // close the stream.
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}

// Demonstrate handling backpressure
async function backpressureDemo() {
  // Set up a stream of integers
  const stream = createStream(integers());
  // Read values from the stream
  const reader = stream.getReader();
  const loopCount = 5;
  // Read as much data as you want
  for (let i = 0; i < loopCount; i++) {
    // Get the next value from the stream
    const { value } = await reader.read();
    console.log(`Stream value: ${value}`);
    await sleep(1000);
  }
}

export async function GET() {
  backpressureDemo();
  return new Response('Check your console to see the result!');
}
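The route handler above only logs to the console. As a rough sketch of how the same pull-based pattern could feed a response body instead, the example below encodes each integer as a line of text and passes the stream to `Response`. The names `createTextStream`, `firstIntegers`, and `streamResponse` are assumptions for illustration. Whether backpressure reaches all the way to the network client depends on the runtime.

```typescript
// Hypothetical helper: converts an async generator of numbers into a
// byte stream with one line of text per value
function createTextStream(
  iterator: AsyncGenerator<number, void, unknown>,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(`${result.value}\n`));
      }
    },
  });
}

// Hypothetical bounded source so that the response eventually ends
async function* firstIntegers(
  limit: number,
): AsyncGenerator<number, void, unknown> {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

// Could be returned from a GET handler in place of the console-based demo
function streamResponse(limit: number): Response {
  return new Response(createTextStream(firstIntegers(limit)), {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' },
  });
}

streamResponse(3)
  .text()
  .then((body) => console.log(JSON.stringify(body))); // "1\n2\n3\n"
```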
If you're not using a framework, you must either add "type": "module" to your package.json or change your JavaScript Functions' file extensions from .js to .mjs.