mongodb-pipeline-builder is a pipeline builder for the db.collection.aggregate, the db.aggregate and the mongoose Model.aggregate methods.
- Simplifies pipelines by making them more readable.
- Makes pipelines easier to edit.
- Pipeline stages appear in an array.
- Documents pass through the stages sequentially.
All stages except the Out, Merge, GeoNear, ChangeStream, ChangeStreamSplitLargeEvent and Paging stages can appear multiple times in a pipeline.
npm i -S mongodb-pipeline-builder
Breaking changes between v3 and v4
- Helpers
  - Replacing the Payload suffix with the Helper suffix
  - Prefixed with the name of the pipeline stage where they should be used
- Operators
  - Prefixed with the $ symbol
  - Rename MapOperator to $Map
- GetResult
  - To be used only if no Paging stage is set
  - Adding new GetElement method to the response object
  - Removing GetDocs method arguments
- GetPagingResult
  - To be used exclusively with the Paging stage
Welcome generics! GetResult<Type> and GetPagingResult<Type> now offer the ability to type responses.
const PipelineBuilder = require("mongodb-pipeline-builder").PipelineBuilder;
const { LookupEqualityHelper, ProjectOnlyHelper, Field } = require('mongodb-pipeline-builder/helpers');
const { $LessThanEqual, $ArrayElemAt, $Equal, $Expression } = require('mongodb-pipeline-builder/operators');
import { PipelineBuilder } from 'mongodb-pipeline-builder';
import { LookupEqualityHelper, ProjectOnlyHelper, Field } from 'mongodb-pipeline-builder/helpers';
import { $LessThanEqual, $ArrayElemAt, $Equal, $Expression } from 'mongodb-pipeline-builder/operators';
const myNewPipeline = new PipelineBuilder( 'myPagination', { debug: true } )
.Match( $Expression( $LessThanEqual( '$id', 20 ) ) )
.Project( ProjectOnlyHelper( 'name', 'weight' ) )
.Paging( 5, 3 ) // 5 per page, page 3
.build();
is equivalent to
const myNewPipeline = [ {
$facet: {
docs: [
{ $match: { $expr: { $lte: ["$id", 20] } } },
{ $project: { _id: 0, name: 1, weight: 1 } },
{ $skip: 10 },
{ $limit: 5 }
],
count: [
{ $match: { $expr: { $lte: ["$id", 20] } } },
{ $count: "totalElements" }
]
}
} ];
const myNewPipeline = new PipelineBuilder( 'user-skills' )
.Match( $Expression( $Equal( '$id', 123456 ) ) )
.Lookup( LookupEqualityHelper( 'profiles', 'profile', 'id', 'profileId' ) )
.Project( ProjectOnlyHelper( 'firstname', 'lastname', 'email' ) )
.AddFields(
Field( 'skills', $ArrayElemAt( '$profile.skills', 0 ) ),
Field( 'availability', $ArrayElemAt( '$profile.availability', 0 ) )
)
.Unset( 'profile' )
.build();
is equivalent to
const myNewPipeline = [
{ $match: { $expr: { $eq: ["$id", 123456] } } },
{ $lookup: { from: "profiles", as: "profile", localField: "id", foreignField: "profileId" } },
{ $project: { _id: 0, firstname: 1, lastname: 1, email: 1 } },
{ $addFields: {
skills: { $arrayElemAt: ["$profile.skills", 0] },
availability: { $arrayElemAt: ["$profile.availability", 0] }
} },
{ $unset: "profile" }
];
GetResult<T = any>(): Promise<GetResultResponse<T>>
GetResult<T>() is an asynchronous method that provides a very easy way to use aggregation responses.
This method returns a GetResultResponse object that contains three methods:
- GetDocs(): T[] to get all the documents that match the request.
- GetElement(index: number | 'last'): T to get a particular document by its index.
- GetCount(): number to get the total number of documents found.
const result = await GetResult<DocType>( target, pipeline );
result.GetDocs(); // () => DocType[]
result.GetElement(0); // (index: number | 'last') => DocType
result.GetCount(); // () => number
Or
GetResult<DocType>( target, pipeline ).then( result => {
result.GetDocs(); // () => DocType[]
result.GetElement(0); // (index: number | 'last') => DocType
result.GetCount(); // () => number
} );
- A particular document can be retrieved by specifying its index.
- To get the last document, simply provide the string 'last'.
- If the specified index is greater than the index of the last document, GetElement() will return undefined.
// GetDocs() -> [document1, document2, document3, ..., document51]
result.GetElement(2); // will return the document at index 2, document3
result.GetElement('last'); // will return the last document, document51
result.GetElement(99); // will return undefined
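The index rules above can be mimicked on a plain array. The getElement function below is a hypothetical stand-in written for illustration; it is not the library's code and only reproduces the documented behaviour.

```javascript
// Hypothetical helper for illustration; it mimics the documented behaviour of
// GetElement() on a plain array and is not the library's implementation.
function getElement(docs, index) {
  // 'last' selects the final document; any other value is used as an array index.
  const position = index === 'last' ? docs.length - 1 : index;
  // Reading past the end of a JavaScript array yields undefined.
  return docs[position];
}

// Build [document1, document2, ..., document51] as in the example above.
const sampleDocs = Array.from({ length: 51 }, (_, i) => `document${i + 1}`);

console.log(getElement(sampleDocs, 2));      // document3
console.log(getElement(sampleDocs, 'last')); // document51
console.log(getElement(sampleDocs, 99));     // undefined
```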
GetPagingResult<T = any>(): Promise<GetPagingResultResponse<T>>
GetPagingResult<T>() is an asynchronous method that provides a very easy way to use aggregation responses when the Paging stage is used.
This method returns a GetPagingResultResponse object that contains three methods:
- GetDocs() to get the documents found.
- GetCount() to get the total number of documents found.
- GetTotalPageNumber() to get the total number of pages.
const result = await GetPagingResult<DocType>(target, pipeline);
result.GetDocs(); // () => DocType[]
result.GetCount(); // () => number
result.GetTotalPageNumber(); // () => number
Or
GetPagingResult<DocType>(target, pipeline).then( result => {
result.GetDocs(); // () => DocType[]
result.GetCount(); // () => number
result.GetTotalPageNumber(); // () => number
} );
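To see where these three values can come from, the sketch below unpacks the raw result of a pipeline that ends with the $facet stage generated by Paging (one docs branch and one count branch, as in the equivalent pipeline shown earlier). The summarizePaging function and its elementsPerPage parameter are hypothetical, and the rounding rule for the page total is an assumption rather than the library's confirmed behaviour.

```javascript
// Hypothetical helper for illustration; not part of mongodb-pipeline-builder.
// facetResult is the array returned by the aggregation:
// [{ docs: [...], count: [{ totalElements: n }] }]
function summarizePaging(facetResult, elementsPerPage) {
  const [{ docs, count }] = facetResult;
  // $count emits no document when nothing matched, so the branch can be empty.
  const total = count.length > 0 ? count[0].totalElements : 0;
  return {
    docs,
    count: total,
    // Assumption: total pages = document count / page size, rounded up.
    totalPageNumber: Math.ceil(total / elementsPerPage),
  };
}

const rawResult = [
  { docs: [{ name: 'a' }, { name: 'b' }], count: [{ totalElements: 12 }] },
];
const summary = summarizePaging(rawResult, 5);
console.log(summary.count, summary.totalPageNumber); // 12 3
```

With the typed GetPagingResult<DocType>() call none of this unpacking is needed; the sketch only shows what the raw $facet output looks like.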
=> Try the lib on NPM RunKit with the require method <=
// builder = new PipelineBuilder('example');
The Paging stage automatically adds three native stages used to paginate documents ($skip, $limit and $count), wrapped in a single $facet stage.
builder.Paging(5, 2).build();
// pipeline
[
{
'$facet': {
docs: [ { '$skip': 5 }, { '$limit': 5 } ],
count: [ { '$count': 'totalElements' } ]
}
}
]
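The arithmetic behind this output can be sketched in plain JavaScript. The pagingFacet function below is a hypothetical illustration, not the library's implementation; it assumes 1-based page numbers, which is consistent with the two outputs shown in this document (Paging(5, 2) skips 5 documents, Paging(5, 3) skips 10).

```javascript
// Hypothetical re-implementation for illustration only; pagingFacet is not
// part of mongodb-pipeline-builder and ignores any stages added before Paging.
function pagingFacet(elementsPerPage, page) {
  // Assumption: pages are 1-based, so page 1 skips nothing.
  const skip = (page - 1) * elementsPerPage;
  return [
    {
      $facet: {
        // Branch 1: the requested page of documents.
        docs: [{ $skip: skip }, { $limit: elementsPerPage }],
        // Branch 2: the total number of documents.
        count: [{ $count: 'totalElements' }],
      },
    },
  ];
}

console.log(JSON.stringify(pagingFacet(5, 2)));
// [{"$facet":{"docs":[{"$skip":5},{"$limit":5}],"count":[{"$count":"totalElements"}]}}]
```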
AddFields(...values: AddFieldsStage[])
builder.AddFields(Field('foo', 'value1'), Field('bar', 'value2')).build();
// pipeline
[ { '$addFields': { foo: 'value1', bar: 'value2' } } ]
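As a rough mental model, each Field call produces a one-key object and AddFields merges those objects into a single stage. The field and addFieldsStage functions below are hypothetical stand-ins written for illustration; the real helpers may be implemented differently.

```javascript
// Hypothetical stand-ins for illustration; the real Field helper and AddFields
// method live in mongodb-pipeline-builder and may work differently internally.
function field(name, value) {
  // A computed property name turns ('foo', 'value1') into { foo: 'value1' }.
  return { [name]: value };
}

function addFieldsStage(...fields) {
  // Object.assign merges left to right, so a later field with the same name
  // overwrites an earlier one.
  return { $addFields: Object.assign({}, ...fields) };
}

const addFieldsExample = addFieldsStage(field('foo', 'value1'), field('bar', 'value2'));
console.log(addFieldsExample); // { '$addFields': { foo: 'value1', bar: 'value2' } }
```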
Bucket(value: BucketStage)
builder.Bucket(BucketGroupByHelper('$age', [6, 13, 18])).build();
// pipeline
[
{
'$bucket': {
groupBy: '$age',
boundaries: [ 6, 13, 18 ],
output: { count: { '$sum': 1 } }
}
}
]
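The boundaries array can be easy to misread: [6, 13, 18] defines two buckets, not three. The sketch below mimics how MongoDB's $bucket assigns a value to a bucket (lower bound inclusive, upper bound exclusive). The bucketIdFor function is a hypothetical illustration and not part of the library.

```javascript
// Illustration only: mimics how $bucket picks a bucket for a groupBy value.
// Each pair of adjacent boundaries forms one bucket, lower bound inclusive,
// upper bound exclusive; the bucket _id is its lower bound.
function bucketIdFor(value, boundaries) {
  for (let i = 0; i < boundaries.length - 1; i += 1) {
    if (value >= boundaries[i] && value < boundaries[i + 1]) {
      return boundaries[i];
    }
  }
  // With no `default` bucket configured, MongoDB raises an error for such values.
  throw new RangeError(`${value} falls outside the boundaries`);
}

console.log(bucketIdFor(6, [6, 13, 18]));  // 6
console.log(bucketIdFor(13, [6, 13, 18])); // 13
console.log(bucketIdFor(17, [6, 13, 18])); // 13
```

An age of 18 or more, or below 6, would fall outside both buckets, so a default bucket is worth considering when the data is not guaranteed to stay within range.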
BucketAuto(value: BucketAutoStage)
builder.BucketAuto(BucketAutoGroupByHelper('$age', 5)).build();
// pipeline
[
{
'$bucketAuto': { groupBy: '$age', buckets: 5, output: { count: { '$sum': 1 } } }
}
]
ChangeStream(value: ChangeStreamStage)
builder.ChangeStream({ allChangesForCluster: true, fullDocument: 'required' }).build();
// pipeline
[
{
'$changeStream': { allChangesForCluster: true, fullDocument: 'required' }
}
]
ChangeStreamSplitLargeEvent(value: ChangeStreamSplitLargeEventStage)
builder.ChangeStreamSplitLargeEvent({}).build();
// pipeline
[ { '$changeStreamSplitLargeEvent': {} } ]
CollStats(value: CollStatsStage)
builder.CollStats({ latencyStats: { histograms: true } }).build();
// pipeline
[
{ '$collStats': { latencyStats: { histograms: true } } }
]
Count(value: string)
builder.Count('counter').build();
// pipeline
[ { '$count': 'counter' } ]
CurrentOp(value: CurrentOpStage)
builder.CurrentOp(CurrentOpHelper({ allUsers: true, idleConnections: true })).build();
// pipeline
[
{
'$currentOp': {
allUsers: true,
idleConnections: true,
idleCursors: false,
idleSessions: true,
localOps: false,
backtrace: false
}
}
]
Densify(value: DensifyStage)
builder.Densify({
field: "altitude",
partitionByFields: [ "variety" ],
range: { bounds: "full", step: 200 }
}).build();
// pipeline
[
{
'$densify': {
field: 'altitude',
partitionByFields: [ 'variety' ],
range: { bounds: 'full', step: 200 }
}
}
]
Documents(value: DocumentsStage)
builder.Documents([{ doc1Id: 1 }, { doc2Id: 2 }, { doc3Id: 3 }]).build();
// pipeline
[
{ '$documents': [ { doc1Id: 1 }, { doc2Id: 2 }, { doc3Id: 3 } ] }
]
Facet(...values: FacetStage[])
builder.Facet(
Field('pipeline1', [{ $match: { tag: 'first' }}]),
Field('pipeline2', [{ $match: { tag: 'second' }}]),
Field('pipeline3', [{ $match: { tag: 'third' }}]),
).build();
// pipeline
[
{
'$facet': {
pipeline1: [ { '$match': { tag: 'first' } } ],
pipeline2: [ { '$match': { tag: 'second' } } ],
pipeline3: [ { '$match': { tag: 'third' } } ]
}
}
]
Fill(value: FillStage)
builder.Fill({
output:
{
"bootsSold": { value: 0 },
"sandalsSold": { value: 0 },
"sneakersSold": { value: 0 }
}
}).build();
// pipeline
[
{
'$fill': {
output: {
bootsSold: { value: 0 },
sandalsSold: { value: 0 },
sneakersSold: { value: 0 }
}
}
}
]
GeoNear(value: GeoNearStage)
builder.GeoNear(
GeoNearHelper({ type: "Point", coordinates: [ -73.99279 , 40.719296 ] }, 'dist.calculated')
).build();
// pipeline
[
{
'$geoNear': {
near: { type: 'Point', coordinates: [ -73.99279, 40.719296 ] },
distanceField: 'dist.calculated'
}
}
]
GraphLookup(value: GraphLookupStage)
builder.GraphLookup({
from: 'employees', startWith: '$reportsTo', connectFromField: 'reportsTo', connectToField: 'name', as: 'reportingHierarchy',
}).build();
// pipeline
[
{
'$graphLookup': {
from: 'employees',
startWith: '$reportsTo',
connectFromField: 'reportsTo',
connectToField: 'name',
as: 'reportingHierarchy'
}
}
]
Group(value: GroupStage)
builder.Group({ _id: null, count: { $count: { } } }).build();
// pipeline
[
{ '$group': { _id: null, count: { '$count': {} } } }
]
IndexStats(value: IndexStatsStage)
builder.IndexStats({}).build();
// pipeline
[ { '$indexStats': {} } ]
Limit(value: number)
builder.Limit(10).build();
// pipeline
[ { '$limit': 10 } ]
ListLocalSessions(value: ListSessionsStage)
builder.ListLocalSessions({ allUsers: true }).build();
// pipeline
[ { '$listLocalSessions': { allUsers: true } } ]
ListSampledQueries(value: ListSampledQueriesStage)
builder.ListSampledQueries({ namespace: "social.post" }).build();
// pipeline
[ { '$listSampledQueries': { namespace: 'social.post' } } ]
ListSearchIndexes(value: ListSearchIndexesStage)
builder.ListSearchIndexes({ name: 'searchIndex01' }).build();
// pipeline
[ { '$listSearchIndexes': { name: 'searchIndex01' } } ]
ListSessions(value: ListSessionsStage)
builder.ListSessions({ allUsers: true }).build();
// pipeline
[ { '$listSessions': { allUsers: true } } ]
Lookup(value: LookupStage)
builder.Lookup(LookupConditionHelper('users', 'users', {
pipeline: builder2.Match(
$Expression($GreaterThanEqual('$age', '$$age_min')),
).build(),
project: ProjectOnlyHelper('name', 'age', 'city'),
sourceList: ['age_min'],
})).build();
// pipeline
[
{
'$lookup': {
from: 'users',
as: 'users',
let: { age_min: '$age_min' },
pipeline: [
{
'$match': { '$expr': { '$gte': [ '$age', '$$age_min' ] } }
},
{ '$project': { _id: 0, name: 1, age: 1, city: 1 } }
]
}
}
]
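The output above suggests how the three options are combined: every name in sourceList becomes a let variable pointing at the field of the same name, and the projection ends up as the last stage of the sub-pipeline. The lookupCondition function below is a hypothetical sketch that reproduces this output; it is not the library's LookupConditionHelper, and its handling of omitted options is an assumption.

```javascript
// Hypothetical sketch for illustration; it only reproduces the output shown
// above and is not the library's LookupConditionHelper.
function lookupCondition(from, as, { pipeline = [], project, sourceList = [] } = {}) {
  const lookup = { from, as };
  if (sourceList.length > 0) {
    // Each source field becomes a variable of the same name: age_min -> '$age_min'.
    // Inside the sub-pipeline the variable is then read as '$$age_min'.
    lookup.let = Object.fromEntries(sourceList.map((name) => [name, `$${name}`]));
  }
  // Assumption: the optional projection is appended as the last sub-pipeline stage.
  lookup.pipeline = project ? [...pipeline, { $project: project }] : [...pipeline];
  return { $lookup: lookup };
}

const lookupStage = lookupCondition('users', 'users', {
  pipeline: [{ $match: { $expr: { $gte: ['$age', '$$age_min'] } } }],
  project: { _id: 0, name: 1, age: 1, city: 1 },
  sourceList: ['age_min'],
});
console.log(JSON.stringify(lookupStage, null, 2));
```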
builder.Lookup(
LookupEqualityHelper('users', 'user', 'id', 'userId')
).build();
// pipeline
[
{
'$lookup': {
from: 'users',
localField: 'id',
foreignField: 'userId',
as: 'user'
}
}
]
Match(value: MatchStage)
builder.Match(Field('age', 18)).build();
// pipeline
[ { '$match': { age: 18 } } ]
builder.Match($Expression($GreaterThanEqual('$age', 18))).build();
// pipeline
[
{
'$match': { '$expr': { '$gte': [ '$age', 18 ] } }
}
]
Merge(value: MergeStage)
builder.Merge(MergeIntoHelper('newCollection')).build();
// pipeline
[
{
'$merge': {
into: 'newCollection',
on: '_id',
whenMatched: 'merge',
whenNotMatched: 'insert',
let: { new: '$$ROOT' }
}
}
]
Out(value: OutStage)
builder.Out(OutDbCollHelper('users', 'db1')).build();
// pipeline
[ { '$out': { db: 'db1', coll: 'users' } } ]
PlanCacheStats(value: PlanCacheStatsStage)
builder.PlanCacheStats({}).build();
// pipeline
[ { '$planCacheStats': {} } ]
Project(value: ProjectStage)
builder.Project(ProjectIgnoreHelper('password', 'refreshToken')).build();
// pipeline
[ { '$project': { password: 0, refreshToken: 0 } } ]
builder.Project(ProjectOnlyHelper('password', 'refreshToken')).build();
// pipeline
[ { '$project': { _id: 0, password: 1, refreshToken: 1 } } ]
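The difference between the two helpers is visible in their outputs: one builds an exclusion projection, the other an inclusion projection that also drops _id. The projectIgnore and projectOnly functions below are hypothetical stand-ins that reproduce those two outputs; they are not the library's code.

```javascript
// Hypothetical stand-ins for illustration; they reproduce the two outputs
// shown above and are not the library's ProjectIgnoreHelper / ProjectOnlyHelper.
function projectIgnore(...fields) {
  // Exclusion projection: every listed field is set to 0, all others are kept.
  return Object.fromEntries(fields.map((name) => [name, 0]));
}

function projectOnly(...fields) {
  // Inclusion projection: _id is dropped and only the listed fields are kept.
  return { _id: 0, ...Object.fromEntries(fields.map((name) => [name, 1])) };
}

console.log({ $project: projectIgnore('password', 'refreshToken') });
// { '$project': { password: 0, refreshToken: 0 } }
console.log({ $project: projectOnly('password', 'refreshToken') });
// { '$project': { _id: 0, password: 1, refreshToken: 1 } }
```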
Redact(value: RedactStage)
builder.Redact(
$Cond(
$GreaterThan($Size($SetIntersection('$tags', ['STLW', 'G'])), 0),
'$$DESCEND',
'$$PRUNE'
)
).build();
// pipeline
[
{
'$redact': {
'$cond': [
{ '$gt': [ { '$size': { '$setIntersection': [ '$tags', [ 'STLW', 'G' ] ] } }, 0 ] },
'$$DESCEND',
'$$PRUNE'
]
}
}
]
ReplaceRoot(value: ReplaceRootStage)
builder.ReplaceRoot({
newRoot: { full_name: { $concat : [ "$first_name", " ", "$last_name" ] } }
}).build();
// pipeline
[
{
'$replaceRoot': {
newRoot: {
full_name: { '$concat': [ '$first_name', ' ', '$last_name' ] }
}
}
}
]
ReplaceWith(value: ReplaceWithStage)
builder.ReplaceWith('$name').build();
// pipeline
[ { '$replaceWith': '$name' } ]
Sample(value: number)
builder.Sample(6).build();
// pipeline
[ { '$sample': { size: 6 } } ]
Search(value: AtlasSearchStage)
builder.Search(SearchHelper('near', {
'path': 'released',
'origin': '2011-09-01T00:00:00.000+00:00',
'pivot': 7776000000,
}, { returnStoredSource: true, scoreDetails: true })).build();
// pipeline
[
{
'$search': {
near: {
path: 'released',
origin: '2011-09-01T00:00:00.000+00:00',
pivot: 7776000000
},
returnStoredSource: true,
scoreDetails: true
}
}
]
SearchMeta(value: AtlasSearchStage)
builder.SearchMeta(SearchHelper('range', {
"path": "year",
"gte": 1998,
"lt": 1999
}, { count: { type: 'total' } })).build();
// pipeline
[
{
'$searchMeta': {
range: { path: 'year', gte: 1998, lt: 1999 },
count: { type: 'total' }
}
}
]
Set(...values: SetStage[])
builder.Set(Field('first', true), Field('second', 2)).build();
// pipeline
[ { '$set': { first: true, second: 2 } } ]
SetWindowFields(value: SetWindowFieldsStage)
builder.SetWindowFields({
partitionBy: "$state",
sortBy: { orderDate: 1 },
output: {
cumulativeQuantityForState: {
$sum: "$quantity",
window: { documents: [ "unbounded", "current" ] }
}
}
}).build();
// pipeline
[
{
'$setWindowFields': {
partitionBy: '$state',
sortBy: { orderDate: 1 },
output: {
cumulativeQuantityForState: {
'$sum': '$quantity',
window: { documents: [ 'unbounded', 'current' ] }
}
}
}
}
]
ShardedDataDistribution(value: ShardedDataDistributionStage)
builder.ShardedDataDistribution({}).build();
// pipeline
[ { '$shardedDataDistribution': {} } ]
Skip(value: number)
builder.Skip(100).build();
// pipeline
[ { '$skip': 100 } ]
Sort(...values: SortStage[])
builder.Sort(
Field('first', -1),
Field('second', 1),
Field('third', { $meta: "textScore" }),
).build();
// pipeline
[
{
'$sort': { first: -1, second: 1, third: { '$meta': 'textScore' } }
}
]
SortByCount(value: SortByCountStage)
builder.SortByCount('$employee').build();
// pipeline
[ { '$sortByCount': '$employee' } ]
UnionWith(value: UnionWithStage)
builder.UnionWith(
UnionWithCollectionHelper(
'cities',
builder2.Project(ProjectOnlyHelper('name', 'country')).build()
)
).build();
// pipeline
[
{
'$unionWith': {
coll: 'cities',
pipeline: [ { '$project': { _id: 0, name: 1, country: 1 } } ]
}
}
]
Unset(...values: UnsetStage)
builder.Unset('users', 'roles').build();
// pipeline
[ { '$unset': [ 'users', 'roles' ] } ]
Unwind(value: UnwindStage)
builder.Unwind({ path: '$sizes', preserveNullAndEmptyArrays: true }).build();
// pipeline
[ { '$unwind': { path: '$sizes', preserveNullAndEmptyArrays: true } } ]
$Absolute | $Accumulator | $Acos | $Acosh | $Add | $AddToSet | $AllElementsTrue | $And | $AnyElementTrue | $ArrayElemAt | $ArrayToObject | $Asin | $Asinh | $Atan | $Atan2 | $Atanh | $Avg | $BinarySize | $BsonSize | $Ceil | $Compare | $Concat | $ConcatArrays | $Cond | $Convert | $Cos | $Cosh | $DateFromParts | $DateFromString | $DateToParts | $DateToString | $DayOfMonth | $DayOfWeek | $DayOfYear | $DegreesToRadians | $Divide | $Equal | $Exponent | $Expression | $Filter | $First | $Floor | $FunctionOperator | $GreaterThan | $GreaterThanEqual | $Hour | $IfNull | $In | $IndexOfArray | $IndexOfBytes | $IndexOfCP | $IsArray | $IsNumber | $IsoDayOfWeek | $IsoWeek | $IsoWeekYear | $Last | $LessThan | $LessThanEqual | $Let | $Literal | $Log | $Log10 | $Ltrim | $Map | $Max | $MergeObjects | $Meta | $Millisecond | $Min | $Minute | $Mod | $Month | $Multiply | $NaturalLog | $Not | $NotEqual | $ObjectToArray | $Or | $Pow | $Push | $RadiansToDegrees | $Rand | $Range | $Reduce | $RegexFind | $RegexFindAll | $RegexMatch | $ReplaceAll | $ReplaceOne | $ReverseArray | $Round | $Rtrim | $SampleRate | $Second | $SetDifference | $SetEquals | $SetIntersection | $SetIsSubset | $SetUnion | $Sin | $Sinh | $Size | $Slice | $Split | $Sqrt | $StdDevPop | $StdDevSamp | $StrCaseCmp | $StrLenBytes | $StrLenCP | $Substr | $SubstrBytes | $SubstrCP | $Subtract | $Sum | $Switch | $Tan | $Tanh | $ToBool | $ToDate | $ToDecimal | $ToDouble | $ToInt | $ToLong | $ToLower | $ToObjectId | $ToString | $ToUpper | $Trim | $Trunc | $Type | $Week | $Year | $Zip