Conversion utilities for web streams to/from other data structures.
The pipesjs/convert
module provides various functions to convert web streams
to other similar data structures
like highland
streams, Observable
s, Rx
streams etc. and vice versa.
For more about Web Streams
, refer to the spec.
The convert
module provides various functions to convert web streams
to other similar data structures
like highland
streams, Observable
s, Rx
streams etc. and vice versa. Here’s more about Web Streams
from the spec itself:
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Stream
s are coming to the web very soon!
At it’s core, the API exposes three major components:
ReadableStream
encapsulates a source producing values and emits them.TransformStream
are essentially { readable, writable}
pairs that take a function which can be used to transform the values flowing through it.WritableStream
encapsulates a sink that receives values and writes to it.Stream
s are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What’s amazing is that, in most cases, they can handle backpressure automatically, so you don’t have to mess with the underlying details.
For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.
Heads up: If you’re coming from node
land, web streams
are quite a lot different from node streams
and incompatible with each other.
The library depends on @pipes/core, so make sure you include it in before including the library.
You can use either of the builds from the dist
folder:
<script src="path/to/web-streams-polyfill.js"></script>
<script src="path/to/pipes.convert.js"></script>
And in your code, all the functions will be available on the window.Pipes.convert
variable.
let { toNodeStream, fromIterable } = window.Pipes.convert;
The library has a peer-dependency on @pipes/core, so to install it:
npm install @pipes/core @pipes/convert
The library is split up into modules, so you can both require the whole library or only parts of it:
let { fromIterable } = require("@pipes/convert");
let fromIterable = require("@pipes/convert/fromIterable");
let fromIterable = require("@pipes/convert/from/iterable");
If you want, you can directly import the es6 modules like so:
import pipesConvert from "@pipes/convert/src";
import { fromIterable } from "@pipes/convert/src";
import fromIterable from "@pipes/convert/src/fromIterable";
import fromIterable from "@pipes/convert/src/from/iterable";
The convert
library consists of the following functions:
Set up code for examples
// Setup
let createReadable = data => new ReadableStream({
start (controller) {
this.data = data || [1,2,3];
// Kickstart stream
controller.enqueue( this.data.pop() );
},
pull (controller) {
if ( !this.data.length )
return controller.close()
controller.enqueue( this.data.pop() );
}
}),
createWritable = () => new WritableStream({
write (chunk) {
console.log( chunk );
}
});
This function takes an iterable and returns a readable stream that queues the iterated values sequentially.
Parameters
iterable
Iterable<T>
Examples
let
input = [1,2,3,4,5],
// input = function* gen() { yield* input; },
// input = input.join("");
let writable, res=[];
// Create test streams
writable = createTestWritable( c => res.push( c ));
// Connect the streams
connect(
fromIterable( input ),
writable
); // res == input
Returns ReadableStream
This function takes any ReadableStream
and returns an Observable
that emits chunks to subscribers
when
they arrive.
Parameters
observable
Observable
Examples
let input = [1,2,3],
output = [],
observable, writable;
// Create test streams
writable = createTestWritable( i => output.push( i ));
// Test the promise
return fromObservable( Observable.from( input ) )
.pipeTo( writable );
Returns ReadableStream
This function takes any promise and returns a readable stream that queues the resolved value or errors on rejection.
Parameters
promise
Promise<T>
Examples
let
input = 42;
promise = new Promise( resolve => resolve( input ) ),
writable;
// Create test streams
writable = createTestWritable( c => assert.equal( c, input ));
connect(
fromPromise( promise ),
writable
); // 42
Returns ReadableStream
This function takes any ReadableStream
and returns an Observable
that emits chunks to subscribers
when
they arrive.
Parameters
stream
ReadableStream
Examples
let input = [1,2,3],
output = [],
readable;
// Create test streams
readable = createTestReadable( input );
// Test the promise
toObservable( readable )
.subscribe({
next (val) { output.push( val ); },
complete () {
assert.deepEqual( input, output );
}
});
Returns Observable
This function takes any ReadableStream
and returns a promise
that resolves with an Array
of the stream’s contents when
the stream closes.
Parameters
stream
ReadableStream
Examples
let input = [1,2,3],
output = [1,2,3],
readable;
// Create test streams
readable = createTestReadable( input );
// Test the promise
toPromise( readable )
.then( result => {
assert.deepEqual( result, output );
done();
});