pipes/utils is to `web streams` what `highland.js` is to `node streams`
The pipesjs/utils
module is to web streams
what highland.js
is to node streams
.
It contains utility functions to make working with web streams
a lot easier.
For more about Web Streams
, refer to the spec.
The utils
module is to web streams
what highland.js
is to node streams
. It contains utility functions to make working with web streams
a lot easier. Here’s more about Web Streams
from the spec itself:
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Stream
s are coming to the web very soon!
At it’s core, the API exposes three major components:
ReadableStream
encapsulates a source producing values and emits them.TransformStream
are essentially { readable, writable}
pairs that take a function which can be used to transform the values flowing through it.WritableStream
encapsulates a sink that receives values and writes to it.Stream
s are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What’s amazing is that, in most cases, they can handle backpressure automatically, so you don’t have to mess with the underlying details.
For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.
Heads up: If you’re coming from node
land, web streams
are quite a lot different from node streams
and incompatible with each other.
The library depends on @pipes/core, so make sure you include it in before including the library.
You can use either of the builds from the dist
folder:
<script src="path/to/web-streams-polyfill.js"></script>
<script src="path/to/pipes.utils.js"></script>
And in your code, all the functions will be available on the window.Pipes.utils
variable.
let { uniq, compact } = window.Pipes.utils;
The library has a peer-dependency on @pipes/core, so to install it:
npm install @pipes/core @pipes/utils
The library is split up into modules, so you can both require the whole library or only parts of it:
let { compact } = require("@pipes/utils");
let compact = require("@pipes/utils/compact");
If you want, you can directly import the es6 modules like so:
import pipesUtils from "@pipes/utils/src";
import { compact } from "@pipes/utils/src";
import compact from "@pipes/utils/src/compact";
The utils
library only consists of the following functions:
Set up code for examples
// Setup
let createReadable = data => new ReadableStream({
start (controller) {
this.data = data || [1,2,3];
// Kickstart stream
controller.enqueue( this.data.pop() );
},
pull (controller) {
if ( !this.data.length )
return controller.close()
controller.enqueue( this.data.pop() );
}
}),
createWritable = () => new WritableStream({
write (chunk) {
console.log( chunk );
}
});
This function takes an int n
and returns a transform stream
that batches the incoming values in arrays of lengths no
more than n
.
Parameters
size
number
Examples
let
input = [1,2,3,4,5],
expected = [[1,2],[3,4],[5]];
let readable, writable, res=[];
// Create test streams
readable = createTestReadable( input );
writable = createTestWritable( c => res.push( c ));
// Connect the streams
connect(
readable,
batch( 2 ),
writable
); // res == expected
Returns TransformStream
This function returns a transform stream that spits out only truthy values from the input stream.
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [true, false, 0, "", "hello", 1] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
compact(),
writable
); // count == 3
This function takes an iterable
as argument and returns
a readable stream that repeatedly emits values generated by the emitter.
Examples
let readable, writable, values=[1,2,3], sum=0;
// Create test streams
readable = cycle( values );
writable = createTestWritable( c => { sum+=c });
// Connect the streams
const expected = 2 * values.reduce( (a, b) => a+b );
connect(
readable,
take( 2 * values.length ),
writable
); // sum == expected
This function takes an int n
and returns a
transform stream
that debounces the incoming values by n
ms,
only producing values with n
ms delay between them and dropping the rest.
Parameters
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that drops the first n
values from the input stream.
Parameters
count
number
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
drop(3),
writable
); // count == 3
Returns ReadableWritable
This function takes a predicate function as argument and returns
a transform stream
that only emits values that satisfy the predicate.
Parameters
pred
function (any?): boolean
Examples
let readable, writable;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( c => assert( c > 3 ) );
// Connect the streams
connect(
readable,
filter( a => a > 3 ),
writable
);
Returns ReadableWritable
This function returns a transform stream
that takes the first value
from the input stream and enqueues it on the output stream.
Examples
let readable, writable, el;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });
// Connect the streams
connect(
readable,
head(),
writable
); // el == 1
Returns ReadableWritable
This function takes any value a
and returns a transform stream
that intersperses the values from the input stream with the a
.
Parameters
val
any
Examples
let readable, writable,
res = [];
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => res.push(c) );
// Connect the streams
connect(
readable,
intersperse(0),
writable
); // res == [1,0,2,0,3]
Returns ReadableWritable
This function returns a transform stream
that takes
the last value from the input stream and enqueues it on the output stream.
Examples
let readable, writable, el;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });
// Connect the streams
connect(
readable,
last(),
writable
); // el == 6
Returns ReadableWritable
This function takes any number of string
s as arguments and
returns a transform stream
that extracts the passed property names from
incoming values.
Parameters
Examples
let readable, writable,
o = {
'a': 1,
'b': 2
};
// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c.a ) && assert( !c.b ) );
// Connect the streams
connect(
readable,
pick('a'),
writable
);
Returns ReadableWritable
This function takes a string
as argument and returns
a transform stream
that extracts the passed property from
incoming values.
Parameters
prop
string
Examples
let readable, writable,
o = {
'a': 1,
'b': 2
};
// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c == 1 ) );
// Connect the streams
connect(
readable,
pluck('a'),
writable
);
Returns ReadableWritable
This function takes a value as argument and returns a readable stream
that repeatedly emits that value.
Parameters
value
any
Examples
let readable, writable, val=1, len=6, sum=0;
// Create test streams
readable = repeat(val);
writable = createTestWritable( c => { sum+=c });
// Connect the streams
connect(
readable,
take( len ),
writable
); // sum == (val * len)
Returns ReadableWritable
This function takes a reducer function and an optional init
value
as arguments and returns a transform stream
that applies the
function to the incoming values and enqueues the accumulation of the results.
If an init
value is not passed, the first incoming value is treated as one.
Parameters
func **function ((T2? |
T1?), T1?): T2** |
init
T1?
Examples
let readable, writable, res;
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => {
// Check last element is number
let n = c[c.length-1];
assert( n === +n );
res = c;
});
// Connect the streams
connect(
readable,
scan( add, 0 ),
writable
); // res[res.length-1], [1,2,3].reduce( add )
Returns ReadableWritable
This function takes an int m
and an int n
and returns
a transform stream
that drops the first m
values and
takes the next (m-n)
values from the input stream.
Parameters
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
slice(2,5),
writable
); // count == 3
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that takes the first n
values from the input stream.
Parameters
count
number
Examples
let readable, writable,
count = 0;
// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );
// Connect the streams
connect(
readable,
take(3),
writable
); // count == 3
Returns ReadableWritable
This function takes a function as rgument and returns
a transform stream
that applies the function to the incoming
values before re-emitting them.
Parameters
func
anyFn
Examples
let readable, writable;
// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => assert( !Number.isNaN( c*1 ) ) );
// Connect the streams
connect(
readable,
tap( console.log.bind(console) ),
writable
);
Returns ReadableWritable
This function takes an int n
and returns a transform stream
that throttles the incoming values by n
ms, only producing
values every n
ms and dropping the rest.
Parameters
Returns TransformStream
This function returns a transform stream
that keeps only unique
values from the input stream and enqueues it on the output stream.
Examples
let readable, writable,
res = [];
// Create test streams
readable = createTestReadable( [1,1,2,2,3,3] );
writable = createTestWritable( c => res.push(c) );
// Connect the streams
connect(
readable,
uniq(),
writable
); // res == [1,2,3]
Returns ReadableWritable