Skip to content

idris-maps/datastream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

datastream

Building blocks to read and transform data streams

Sources

fromFile

Opens a file and creates a stream of lines

type fromFile = (
  path: string,
  funcs?: PipeFunction[],
) => Promise<{ iterator: AsyncIterableIterator<any>; rid: number }>;

fromStdin

Reads a stream from stdin

type fromStdin = (
  funcs?: PipeFunction[], // default []
) => { iterator: AsyncIterableIterator<string>, rid: number };

fromNdjsonFile

Opens an ndjson (newline-delimited JSON) file and creates a stream of JSON objects

type fromNdjsonFile = (
  path: string,
  funcs?: PipeFunction[],
) => Promise<{ iterator: AsyncIterableIterator<any>; rid: number }>;

fromNdjsonStdin

Reads and parses a stream of ndjson from stdin

type fromNdjsonStdin = (
  funcs?: PipeFunction[],
) => { iterator: AsyncIterableIterator<any> };

fromDsvFile

Opens a dsv (delimiter-separated values) file and creates a stream of JSON objects

The first line is expected to contain the column labels

type fromDsvFile = (
  path: string,
  config?: {
    delimiter?: string; // default "," (csv)
    numeric?: string[]; // list of numeric columns
    bool?: string[]; // list of boolean columns
  },
  funcs?: PipeFunction[],
) => Promise<{ iterator: AsyncIterableIterator<any>; rid: number }>;

fromDsvStdin

Reads a stream of dsv from stdin and parses it as JSON objects

type fromDsvStdin = (
  config?: {
    delimiter?: string; // default "," (csv)
    numeric?: string[]; // list of numeric columns
    bool?: string[]; // list of boolean columns
  },
  funcs?: PipeFunction[],
) => { iterator: AsyncIterableIterator<any> };

Transforms

Functions to modify the stream of data

All transforms return a PipeFunction

type PipeFunction = (
  d: AsyncIterableIterator<any>,
) => AsyncIterableIterator<any>;

map

type map = <A = any, B = any>(func: (d: A, i: number) => B) => PipeFunction;

filter

type filter = <T = any>(func: (d: T, i: number) => boolean) => PipeFunction;

offset

type offset = (n: number) => PipeFunction;

limit

type limit = (n: number) => PipeFunction;

Output

toArray

type toArray = <T = any>({ iterator: AsyncIterableIterator<T>, rid?: number }) => Promise<T[]>;

find

type find = <T>(
  func: (d: T) => boolean,
) => ({ iterator: AsyncIterableIterator<T>, rid?: number }) =>
  Promise<T | undefined>;

reduce

type reduce = <A = any, B = any>(
  func: (r: B, d: A, i: number) => B,
  start: B,
) => ({
  iterator: AsyncIterableIterator<A>,
  rid?: number,
}) => Promise<B>;

toNdjsonStdout

type toNdjsonStdout = <T = any>({
  iterator: AsyncIterableIterator<T>,
  rid?: number,
}) => Promise<void>;

toDsvStdout

type toDsvStdout = <T = any>(
  { iterator: AsyncIterableIterator<T>, rid?: number },
  delimiter?: string,
) => Promise<void>;

toNdjsonFile

type toNdjsonFile = <T = any>(
  { iterator: AsyncIterableIterator<T>, rid?: number },
  path: string,
) => Promise<void>;

toDsvFile

type toDsvFile = <T = any>(
  { iterator: AsyncIterableIterator<T>, rid?: number },
  path: string,
  delimiter?: string,
) => Promise<void>;