@pipes/utils

pipes/utils is to `web streams` what `highland.js` is to `node streams`

View the Project on GitHub

Table of Contents

Introducing pipes/utils

The pipesjs/utils module is to web streams what highland.js is to node streams. It contains utility functions to make working with web streams a lot easier. For more about Web Streams, refer to the spec.


About

The utils module is to web streams what highland.js is to node streams. It contains utility functions to make working with web streams a lot easier. Here’s more about Web Streams from the spec itself:

Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.

The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Streams are coming to the web very soon!

At it’s core, the API exposes three major components:

Streams are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What’s amazing is that, in most cases, they can handle backpressure automatically, so you don’t have to mess with the underlying details.

For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.

Heads up: If you’re coming from node land, web streams are quite a lot different from node streams and incompatible with each other.

Installing

For browsers

The library depends on @pipes/core, so make sure you include it in before including the library.

You can use either of the builds from the dist folder:

    <script src="path/to/web-streams-polyfill.js"></script>
    <script src="path/to/pipes.utils.js"></script>

And in your code, all the functions will be available on the window.Pipes.utils variable.

    let { uniq, compact } = window.Pipes.utils;

For browserify users

The library has a peer-dependency on @pipes/core, so to install it:

    npm install @pipes/core @pipes/utils

The library is split up into modules, so you can both require the whole library or only parts of it:

    let { compact } = require("@pipes/utils");
    let compact = require("@pipes/utils/compact");

For ES6 and Rollup users

If you want, you can directly import the es6 modules like so:

    import pipesUtils from "@pipes/utils/src";
    import { compact } from "@pipes/utils/src";
    import compact from "@pipes/utils/src/compact";

API Reference

The utils library only consists of the following functions:

Set up code for examples

  // Setup
  let createReadable = data => new ReadableStream({
      start (controller) {
      this.data = data || [1,2,3];

      // Kickstart stream
      controller.enqueue( this.data.pop() );
      },
      pull (controller) {
      if ( !this.data.length )
          return controller.close()

      controller.enqueue( this.data.pop() );
      }
  }),
  createWritable = () => new WritableStream({
      write (chunk) {
      console.log( chunk );
      }
  });

batch

This function takes an int n and returns a transform stream that batches the incoming values in arrays of lengths no more than n.

Parameters

Examples

let
  input = [1,2,3,4,5],
  expected = [[1,2],[3,4],[5]];

let readable, writable, res=[];

// Create test streams
readable = createTestReadable( input );
writable = createTestWritable( c => res.push( c ));

// Connect the streams
connect(
  readable,
  batch( 2 ),
  writable
); // res == expected

Returns TransformStream

compact

This function returns a transform stream that spits out only truthy values from the input stream.

Examples

let readable, writable,
  count = 0;

// Create test streams
readable = createTestReadable( [true, false, 0, "", "hello", 1] );
writable = createTestWritable( () => count++ );

// Connect the streams
connect(
  readable,
  compact(),
  writable
); // count == 3

Pipe

This function takes an iterable as argument and returns a readable stream that repeatedly emits values generated by the emitter.

Examples

let readable, writable, values=[1,2,3], sum=0;

// Create test streams
  readable = cycle( values );
  writable = createTestWritable( c => { sum+=c });

// Connect the streams
const expected = 2 * values.reduce( (a, b) => a+b );

connect(
  readable,
  take( 2 * values.length ),
  writable
); // sum == expected

debounce

This function takes an int n and returns a transform stream that debounces the incoming values by n ms, only producing values with n ms delay between them and dropping the rest.

Parameters

Returns ReadableWritable

drop

This function takes an int n and returns a transform stream that drops the first n values from the input stream.

Parameters

Examples

let readable, writable,
  count = 0;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );

// Connect the streams
connect(
  readable,
  drop(3),
  writable
); // count == 3

Returns ReadableWritable

filter

This function takes a predicate function as argument and returns a transform stream that only emits values that satisfy the predicate.

Parameters

Examples

let readable, writable;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( c => assert( c > 3 ) );

// Connect the streams
connect(
  readable,
  filter( a => a > 3 ),
  writable
);

Returns ReadableWritable

This function returns a transform stream that takes the first value from the input stream and enqueues it on the output stream.

Examples

let readable, writable, el;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });

// Connect the streams
connect(
  readable,
  head(),
  writable
); // el == 1

Returns ReadableWritable

intersperse

This function takes any value a and returns a transform stream that intersperses the values from the input stream with the a.

Parameters

Examples

let readable, writable,
  res = [];

// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => res.push(c) );

// Connect the streams
connect(
  readable,
  intersperse(0),
  writable
); // res == [1,0,2,0,3]

Returns ReadableWritable

last

This function returns a transform stream that takes the last value from the input stream and enqueues it on the output stream.

Examples

let readable, writable, el;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( e => { el = e; });

// Connect the streams
connect(
  readable,
  last(),
  writable
); // el == 6

Returns ReadableWritable

pick

This function takes any number of strings as arguments and returns a transform stream that extracts the passed property names from incoming values.

Parameters

Examples

let readable, writable,
  o = {
    'a': 1,
    'b': 2
  };

// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c.a ) && assert( !c.b ) );

// Connect the streams
connect(
  readable,
  pick('a'),
  writable
);

Returns ReadableWritable

pluck

This function takes a string as argument and returns a transform stream that extracts the passed property from incoming values.

Parameters

Examples

let readable, writable,
  o = {
    'a': 1,
    'b': 2
  };

// Create test streams
readable = createTestReadable( [o, o, o] );
writable = createTestWritable( c => assert( c == 1 ) );

// Connect the streams
connect(
  readable,
  pluck('a'),
  writable
);

Returns ReadableWritable

repeat

This function takes a value as argument and returns a readable stream that repeatedly emits that value.

Parameters

Examples

let readable, writable, val=1, len=6, sum=0;

// Create test streams
readable = repeat(val);
writable = createTestWritable( c => { sum+=c });

// Connect the streams
connect(
  readable,
  take( len ),
  writable
); // sum == (val * len)

Returns ReadableWritable

scan

This function takes a reducer function and an optional init value as arguments and returns a transform stream that applies the function to the incoming values and enqueues the accumulation of the results.

If an init value is not passed, the first incoming value is treated as one.

Parameters

Examples

let readable, writable, res;

// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => {
  // Check last element is number
  let n = c[c.length-1];
  assert( n === +n );

  res = c;
});

// Connect the streams
connect(
  readable,
  scan( add, 0 ),
  writable
); // res[res.length-1], [1,2,3].reduce( add )

Returns ReadableWritable

slice

This function takes an int m and an int n and returns a transform stream that drops the first m values and takes the next (m-n) values from the input stream.

Parameters

Examples

let readable, writable,
  count = 0;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );

// Connect the streams
connect(
  readable,
  slice(2,5),
  writable
); // count == 3

Returns ReadableWritable

take

This function takes an int n and returns a transform stream that takes the first n values from the input stream.

Parameters

Examples

let readable, writable,
  count = 0;

// Create test streams
readable = createTestReadable( [1,2,3,4,5,6] );
writable = createTestWritable( () => count++ );

// Connect the streams
connect(
  readable,
  take(3),
  writable
); // count == 3

Returns ReadableWritable

tap

This function takes a function as rgument and returns a transform stream that applies the function to the incoming values before re-emitting them.

Parameters

Examples

let readable, writable;

// Create test streams
readable = createTestReadable( [1,2,3] );
writable = createTestWritable( c => assert( !Number.isNaN( c*1 ) ) );

// Connect the streams
connect(
  readable,
  tap( console.log.bind(console) ),
  writable
);

Returns ReadableWritable

throttle

This function takes an int n and returns a transform stream that throttles the incoming values by n ms, only producing values every n ms and dropping the rest.

Parameters

Returns TransformStream

uniq

This function returns a transform stream that keeps only unique values from the input stream and enqueues it on the output stream.

Examples

let readable, writable,
  res = [];

// Create test streams
readable = createTestReadable( [1,1,2,2,3,3] );
writable = createTestWritable( c => res.push(c) );

// Connect the streams
connect(
  readable,
  uniq(),
  writable
); // res == [1,2,3]

Returns ReadableWritable