@pipes/convert

Conversion utilities for web streams to/from other data structures.

Introducing pipes/convert

The pipesjs/convert module provides various functions to convert web streams to other similar data structures like highland streams, Observables, Rx streams etc. and vice versa.

For more about Web Streams, refer to the spec.


About

Here’s more about Web Streams, from the spec itself:

Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.

The spec is still evolving, but it has reached a fairly stable stage and has a reference implementation as well. The API is almost finalized, and streams are coming to the web very soon!

At its core, the API exposes three major components: readable streams, writable streams, and transform streams.

Streams are essentially data structures that handle a sequential flow of values. You can split streams, merge them, and connect them together in various ways. What’s amazing is that, in most cases, they handle backpressure automatically, so you don’t have to deal with the underlying details.
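
To make the idea of connecting streams concrete, here is a minimal sketch that wires a readable stream through a transform stream into a writable stream. It assumes a runtime with global WHATWG streams (a modern browser or Node.js 18+); the names `numbers`, `doubler`, `collector`, `received` and `finished` are illustrative only and are not part of @pipes/convert.

```javascript
// Assumption: ReadableStream, TransformStream and WritableStream are globals
// (modern browsers, Node.js 18+). All names here are illustrative.

// Source: emits 1, 2, 3 and then closes.
const numbers = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  }
});

// Transform: doubles every chunk that flows through it.
const doubler = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk * 2);
  }
});

// Sink: collects whatever it receives.
const received = [];
const collector = new WritableStream({
  write(chunk) {
    received.push(chunk);
  }
});

// Connect the three. The returned promise settles once the source has
// closed and every chunk has been written to the sink.
const finished = numbers
  .pipeThrough(doubler)
  .pipeTo(collector);
```

Once `finished` resolves, `received` holds the doubled values. `pipeThrough` and `pipeTo` take care of backpressure between the stages, so no manual flow control is needed here.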

For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.

Heads up: If you’re coming from Node land, web streams differ considerably from Node streams, and the two are not compatible with each other.

Installing

For browsers

The library depends on @pipes/core, so make sure you include it before including the library.

You can use either of the builds from the dist folder:

    <script src="path/to/web-streams-polyfill.js"></script>
    <script src="path/to/pipes.convert.js"></script>

And in your code, all the functions will be available on the window.Pipes.convert variable.

    let { toNodeStream, fromIterable } = window.Pipes.convert;

For browserify users

The library has a peer-dependency on @pipes/core, so to install it:

    npm install @pipes/core @pipes/convert

The library is split up into modules, so you can require either the whole library or only parts of it:

    // Require the whole library...
    let { fromIterable } = require("@pipes/convert");

    // ...or only the part you need (alternative paths):
    let fromIterable = require("@pipes/convert/fromIterable");
    let fromIterable = require("@pipes/convert/from/iterable");

For ES6 and Rollup users

If you want, you can import the ES6 modules directly, like so:

    // Import the whole library...
    import pipesConvert from "@pipes/convert/src";
    import { fromIterable } from "@pipes/convert/src";

    // ...or only the part you need (alternative paths):
    import fromIterable from "@pipes/convert/src/fromIterable";
    import fromIterable from "@pipes/convert/src/from/iterable";

API Reference

The convert library consists of the following functions:

Set up code for examples

  // Setup
  let createReadable = data => new ReadableStream({
      start (controller) {
        this.data = data || [1,2,3];

        // Kickstart stream (pop() takes values from the end of the array)
        controller.enqueue( this.data.pop() );
      },
      pull (controller) {
        // Close the stream once every value has been emitted
        if ( !this.data.length )
          return controller.close();

        controller.enqueue( this.data.pop() );
      }
    }),
    createWritable = () => new WritableStream({
      // Log every chunk written to the stream
      write (chunk) {
        console.log( chunk );
      }
    });

fromIterable

This function takes an iterable and returns a readable stream that queues the iterated values sequentially.

Parameters

Examples

let input = [1,2,3,4,5];
// Other inputs to try:
// input = function* gen() { yield* [1,2,3,4,5]; };
// input = [1,2,3,4,5].join("");

let writable, res = [];

// Create test streams
writable = createTestWritable( c => res.push( c ));

// Connect the streams
connect(
  fromIterable( input ),
  writable
); // res == input

Returns ReadableStream
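
For readers curious how such a conversion can work, here is a minimal sketch of turning an iterable into a readable stream. It is not the library's source: `iterableToStream` is a hypothetical helper written for illustration, and it assumes a runtime with a global `ReadableStream`.

```javascript
// Hypothetical helper for illustration; not the @pipes/convert implementation.
// Assumption: ReadableStream is a global (modern browsers, Node.js 18+).
function iterableToStream(iterable) {
  const iterator = iterable[Symbol.iterator]();

  return new ReadableStream({
    // pull() is called whenever the stream wants more data, so values
    // are drawn from the iterator on demand rather than all at once.
    pull(controller) {
      const { value, done } = iterator.next();

      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    }
  });
}
```

Because the sketch relies only on the iteration protocol, it accepts arrays, strings (one chunk per character) and generator objects alike.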

fromObservable

This function takes any Observable and returns a readable stream that enqueues the values emitted by the observable.

Parameters

Examples

let input = [1,2,3],
  output = [],
  writable;

// Create test streams
writable = createTestWritable( i => output.push( i ));

// Convert the observable and pipe it into the writable
fromObservable( Observable.from( input ) )
  .pipeTo( writable );

Returns ReadableStream
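
The following is a rough sketch of how an observable might be bridged into a readable stream. Both `observableToStream` and `observableFromArray` are hypothetical names invented for this example; the latter is a tiny stand-in for `Observable.from`, so the snippet runs without an Observable library. The sketch assumes the observable exposes `subscribe({ next, error, complete })` and returns an object with `unsubscribe()`.

```javascript
// Hypothetical helper for illustration; not the @pipes/convert implementation.
// Assumption: ReadableStream is a global (modern browsers, Node.js 18+).
function observableToStream(observable) {
  let subscription;

  return new ReadableStream({
    start(controller) {
      // Forward every observer callback to the stream controller.
      subscription = observable.subscribe({
        next: value => controller.enqueue(value),
        error: err => controller.error(err),
        complete: () => controller.close()
      });
    },
    cancel() {
      // Stop listening when the consumer cancels the stream.
      if (subscription) subscription.unsubscribe();
    }
  });
}

// Minimal stand-in for Observable.from(array), for demonstration only.
function observableFromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
      return { unsubscribe() {} };
    }
  };
}
```

Note that an observable pushes values at its own pace, so this sketch simply queues them; it does not apply backpressure to the producer.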

fromPromise

This function takes any promise and returns a readable stream that queues the resolved value or errors on rejection.

Parameters

Examples

let
  input = 42,
  promise = new Promise( resolve => resolve( input ) ),
  writable;

// Create test streams
writable = createTestWritable( c => assert.equal( c, input ));

connect(
  fromPromise( promise ),
  writable
); // 42

Returns ReadableStream
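
As an illustration of the behaviour described above, here is a minimal sketch of wrapping a promise in a readable stream. `promiseToStream` is a hypothetical name used only for this example, not the library's implementation, and a global `ReadableStream` is assumed.

```javascript
// Hypothetical helper for illustration; not the @pipes/convert implementation.
// Assumption: ReadableStream is a global (modern browsers, Node.js 18+).
function promiseToStream(promise) {
  return new ReadableStream({
    start(controller) {
      // Returning a promise from start() makes the stream wait for it.
      return Promise.resolve(promise).then(
        value => {
          // Resolved: emit the single value, then close.
          controller.enqueue(value);
          controller.close();
        },
        // Rejected: put the stream into the errored state.
        err => controller.error(err)
      );
    }
  });
}
```

A consumer reading from the resulting stream gets exactly one chunk followed by the end of the stream, or a rejected read if the promise was rejected.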

toObservable

This function takes any ReadableStream and returns an Observable that emits chunks to subscribers when they arrive.

Parameters

Examples

let input = [1,2,3],
  output = [],
  readable;

// Create test streams
readable = createTestReadable( input );

// Subscribe to the observable
toObservable( readable )
  .subscribe({
      next (val) { output.push( val ); },
      complete () {
        assert.deepEqual( input, output );
      }
  });

Returns Observable
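
To show roughly what the conversion involves, here is a sketch that exposes a readable stream through a minimal observable-like object. `streamToObservable` is a hypothetical name, and the returned object only implements `subscribe`; a real Observable implementation offers more. Because reading locks the stream, this sketch supports a single subscription.

```javascript
// Hypothetical helper for illustration; not the @pipes/convert implementation.
// Returns a minimal observable-like object (subscribe only).
function streamToObservable(stream) {
  return {
    subscribe(observer) {
      const reader = stream.getReader();
      let active = true;

      // Read chunks in a loop and forward them to the observer.
      (async () => {
        try {
          while (active) {
            const { value, done } = await reader.read();
            if (done) {
              if (active && observer.complete) observer.complete();
              return;
            }
            if (active && observer.next) observer.next(value);
          }
        } catch (err) {
          if (active && observer.error) observer.error(err);
        }
      })();

      return {
        unsubscribe() {
          // Stop forwarding and tell the source we are no longer interested.
          active = false;
          reader.cancel().catch(() => {});
        }
      };
    }
  };
}
```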

toPromise

This function takes any ReadableStream and returns a promise that resolves with an Array of the stream’s contents when the stream closes.

Parameters

Examples

let input = [1,2,3],
  output = [1,2,3],
  readable;

// Create test streams
readable = createTestReadable( input );

// Test the promise
toPromise( readable )
  .then( result => {
    assert.deepEqual( result, output );
    done();
  });

Returns Promise<Array<T>>
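
For illustration, collecting a stream into an array can be sketched with a reader loop like the one below. `streamToArray` is a hypothetical helper, not the library's code, and it assumes a standard `ReadableStream`.

```javascript
// Hypothetical helper for illustration; not the @pipes/convert implementation.
async function streamToArray(stream) {
  const reader = stream.getReader();
  const chunks = [];

  try {
    while (true) {
      const { value, done } = await reader.read();

      // The stream has closed: resolve with everything collected so far.
      if (done) return chunks;

      chunks.push(value);
    }
  } finally {
    // Always release the lock, even if reading fails.
    reader.releaseLock();
  }
}
```

If the stream errors while being read, the returned promise rejects with that error instead of resolving with a partial array.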