Basic utilities for web streams
pipesjs
includes an evolving bunch of modules of utilities and helpers for web streams
.
The core
module contains some basic utility functions to make working with web streams
a lot easier. Here’s more about Web Streams
from the spec itself:
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Stream
s are coming to the web very soon!
The core
module is designed to be extremely lightweight and barebones, the minified and gzipped build is just 4 kb
.
At it’s core, the API exposes three major components:
ReadableStream
encapsulates a source producing values and emits them.TransformStream
are essentially { readable, writable}
pairs that take a function which can be used to transform the values flowing through it.WritableStream
encapsulates a sink that receives values and writes to it.Stream
s are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What’s amazing is that, in most cases, they can handle backpressure automatically, so you don’t have to mess with the underlying details.
For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.
Heads up: If you’re coming from node
land, web streams
are quite a lot different from node streams
and incompatible with each other.
The library depends on a polyfill for browsers that don’t support Stream
APIs yet (which as of now, is all of them), so make sure you include it in before including the library.
You can use either of the builds from the dist
folder:
<script src="path/to/web-streams-polyfill.js"></script>
<script src="path/to/pipes.core.js"></script>
And in your code, all the functions will be available on the window.Pipes
variable.
let { pipe, flatten } = window.Pipes;
flatten(/* some streams here */);
The library has a peer-dependency on web-streams-polyfill, so to install it:
npm install web-streams-polyfill @pipes/core
The library is split up into modules, so you can both require the whole library or only parts of it:
let { flatten } = require("@pipes/core");
let merge = require("@pipes/core/merge");
If you want, you can directly import the es6 modules like so:
import pipes from "@pipes/core/src";
import { flatten } from "@pipes/core/src";
import flatten from "@pipes/core/src/flatten";
The core
library only consists of the following functions:
Set up code for examples
// Setup
let createReadable = data => new ReadableStream({
start (controller) {
this.data = data || [1,2,3];
// Kickstart stream
controller.enqueue( this.data.pop() );
},
pull (controller) {
if ( !this.data.length )
return controller.close()
controller.enqueue( this.data.pop() );
}
}),
createWritable = () => new WritableStream({
write (chunk) {
console.log( chunk );
}
});
This function takes a reducer function and an optional initial value and returns a transformstream that accumulates the values of any stream piped to it.
Parameters
reducer
function a function that takes sequential values and reduces theminit
any?
Examples
let readable, accumulator, accumulated, total;
// Create streams
readable = createTestReadable( [1,2,3] );
// Connect the streams
accumulator = accumulate( (a, b) => a+b, 4 );
accumulated = readable.pipeThrough( new accumulator ); // 10
Returns TransformStream a ReadableWritable that consumes piped stream, combining the values with the reducer and enqueues the result.
This function takes any number of transform streams
with an optional readable
at the head and a writable
at the tail.
It connects them together by applying pipeThrough
recursively and returns the resulting readable
that acts as a composition of the input streams
.
In case, a writable
is passed at the tail, the resulting readable
is pipeTo
d and the resulting promise
is returned.
Parameters
origin **(ReadableStream |
ReadableWritable)** |
streams **…Array<(WritableStream |
ReadableWritable)>** |
Examples
let readable = createReadable(),
writable = createWritable(),
passThrough = pipe( k => k );
let promise = connect( readable, passThrough, writable ); // 1, 2, 3
Returns **(ReadableStream | Promise<any>)** |
This function takes one or more transform streams / { readable, writable } pairs
connects them to each other. Then takes the readable of the end and the writable
of the head and returns the { readable, writable } pair that is
compatible with ReadableStream::pipeThrough
.
Parameters
origin
ReadableWritable
streams
…Array<ReadableWritable>
Examples
// Pure funtion example
let negator = pipe( n => -n ),
doubler = pipe( n => 2*n ),
composed = chain( new negator, new doubler ),
rIn = createReadable(),
rOut;
rOut = rIn.pipeThrough( composed ); // -2, -4, -6
Returns ReadableWritable
This function takes one or more streams and returns a readable combining the streams, returning chunks as they arrive in combined streams.
Parameters
streams
…Array<ReadableStream>
Examples
let r1 = createReadable([1,2,3]),
r2 = createReadable([4,5,6]),
writable = createWritable(),
flattened = flatten(r1,r2);
flattened.pipeTo( writable ); // 1,4,2,5,3,6 (order depends on order received so may vary)
Returns ReadableStream
This function takes one or more streams and returns a readable combining the streams, such that it gathers chunks from all streams into an array and then pushes them onto the combined stream, by waiting for all streams to have pushed a chunk.
Parameters
streams
…Array<ReadableStream>
Examples
let r1 = createReadable([1,2,3]),
r2 = createReadable([4,5,6,7]),
writable = createWritable(),
merged = merge(r1,r2);
merged.pipeTo( writable ); // [1,4], [2,5], [3,6]
Returns ReadableStream
This function takes any normal/generator func and returns a transform stream.
Parameters
fn
function a function or a generator that returns transformed valuesopts
object containing config optionsExamples
// Pure funtion example
let negator = pipe( n => -n ),
rIn = createReadable(),
rOut;
rOut = rIn.pipeThrough( new negator ); // -1, -2, -3
// Basic generator example
let doubler = pipe( function* (v) {
yield v;
yield v;
}),
rIn = createReadable(),
rOut;
rOut = rIn.pipeThrough( new doubler ); // 1, 1, 2, 2, 3, 3
// Infinite generator example
let inf = pipe( function* (v) {
// Close on shutdown signal
while( !( yield v ));
}, {
init: 1
});
new inf; // 1, 1, 1, 1...
Returns TransformStream
This function takes any async func and returns a transform stream.
Parameters
asyncFunction
function an async function that returns a Promiseopts
object containing config optionsExamples
// Basic async example
let serverTalker = pipe.async( async function (msg) {
let response = await sendToServer( msg );
return response;
}),
rIn = createReadable(),
rOut;
rOut = rIn.pipeThrough( new serverTalker ); // {response}, {response}, {response}
// Basic promise example
let serverTalker = pipe.async( function (msg) {
let response = new Promise( resolve => {
sendToServer( msg, resolve );
});
return response;
}),
rIn = createReadable(),
rOut;
rOut = rIn.pipeThrough( new serverTalker ); // {response}, {response}, {response}
Returns any TransformStream
“End of Stream” This is the equivalent of EOF
char in UNIX systems, if a pipe
function
returns
this at any point, the streams are gracefully closed.
This function takes a readable stream and a number and returns an array of
tee’d readable streams, with a cancelAll
function that cancels all the tee’d
streams and in turn the original stream.
Parameters
stream
ReadableStream
parts
number (optional, default 2
)Examples
let readable = createReadable([1,2,3]),
[r1, r2] = split( readable ),
w1 = createWritable(),
w2 = createWritable();
r1.pipeTo( w1 ); // 1, 2, 3
r2.pipeTo( w2 ); // 1, 2, 3
Returns Array<ReadableStream>