mirror of
https://github.com/KevinMidboe/zoff.git
synced 2025-10-29 18:00:23 +00:00
837 lines
25 KiB
JavaScript
Executable File
837 lines
25 KiB
JavaScript
Executable File
"use strict";
|
|
|
|
var inherits = require('util').inherits
|
|
, f = require('util').format
|
|
, toError = require('./utils').toError
|
|
, getSingleProperty = require('./utils').getSingleProperty
|
|
, formattedOrderClause = require('./utils').formattedOrderClause
|
|
, handleCallback = require('./utils').handleCallback
|
|
, Logger = require('mongodb-core').Logger
|
|
, EventEmitter = require('events').EventEmitter
|
|
, ReadPreference = require('./read_preference')
|
|
, MongoError = require('mongodb-core').MongoError
|
|
, Readable = require('stream').Readable || require('readable-stream').Readable
|
|
, CoreCursor = require('mongodb-core').Cursor
|
|
, Query = require('mongodb-core').Query
|
|
, CoreReadPreference = require('mongodb-core').ReadPreference;
|
|
|
|
/**
|
|
* @fileOverview The **Cursor** class is an internal class that embodies a cursor on MongoDB
|
|
* allowing for iteration over the results returned from the underlying query. It supports
|
|
* one by one document iteration, conversion to an array or can be iterated as a Node 0.10.X
|
|
* or higher stream
|
|
*
|
|
* **CURSORS Cannot directly be instantiated**
|
|
* @example
|
|
* var MongoClient = require('mongodb').MongoClient,
|
|
* test = require('assert');
|
|
* // Connection url
|
|
* var url = 'mongodb://localhost:27017/test';
|
|
* // Connect using MongoClient
|
|
* MongoClient.connect(url, function(err, db) {
|
|
* // Create a collection we want to drop later
|
|
* var col = db.collection('createIndexExample1');
|
|
* // Insert a bunch of documents
|
|
* col.insert([{a:1, b:1}
|
|
* , {a:2, b:2}, {a:3, b:3}
|
|
* , {a:4, b:4}], {w:1}, function(err, result) {
|
|
* test.equal(null, err);
|
|
*
|
|
* // Show that duplicate records got dropped
|
|
* col.find({}).toArray(function(err, items) {
|
|
* test.equal(null, err);
|
|
* test.equal(4, items.length);
|
|
* db.close();
|
|
* });
|
|
* });
|
|
* });
|
|
*/
|
|
|
|
/**
|
|
* Namespace provided by the mongodb-core and node.js
|
|
* @external CoreCursor
|
|
* @external Readable
|
|
*/
|
|
|
|
// Flags allowed for cursor
|
|
var flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
|
|
|
|
/**
|
|
* Creates a new Cursor instance (INTERNAL TYPE, do not instantiate directly)
|
|
* @class
|
|
* @extends external:CoreCursor
|
|
* @extends external:Readable
|
|
* @property {string} sortValue Cursor query sort setting.
|
|
* @property {boolean} timeout Is Cursor able to time out.
|
|
* @property {ReadPreference} readPreference Get cursor ReadPreference.
|
|
* @fires Cursor#data
|
|
* @fires Cursor#end
|
|
* @fires Cursor#close
|
|
* @fires Cursor#readable
|
|
* @return {Cursor} a Cursor instance.
|
|
* @example
|
|
* Some example
|
|
*/
|
|
var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
|
|
CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
|
|
var self = this;
|
|
var state = Cursor.INIT;
|
|
var streamOptions = {};
|
|
|
|
// Tailable cursor options
|
|
var numberOfRetries = options.numberOfRetries || 5;
|
|
var tailableRetryInterval = options.tailableRetryInterval || 500;
|
|
var currentNumberOfRetries = numberOfRetries;
|
|
// MaxTimeMS
|
|
var maxTimeMS = null;
|
|
|
|
// Set up
|
|
Readable.call(this, {objectMode: true});
|
|
|
|
// Internal cursor state
|
|
this.s = {
|
|
// MaxTimeMS
|
|
maxTimeMS: null
|
|
// Tailable cursor options
|
|
, numberOfRetries: numberOfRetries
|
|
, tailableRetryInterval: tailableRetryInterval
|
|
, currentNumberOfRetries: currentNumberOfRetries
|
|
// State
|
|
, state: state
|
|
// Stream options
|
|
, streamOptions: streamOptions
|
|
// BSON
|
|
, bson: bson
|
|
// Namespace
|
|
, ns: ns
|
|
// Command
|
|
, cmd: cmd
|
|
// Options
|
|
, options: options
|
|
// Topology
|
|
, topology: topology
|
|
// Topology options
|
|
, topologyOptions: topologyOptions
|
|
}
|
|
|
|
// Legacy fields
|
|
this.timeout = self.s.options.noCursorTimeout == true;
|
|
this.sortValue = self.s.cmd.sort;
|
|
this.readPreference = self.s.options.readPreference;
|
|
}
|
|
|
|
/**
|
|
* Cursor stream data event, fired for each document in the cursor.
|
|
*
|
|
* @event Cursor#data
|
|
* @type {object}
|
|
*/
|
|
|
|
/**
|
|
* Cursor stream end event
|
|
*
|
|
* @event Cursor#end
|
|
* @type {null}
|
|
*/
|
|
|
|
/**
|
|
* Cursor stream close event
|
|
*
|
|
* @event Cursor#close
|
|
* @type {null}
|
|
*/
|
|
|
|
/**
|
|
* Cursor stream readable event
|
|
*
|
|
* @event Cursor#readable
|
|
* @type {null}
|
|
*/
|
|
|
|
// Inherit from Readable
|
|
inherits(Cursor, Readable);
|
|
|
|
for(var name in CoreCursor.prototype) {
|
|
Cursor.prototype[name] = CoreCursor.prototype[name];
|
|
}
|
|
|
|
/**
|
|
* Set the cursor query
|
|
* @method
|
|
* @param {object} filter The filter object used for the cursor.
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.filter = function(filter) {
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
this.s.cmd.query = filter;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Add a cursor flag to the cursor
|
|
* @method
|
|
* @param {string} flag The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'].
|
|
* @param {boolean} value The flag boolean value.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.addCursorFlag = function(flag, value) {
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
if(flags.indexOf(flag) == -1) throw new MongoError(f("flag % not a supported flag %s", flag, flags));
|
|
if(typeof value != 'boolean') throw new MongoError(f("flag % must be a boolean value", flag));
|
|
this.s.cmd[flag] = value;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Add a query modifier to the cursor query
|
|
* @method
|
|
* @param {string} name The query modifier (must start with $, such as $orderby etc)
|
|
* @param {boolean} value The flag boolean value.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.addQueryModifier = function(name, value) {
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
if(name[0] != '$') throw new MongoError(f("%s is not a valid query modifier"));
|
|
// Strip of the $
|
|
var field = name.substr(1);
|
|
// Set on the command
|
|
this.s.cmd[field] = value;
|
|
// Deal with the special case for sort
|
|
if(field == 'orderby') this.s.cmd.sort = this.s.cmd[field];
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Add a comment to the cursor query allowing for tracking the comment in the log.
|
|
* @method
|
|
* @param {string} value The comment attached to this query.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.comment = function(value) {
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
this.s.cmd.comment = value;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
|
|
* @method
|
|
* @param {number} value Number of milliseconds to wait before aborting the query.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.maxTimeMS = function(value) {
|
|
if(typeof value != 'number') throw new MongoError("maxTimeMS must be a number");
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
this.s.maxTimeMS = value;
|
|
this.s.cmd.maxTimeMS = value;
|
|
return this;
|
|
}
|
|
|
|
Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
|
|
|
|
/**
|
|
* Sets a field projection for the query.
|
|
* @method
|
|
* @param {object} value The field projection object.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.project = function(value) {
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
this.s.cmd.fields = value;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Sets the sort order of the cursor query.
|
|
* @method
|
|
* @param {(string|array|object)} keyOrList The key or keys set for the sort.
|
|
* @param {number} [direction] The direction of the sorting (1 or -1).
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.sort = function(keyOrList, direction) {
|
|
if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support sorting");
|
|
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
|
|
var order = keyOrList;
|
|
|
|
if(direction != null) {
|
|
order = [[keyOrList, direction]];
|
|
}
|
|
|
|
this.s.cmd.sort = order;
|
|
this.sortValue = order;
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Set the batch size for the cursor.
|
|
* @method
|
|
* @param {number} value The batchSize for the cursor.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.batchSize = function(value) {
|
|
if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support limit");
|
|
if(this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
|
|
if(typeof value != 'number') throw new MongoError("batchSize requires an integer");
|
|
this.s.cmd.batchSize = value;
|
|
this.setCursorBatchSize(value);
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Set the limit for the cursor.
|
|
* @method
|
|
* @param {number} value The limit for the cursor query.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.limit = function(value) {
|
|
if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support limit");
|
|
if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
|
|
if(typeof value != 'number') throw new MongoError("limit requires an integer");
|
|
this.s.cmd.limit = value;
|
|
// this.cursorLimit = value;
|
|
this.setCursorLimit(value);
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Set the skip for the cursor.
|
|
* @method
|
|
* @param {number} value The skip for the cursor query.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.skip = function(value) {
|
|
if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support skip");
|
|
if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
|
|
if(typeof value != 'number') throw new MongoError("skip requires an integer");
|
|
this.s.cmd.skip = value;
|
|
this.setCursorSkip(value);
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* The callback format for results
|
|
* @callback Cursor~resultCallback
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
* @param {(object|null)} result The result object if the command was executed successfully.
|
|
*/
|
|
|
|
/**
|
|
* Get the next available document from the cursor, returns null if no more documents are available.
|
|
* @function external:CoreCursor#next
|
|
* @param {Cursor~resultCallback} callback The result callback.
|
|
* @throws {MongoError}
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Set the new batchSize of the cursor
|
|
* @function Cursor.prototype.setBatchSize
|
|
* @param {number} value The new batchSize for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Get the batchSize of the cursor
|
|
* @function Cursor.prototype.batchSize
|
|
* @param {number} value The current batchSize for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Set the new skip value of the cursor
|
|
* @function Cursor.prototype.setCursorSkip
|
|
* @param {number} value The new skip for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Get the skip value of the cursor
|
|
* @function Cursor.prototype.cursorSkip
|
|
* @param {number} value The current skip value for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Set the new limit value of the cursor
|
|
* @function Cursor.prototype.setCursorLimit
|
|
* @param {number} value The new limit for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Get the limit value of the cursor
|
|
* @function Cursor.prototype.cursorLimit
|
|
* @param {number} value The current limit value for the cursor
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Clone the cursor
|
|
* @function external:CoreCursor#clone
|
|
* @return {Cursor}
|
|
*/
|
|
|
|
/**
|
|
* Resets the cursor
|
|
* @function external:CoreCursor#rewind
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Get the next available document from the cursor, returns null if no more documents are available.
|
|
* @method
|
|
* @param {Cursor~resultCallback} callback The result callback.
|
|
* @throws {MongoError}
|
|
* @deprecated
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.nextObject = function(callback) {
|
|
var self = this;
|
|
if(this.s.state == Cursor.CLOSED || self.isDead()) return handleCallback(callback, new MongoError("Cursor is closed"));
|
|
if(this.s.state == Cursor.INIT && this.s.cmd.sort) {
|
|
try {
|
|
this.s.cmd.sort = formattedOrderClause(this.s.cmd.sort);
|
|
} catch(err) {
|
|
return handleCallback(callback, err);
|
|
}
|
|
}
|
|
|
|
// Get the next object
|
|
self.next(function(err, doc) {
|
|
if(err && err.tailable && self.s.currentNumberOfRetries == 0) return callback(err);
|
|
if(err && err.tailable && self.s.currentNumberOfRetries > 0) {
|
|
self.s.currentNumberOfRetries = self.s.currentNumberOfRetries - 1;
|
|
return setTimeout(function() {
|
|
self.nextObject(callback);
|
|
}, self.s.tailableRetryInterval);
|
|
}
|
|
|
|
self.s.state = Cursor.OPEN;
|
|
if(err) return handleCallback(callback, err);
|
|
handleCallback(callback, null, doc);
|
|
});
|
|
}
|
|
|
|
// Trampoline emptying the number of retrieved items
|
|
// without incurring a nextTick operation
|
|
var loop = function(self, callback) {
|
|
// No more items we are done
|
|
if(self.bufferedCount() == 0) return;
|
|
// Get the next document
|
|
self.next(callback);
|
|
// Loop
|
|
return loop;
|
|
}
|
|
|
|
/**
|
|
* Iterates over all the documents for this cursor. As with **{cursor.toArray}**,
|
|
* not all of the elements will be iterated if this cursor had been previouly accessed.
|
|
* In that case, **{cursor.rewind}** can be used to reset the cursor. However, unlike
|
|
* **{cursor.toArray}**, the cursor will only hold a maximum of batch size elements
|
|
* at any given time if batch size is specified. Otherwise, the caller is responsible
|
|
* for making sure that the entire result can fit the memory.
|
|
* @method
|
|
* @deprecated
|
|
* @param {Cursor~resultCallback} callback The result callback.
|
|
* @throws {MongoError}
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.each = function(callback) {
|
|
// Rewind cursor state
|
|
this.rewind();
|
|
// Set current cursor to INIT
|
|
this.s.state = Cursor.INIT;
|
|
// Run the query
|
|
_each(this, callback);
|
|
};
|
|
|
|
// Run the each loop
|
|
var _each = function(self, callback) {
|
|
if(!callback) throw new MongoError('callback is mandatory');
|
|
if(self.isNotified()) return;
|
|
if(self.s.state == Cursor.CLOSED || self.isDead()) {
|
|
return handleCallback(callback, new MongoError("Cursor is closed"), null);
|
|
}
|
|
|
|
if(self.s.state == Cursor.INIT) self.s.state = Cursor.OPEN;
|
|
|
|
// Define function to avoid global scope escape
|
|
var fn = null;
|
|
// Trampoline all the entries
|
|
if(self.bufferedCount() > 0) {
|
|
while(fn = loop(self, callback)) fn(self, callback);
|
|
_each(self, callback);
|
|
} else {
|
|
self.next(function(err, item) {
|
|
if(err) return handleCallback(callback, err);
|
|
if(item == null) {
|
|
self.s.state = Cursor.CLOSED;
|
|
return handleCallback(callback, null, null);
|
|
}
|
|
|
|
if(handleCallback(callback, null, item) == false) return;
|
|
_each(self, callback);
|
|
})
|
|
}
|
|
}
|
|
|
|
/**
|
|
* The callback format for the forEach iterator method
|
|
* @callback Cursor~iteratorCallback
|
|
* @param {Object} doc An emitted document for the iterator
|
|
*/
|
|
|
|
/**
|
|
* The callback error format for the forEach iterator method
|
|
* @callback Cursor~endCallback
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
*/
|
|
|
|
/**
|
|
* Iterates over all the documents for this cursor using the iterator, callback pattern.
|
|
* @method
|
|
* @param {Cursor~iteratorCallback} iterator The iteration callback.
|
|
* @param {Cursor~endCallback} callback The end callback.
|
|
* @throws {MongoError}
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.forEach = function(iterator, callback) {
|
|
this.each(function(err, doc){
|
|
if(err) { callback(err); return false; }
|
|
if(doc != null) { iterator(doc); return true; }
|
|
if(doc == null && callback) {
|
|
var internalCallback = callback;
|
|
callback = null;
|
|
internalCallback(null);
|
|
return false;
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Set the ReadPreference for the cursor.
|
|
* @method
|
|
* @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
|
|
* @throws {MongoError}
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.setReadPreference = function(r) {
|
|
if(this.s.state != Cursor.INIT) throw new MongoError('cannot change cursor readPreference after cursor has been accessed');
|
|
if(r instanceof ReadPreference) {
|
|
this.s.options.readPreference = new CoreReadPreference(r.mode, r.tags);
|
|
} else {
|
|
this.s.options.readPreference = new CoreReadPreference(r);
|
|
}
|
|
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* The callback format for results
|
|
* @callback Cursor~toArrayResultCallback
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
* @param {object[]} documents All the documents the satisfy the cursor.
|
|
*/
|
|
|
|
/**
|
|
* Returns an array of documents. The caller is responsible for making sure that there
|
|
* is enough memory to store the results. Note that the array only contain partial
|
|
* results when this cursor had been previouly accessed. In that case,
|
|
* cursor.rewind() can be used to reset the cursor.
|
|
* @method
|
|
* @param {Cursor~toArrayResultCallback} callback The result callback.
|
|
* @throws {MongoError}
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.toArray = function(callback) {
|
|
var self = this;
|
|
if(!callback) throw new MongoError('callback is mandatory');
|
|
if(self.s.options.tailable) return handleCallback(callback, new MongoError("Tailable cursor cannot be converted to array"), null);
|
|
var items = [];
|
|
|
|
// Reset cursor
|
|
this.rewind();
|
|
self.s.state = Cursor.INIT;
|
|
|
|
// Fetch all the documents
|
|
var fetchDocs = function() {
|
|
self.next(function(err, doc) {
|
|
if(err) return handleCallback(callback, err);
|
|
if(doc == null) {
|
|
self.s.state = Cursor.CLOSED;
|
|
return handleCallback(callback, null, items);
|
|
}
|
|
|
|
// Add doc to items
|
|
items.push(doc)
|
|
// Get all buffered objects
|
|
if(self.bufferedCount() > 0) {
|
|
var a = self.readBufferedDocuments(self.bufferedCount())
|
|
items = items.concat(a);
|
|
}
|
|
|
|
// Attempt a fetch
|
|
fetchDocs();
|
|
})
|
|
}
|
|
|
|
fetchDocs();
|
|
}
|
|
|
|
/**
|
|
* The callback format for results
|
|
* @callback Cursor~countResultCallback
|
|
* @param {MongoError} error An error instance representing the error during the execution.
|
|
* @param {number} count The count of documents.
|
|
*/
|
|
|
|
/**
|
|
* Get the count of documents for this cursor
|
|
* @method
|
|
* @param {boolean} applySkipLimit Should the count command apply limit and skip settings on the cursor or in the passed in options.
|
|
* @param {object} [options=null] Optional settings.
|
|
* @param {number} [options.skip=null] The number of documents to skip.
|
|
* @param {number} [options.limit=null] The maximum amounts to count before aborting.
|
|
* @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
|
|
* @param {string} [options.hint=null] An index name hint for the query.
|
|
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
|
|
* @param {Cursor~countResultCallback} callback The result callback.
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.count = function(applySkipLimit, opts, callback) {
|
|
var self = this;
|
|
if(typeof opts == 'function') callback = opts, opts = {};
|
|
opts = opts || {};
|
|
if(self.s.cmd.query == null) callback(new MongoError("count can only be used with find command"));
|
|
if(typeof applySkipLimit == 'function') {
|
|
callback = applySkipLimit;
|
|
applySkipLimit = true;
|
|
}
|
|
|
|
var opts = {};
|
|
if(applySkipLimit) {
|
|
if(typeof this.cursorSkip() == 'number') opts.skip = this.cursorSkip();
|
|
if(typeof this.cursorLimit() == 'number') opts.limit = this.cursorLimit();
|
|
}
|
|
|
|
// Command
|
|
|
|
var delimiter = self.s.ns.indexOf('.');
|
|
|
|
var command = {
|
|
'count': self.s.ns.substr(delimiter+1), 'query': self.s.cmd.query
|
|
}
|
|
|
|
// If maxTimeMS set
|
|
if(typeof maxTimeMS == 'number') {
|
|
command.maxTimeMS = self.s.maxTimeMS;
|
|
}
|
|
|
|
// Get a server
|
|
var server = self.s.topology.getServer(opts);
|
|
// Get a connection
|
|
var connection = self.s.topology.getConnection(opts);
|
|
// Get the callbacks
|
|
var callbacks = server.getCallbacks();
|
|
|
|
// Merge in any options
|
|
if(opts.skip) command.skip = opts.skip;
|
|
if(opts.limit) command.limit = opts.limit;
|
|
if(self.s.options.hint) command.hint = self.s.options.hint;
|
|
|
|
// Build Query object
|
|
var query = new Query(self.s.bson, f("%s.$cmd", self.s.ns.substr(0, delimiter)), command, {
|
|
numberToSkip: 0, numberToReturn: -1
|
|
, checkKeys: false
|
|
});
|
|
|
|
// Set up callback
|
|
callbacks.register(query.requestId, function(err, result) {
|
|
if(err) return handleCallback(callback, err);
|
|
if(result.documents.length == 1
|
|
&& (result.documents[0].errmsg
|
|
|| result.documents[0].err
|
|
|| result.documents[0]['$err'])) return callback(MongoError.create(result.documents[0]));
|
|
handleCallback(callback, null, result.documents[0].n);
|
|
});
|
|
|
|
// Write the initial command out
|
|
connection.write(query.toBin());
|
|
};
|
|
|
|
/**
|
|
* Close the cursor, sending a KillCursor command and emitting close.
|
|
* @method
|
|
* @param {Cursor~resultCallback} [callback] The result callback.
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.close = function(callback) {
|
|
this.s.state = Cursor.CLOSED;
|
|
// Kill the cursor
|
|
this.kill();
|
|
// Emit the close event for the cursor
|
|
this.emit('close');
|
|
// Callback if provided
|
|
if(callback) return handleCallback(callback, null, this);
|
|
}
|
|
|
|
/**
|
|
* Is the cursor closed
|
|
* @method
|
|
* @return {boolean}
|
|
*/
|
|
Cursor.prototype.isClosed = function() {
|
|
return this.isDead();
|
|
}
|
|
|
|
Cursor.prototype.destroy = function(err) {
|
|
this.pause();
|
|
this.close();
|
|
if(err) this.emit('error', err);
|
|
}
|
|
|
|
/**
|
|
* Return a modified Readable stream including a possible transform method.
|
|
* @method
|
|
* @param {object} [options=null] Optional settings.
|
|
* @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
|
|
* @return {Cursor}
|
|
*/
|
|
Cursor.prototype.stream = function(options) {
|
|
this.s.streamOptions = options || {};
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Execute the explain for the cursor
|
|
* @method
|
|
* @param {Cursor~resultCallback} [callback] The result callback.
|
|
* @return {null}
|
|
*/
|
|
Cursor.prototype.explain = function(callback) {
|
|
this.s.cmd.explain = true;
|
|
this.next(callback);
|
|
}
|
|
|
|
Cursor.prototype._read = function(n) {
|
|
var self = this;
|
|
if(self.s.state == Cursor.CLOSED || self.isDead()) {
|
|
return self.push(null);
|
|
}
|
|
|
|
// Get the next item
|
|
self.nextObject(function(err, result) {
|
|
if(err) {
|
|
if(!self.isDead()) self.close();
|
|
if(self.listeners('error') && self.listeners('error').length > 0) {
|
|
self.emit('error', err);
|
|
}
|
|
|
|
// Emit end event
|
|
return self.emit('end');
|
|
}
|
|
|
|
// If we provided a transformation method
|
|
if(typeof self.s.streamOptions.transform == 'function' && result != null) {
|
|
return self.push(self.s.streamOptions.transform(result));
|
|
}
|
|
|
|
// Return the result
|
|
self.push(result);
|
|
});
|
|
}
|
|
|
|
Object.defineProperty(Cursor.prototype, 'namespace', {
|
|
enumerable: true,
|
|
get: function() {
|
|
if (!this || !this.s) {
|
|
return null;
|
|
}
|
|
|
|
// TODO: refactor this logic into core
|
|
var ns = this.s.ns || '';
|
|
var firstDot = ns.indexOf('.');
|
|
if (firstDot < 0) {
|
|
return {
|
|
database: this.s.ns,
|
|
collection: ''
|
|
};
|
|
}
|
|
return {
|
|
database: ns.substr(0, firstDot),
|
|
collection: ns.substr(firstDot + 1)
|
|
};
|
|
}
|
|
});
|
|
|
|
/**
|
|
* The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.
|
|
* @function external:Readable#read
|
|
* @param {number} size Optional argument to specify how much data to read.
|
|
* @return {(String | Buffer | null)}
|
|
*/
|
|
|
|
/**
|
|
* Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects.
|
|
* @function external:Readable#setEncoding
|
|
* @param {string} encoding The encoding to use.
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* This method will cause the readable stream to resume emitting data events.
|
|
* @function external:Readable#resume
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
|
|
* @function external:Readable#pause
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
|
|
* @function external:Readable#pipe
|
|
* @param {Writable} destination The destination for writing data
|
|
* @param {object} [options] Pipe options
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* This method will remove the hooks set up for a previous pipe() call.
|
|
* @function external:Readable#unpipe
|
|
* @param {Writable} [destination] The destination for writing data
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
|
|
* @function external:Readable#unshift
|
|
* @param {(Buffer|string)} chunk Chunk of data to unshift onto the read queue.
|
|
* @return {null}
|
|
*/
|
|
|
|
/**
|
|
* Versions of Node prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See "Compatibility" below for more information.)
|
|
* @function external:Readable#wrap
|
|
* @param {Stream} stream An "old style" readable stream.
|
|
* @return {null}
|
|
*/
|
|
|
|
Cursor.INIT = 0;
|
|
Cursor.OPEN = 1;
|
|
Cursor.CLOSED = 2;
|
|
Cursor.GET_MORE = 3;
|
|
|
|
module.exports = Cursor;
|