@pipes/core

Basic utilities for web streams

Introducing pipes/core

pipesjs is an evolving collection of modules that provide utilities and helpers for web streams.


About

The core module contains some basic utility functions to make working with web streams a lot easier. Here’s more about Web Streams from the spec itself:

Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.

The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Streams are coming to the web very soon!

The core module is designed to be extremely lightweight and barebones: the minified and gzipped build is just 4 KB.

At its core, the API exposes three major components: readable streams, writable streams, and transform streams.

Streams are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What’s amazing is that, in most cases, they can handle backpressure automatically, so you don’t have to mess with the underlying details.
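To make the three components concrete, here is a minimal sketch that uses only the standard Streams API and none of the @pipes/core functions. It assumes a runtime that exposes ReadableStream, TransformStream and WritableStream as globals (for example, a browser with the polyfill loaded); the names doubler and received are made up for this illustration.

```javascript
// A readable stream: produces the values 1, 2, 3 and then closes.
const readable = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  },
});

// A transform stream: doubles every chunk that passes through it.
const doubler = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk * 2);
  },
});

// A writable stream: collects whatever is written to it.
const received = [];
const writable = new WritableStream({
  write(chunk) {
    received.push(chunk);
  },
});

// pipeThrough returns the readable side of the transform;
// pipeTo returns a promise that settles once the pipe has finished.
const done = readable.pipeThrough(doubler).pipeTo(writable);
done.then(() => console.log(received)); // logs the doubled values 2, 4, 6
```

Backpressure is handled by pipeThrough and pipeTo themselves: the source is only pulled as fast as the writable can accept chunks.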

For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.

Heads up: if you’re coming from Node land, web streams are quite different from Node streams, and the two are incompatible with each other.

Installing

For browsers

The library depends on a polyfill for browsers that don’t support the Streams API yet (which, as of now, is all of them), so make sure you include the polyfill before including the library.

You can use either of the builds from the dist folder:

    <script src="path/to/web-streams-polyfill.js"></script>
    <script src="path/to/pipes.core.js"></script>

And in your code, all the functions will be available on the window.Pipes variable.

    let { pipe, flatten } = window.Pipes;

    flatten(/* some streams here */);

For browserify users

The library has a peer-dependency on web-streams-polyfill, so to install it:

    npm install web-streams-polyfill @pipes/core

The library is split up into modules, so you can require either the whole library or only parts of it:

    let { flatten } = require("@pipes/core");
    let merge = require("@pipes/core/merge");

For ES6 and Rollup users

If you want, you can directly import the es6 modules like so:

    import pipes from "@pipes/core/src";
    import { flatten } from "@pipes/core/src";
    import flatten from "@pipes/core/src/flatten";

API Reference

The core library consists of only the following functions: accumulate, connect, chain, flatten, merge, pipe (along with pipe.async and pipe.eos), and split.

Set up code for examples

    // Setup
    let createReadable = data => new ReadableStream({
            start (controller) {
                // Copy the input so the caller's array is not mutated
                this.data = ( data || [1,2,3] ).slice();

                // Kickstart stream with the first value
                controller.enqueue( this.data.shift() );
            },
            pull (controller) {
                if ( !this.data.length )
                    return controller.close();

                // Emit the remaining values in their original order
                controller.enqueue( this.data.shift() );
            }
        }),
        createWritable = () => new WritableStream({
            write (chunk) {
                console.log( chunk );
            }
        });

accumulate

This function takes a reducer function and an optional initial value, and returns a transform stream that accumulates the values of any stream piped to it.

Parameters

Examples

    let readable, accumulator, accumulated;

    // Create streams
    readable = createReadable( [1,2,3] );

    // Connect the streams
    accumulator = accumulate( (a, b) => a+b, 4 );
    accumulated = readable.pipeThrough( new accumulator );    // 10

Returns TransformStream: a ReadableWritable that consumes the piped stream, combines its values with the reducer and enqueues the result.
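The accumulating behaviour described here can be approximated with a plain TransformStream. The following is a rough sketch and not the library's implementation: accumulateSketch, fromArray and collect are hypothetical helpers written for this illustration, and the sketch returns a ready-made stream instance rather than something to call new on. It assumes the standard stream classes are available as globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Sketch: fold every incoming chunk into one value, emit it when the input ends.
function accumulateSketch(reducer, ...init) {
  let seeded = init.length > 0; // was an initial value supplied?
  let acc = init[0];
  return new TransformStream({
    transform(chunk) {
      // Without an initial value, the first chunk seeds the accumulator.
      acc = seeded ? reducer(acc, chunk) : chunk;
      seeded = true;
    },
    flush(controller) {
      // The input has closed: enqueue the single accumulated result.
      if (seeded) controller.enqueue(acc);
    },
  });
}

collect(fromArray([1, 2, 3]).pipeThrough(accumulateSketch((a, b) => a + b, 4)))
  .then(result => console.log(result)); // a single chunk: 10
```

Emitting from flush rather than transform is what makes the output a single value: nothing is enqueued until the writable side has been closed.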

connect

This function takes any number of transform streams with an optional readable at the head and a writable at the tail. It connects them together by applying pipeThrough recursively and returns the resulting readable that acts as a composition of the input streams.

If a writable is passed at the tail, the resulting readable is piped to it with pipeTo and the resulting promise is returned.

Parameters

Examples

    let readable = createReadable(),
        writable = createWritable(),
        passThrough = pipe( k => k );

    let promise = connect( readable, passThrough, writable );   // 1, 2, 3

Returns ReadableStream or Promise<any>
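As a rough illustration of the idea, connecting can be written as a fold over pipeThrough, followed by pipeTo when the last argument is a writable. This is a simplified sketch, not the library's code: connectSketch, fromArray, collect and map are hypothetical names, and unlike connect the sketch requires a readable at the head. It assumes the standard stream classes are available as globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Helper (hypothetical): a transform stream that applies fn to every chunk.
const map = fn => new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(fn(chunk));
  },
});

// Sketch: pipe the head through each transform in turn; if the tail is a
// writable, pipe into it and return the promise instead of a readable.
function connectSketch(readable, ...rest) {
  const tail = rest[rest.length - 1];
  const hasSink = tail instanceof WritableStream;
  const transforms = hasSink ? rest.slice(0, -1) : rest;
  const piped = transforms.reduce((r, t) => r.pipeThrough(t), readable);
  return hasSink ? piped.pipeTo(tail) : piped;
}

const composed = connectSketch(fromArray([1, 2, 3]), map(n => -n), map(n => 2 * n));
collect(composed).then(chunks => console.log(chunks)); // chunks -2, -4, -6
```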

chain

This function takes one or more transform streams or { readable, writable } pairs and connects them to each other. It then takes the readable of the last stream and the writable of the first and returns them as a { readable, writable } pair that is compatible with ReadableStream::pipeThrough.

Parameters

Examples

    // Pure function example
    let negator = pipe( n => -n ),
        doubler = pipe( n => 2*n ),
        composed = chain( new negator, new doubler ),
        rIn = createReadable(),
        rOut;

    rOut = rIn.pipeThrough( composed );  // -2, -4, -6

Returns ReadableWritable
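The pairing described above (writable of the head, readable of the end) can be sketched in a few lines with the standard API. This is an approximation for illustration only, not the library's implementation; chainSketch, fromArray, collect and map are hypothetical names, and the standard stream classes are assumed to be globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Helper (hypothetical): a transform stream that applies fn to every chunk.
const map = fn => new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(fn(chunk));
  },
});

// Sketch: wire each pair's readable into the next pair, then expose the
// first writable and the last readable as a single pair.
function chainSketch(first, ...rest) {
  const readable = rest.reduce((r, pair) => r.pipeThrough(pair), first.readable);
  return { writable: first.writable, readable };
}

const composed = chainSketch(map(n => -n), map(n => 2 * n));
collect(fromArray([1, 2, 3]).pipeThrough(composed))
  .then(chunks => console.log(chunks)); // chunks -2, -4, -6
```

Because pipeThrough accepts any object with a readable and a writable property, the plain object returned by the sketch can be used wherever a transform stream is expected.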

flatten

This function takes one or more streams and returns a readable that combines them, emitting chunks in the order in which they arrive from the source streams.

Parameters

Examples

    let r1 = createReadable([1,2,3]),
        r2 = createReadable([4,5,6]),
        writable = createWritable(),
        flattened = flatten(r1,r2);

    flattened.pipeTo( writable );   // 1,4,2,5,3,6   (order depends on order received so may vary)

Returns ReadableStream
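One way to picture this behaviour is a readable that pumps every source independently and forwards chunks as they arrive. The sketch below is a simplified illustration, not the library's implementation: flattenSketch, fromArray and collect are hypothetical names, and for brevity the sketch ignores backpressure from the consumer. It assumes the standard stream classes are available as globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Sketch: forward chunks from all sources in arrival order and close the
// combined stream once every source has ended.
function flattenSketch(...sources) {
  const readers = sources.map(source => source.getReader());
  return new ReadableStream({
    start(controller) {
      let open = readers.length;
      if (open === 0) return controller.close();
      readers.forEach(async reader => {
        try {
          for (;;) {
            const { value, done } = await reader.read();
            if (done) break;
            controller.enqueue(value);
          }
          if (--open === 0) controller.close();
        } catch (err) {
          // A failing source errors the combined stream.
          controller.error(err);
        }
      });
    },
    cancel(reason) {
      // Cancelling the combined stream cancels every source.
      return Promise.all(readers.map(reader => reader.cancel(reason)));
    },
  });
}

collect(flattenSketch(fromArray([1, 2, 3]), fromArray([4, 5, 6])))
  .then(chunks => console.log(chunks)); // all six values; interleaving may vary
```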

merge

This function takes one or more streams and returns a readable that combines them. It waits until every stream has pushed a chunk, gathers those chunks into an array, and then pushes the array onto the combined stream.

Parameters

Examples

    let r1 = createReadable([1,2,3]),
        r2 = createReadable([4,5,6,7]),
        writable = createWritable(),
        merged = merge(r1,r2);

    merged.pipeTo( writable );   // [1,4], [2,5], [3,6]

Returns ReadableStream
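The waiting behaviour resembles a "zip" over streams. The following sketch shows one way to express it with the standard API; it is an illustration under assumptions, not the library's implementation. mergeSketch, fromArray and collect are hypothetical names, and the sketch stops as soon as any source ends, which matches the example output above where the trailing 7 is dropped.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Sketch: on every pull, wait for one chunk from each source and emit them
// together as an array.
function mergeSketch(...sources) {
  const readers = sources.map(source => source.getReader());
  return new ReadableStream({
    async pull(controller) {
      const results = await Promise.all(readers.map(reader => reader.read()));
      if (readers.length === 0 || results.some(result => result.done)) {
        // At least one source has ended: release the others and close.
        await Promise.all(readers.map(reader => reader.cancel()));
        controller.close();
        return;
      }
      controller.enqueue(results.map(result => result.value));
    },
    cancel(reason) {
      return Promise.all(readers.map(reader => reader.cancel(reason)));
    },
  });
}

collect(mergeSketch(fromArray([1, 2, 3]), fromArray([4, 5, 6, 7])))
  .then(chunks => console.log(chunks)); // the pairs [1,4], [2,5], [3,6]
```

Doing the reads inside pull, rather than eagerly, means the sources are only read when the consumer asks for more, so backpressure carries through to them.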

pipe

This function takes any normal or generator function and returns a transform stream.

Parameters

Examples

    // Pure function example
    let negator = pipe( n => -n ),
        rIn = createReadable(),
        rOut;

    rOut = rIn.pipeThrough( new negator );  // -1, -2, -3

    // Basic generator example
    let doubler = pipe( function* (v) {
            yield v;
            yield v;
        }),
        doubledIn = createReadable(),
        doubledOut;

    doubledOut = doubledIn.pipeThrough( new doubler );  // 1, 1, 2, 2, 3, 3

    // Infinite generator example
    let inf = pipe( function* (v) {
        // Close on shutdown signal
        while( !( yield v ));
    }, {
        init: 1
    });

    new inf;    // 1, 1, 1, 1...

Returns TransformStream
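For intuition, wrapping a function in a transform stream can be sketched with the standard TransformStream constructor. This is a simplified illustration and not how the library implements pipe: pipeSketch, fromArray and collect are hypothetical names, the sketch returns a stream instance instead of something to call new on, and it does not cover the init option or infinite generators shown above.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Does this value look like the iterator object a generator function returns?
const isGeneratorObject = value =>
  value != null &&
  typeof value.next === "function" &&
  typeof value[Symbol.iterator] === "function";

// Sketch: call fn for every chunk; enqueue its return value, or every
// yielded value when fn is a generator function.
function pipeSketch(fn) {
  return new TransformStream({
    transform(chunk, controller) {
      const result = fn(chunk);
      if (isGeneratorObject(result)) {
        for (const value of result) controller.enqueue(value);
      } else {
        controller.enqueue(result);
      }
    },
  });
}

collect(fromArray([1, 2, 3]).pipeThrough(pipeSketch(n => -n)))
  .then(chunks => console.log(chunks)); // chunks -1, -2, -3
```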

pipe.async

This function takes any async function (or any function that returns a promise) and returns a transform stream.

Parameters

Examples

    // Basic async example
    let serverTalker = pipe.async( async function (msg) {
            let response = await sendToServer( msg );
            return response;
        }),
        rIn = createReadable(),
        rOut;

    rOut = rIn.pipeThrough( new serverTalker );  // {response}, {response}, {response}

    // Basic promise example
    let promiseTalker = pipe.async( function (msg) {
            let response = new Promise( resolve => {
                sendToServer( msg, resolve );
            });
            return response;
        }),
        pIn = createReadable(),
        pOut;

    pOut = pIn.pipeThrough( new promiseTalker );  // {response}, {response}, {response}

Returns TransformStream
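The asynchronous variant can be pictured the same way, relying on the fact that a transform callback may return a promise. The sketch below is illustrative only and not the library's implementation: pipeAsyncSketch, fromArray, collect and the stand-in fakeServer are all hypothetical, and the standard stream classes are assumed to be globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Sketch: await fn for every chunk before enqueuing its resolved value.
// The stream does not start the next chunk until the returned promise settles.
function pipeAsyncSketch(fn) {
  return new TransformStream({
    async transform(chunk, controller) {
      controller.enqueue(await fn(chunk));
    },
  });
}

// Stand-in for a real network call (hypothetical): echoes the message back
// after a short delay.
const fakeServer = msg =>
  new Promise(resolve => setTimeout(() => resolve({ echoed: msg }), 10));

collect(fromArray([1, 2, 3]).pipeThrough(pipeAsyncSketch(fakeServer)))
  .then(chunks => console.log(chunks)); // three { echoed: ... } objects, in input order
```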

pipe.eos

“End of Stream”. This is the equivalent of the EOF character on UNIX systems: if a pipe function returns it at any point, the streams are gracefully closed.
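A sentinel like this can be modelled with a unique symbol and the standard controller.terminate() method, which closes the readable side of a transform stream. The sketch below only illustrates the idea and makes no claim about how pipe.eos is implemented: EOS, pipeWithEos, fromArray and collect are hypothetical names, and the standard stream classes are assumed to be globals.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Hypothetical sentinel standing in for pipe.eos.
const EOS = Symbol("end of stream");

// Sketch: enqueue whatever fn returns, unless it returns the sentinel, in
// which case the transform is terminated and no further chunks are emitted.
function pipeWithEos(fn) {
  return new TransformStream({
    transform(chunk, controller) {
      const result = fn(chunk);
      if (result === EOS) controller.terminate();
      else controller.enqueue(result);
    },
  });
}

// Stop as soon as a value greater than 2 shows up.
const limited = fromArray([1, 2, 3, 4]).pipeThrough(pipeWithEos(n => (n > 2 ? EOS : n)));
collect(limited).then(chunks => console.log(chunks)); // chunks 1, 2
```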

split

This function takes a readable stream and a number and returns an array of tee’d readable streams, with a cancelAll function that cancels all the tee’d streams and in turn the original stream.

Parameters

Examples

    let readable = createReadable([1,2,3]),
        [r1, r2] = split( readable ),
        w1 = createWritable(),
        w2 = createWritable();

    r1.pipeTo( w1 );   // 1, 2, 3
    r2.pipeTo( w2 );   // 1, 2, 3

Returns Array<ReadableStream>
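Splitting into more than two branches can be built from the standard tee() method, which always yields exactly two. The following is a rough sketch, not the library's implementation: splitSketch, fromArray and collect are hypothetical names, and attaching cancelAll as a property of the returned array is an assumption made for this illustration.

```javascript
// Helper (hypothetical): a readable that emits the items of an array, then closes.
function fromArray(items) {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Helper (hypothetical): read a stream to the end and resolve with its chunks.
async function collect(readable) {
  const reader = readable.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

// Sketch: tee repeatedly, keeping one branch and splitting the other again,
// until there are `count` branches.
function splitSketch(readable, count = 2) {
  const branches = [];
  let rest = readable;
  for (let i = 0; i < count - 1; i++) {
    const [branch, remainder] = rest.tee();
    branches.push(branch);
    rest = remainder;
  }
  branches.push(rest);
  // Assumption: expose cancelAll on the array itself. Cancelling every
  // branch also cancels the original stream, because a tee'd source is
  // cancelled once both of its branches are.
  branches.cancelAll = reason =>
    Promise.all(branches.map(branch => branch.cancel(reason)));
  return branches;
}

const [first, second, third] = splitSketch(fromArray([1, 2, 3]), 3);
Promise.all([first, second, third].map(branch => collect(branch)))
  .then(results => console.log(results)); // each branch yields 1, 2, 3
```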