Added remote folder

This commit is contained in:
Kasper Rynning-Tønnesen
2015-11-29 13:30:54 +01:00
parent 97e7dfd7c4
commit 95bb0c3919
3521 changed files with 331825 additions and 0 deletions

server/node_modules/mongodb/lib/apm.js generated vendored Normal file

@@ -0,0 +1,608 @@
var EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits;
// Get prototypes
var AggregationCursor = require('./aggregation_cursor'),
CommandCursor = require('./command_cursor'),
OrderedBulkOperation = require('./bulk/ordered').OrderedBulkOperation,
UnorderedBulkOperation = require('./bulk/unordered').UnorderedBulkOperation,
GridStore = require('./gridfs/grid_store'),
Server = require('./server'),
ReplSet = require('./replset'),
Mongos = require('./mongos'),
Cursor = require('./cursor'),
Collection = require('./collection'),
Db = require('./db'),
Admin = require('./admin');
var basicOperationIdGenerator = {
operationId: 1,
next: function() {
return this.operationId++;
}
}
var basicTimestampGenerator = {
current: function() {
return new Date().getTime();
},
duration: function(start, end) {
return end - start;
}
}
var sensitiveCommands = ['authenticate', 'saslStart', 'saslContinue', 'getnonce',
'createUser', 'updateUser', 'copydbgetnonce', 'copydbsaslstart', 'copydb'];
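The operationIdGenerator and timestampGenerator defaults above define the pluggable contracts: an operationIdGenerator needs next(), a timestampGenerator needs current() and duration(). A minimal sketch of a higher-resolution replacement, passed to the Instrumentation constructor defined just below (the hrtimeGenerator name is hypothetical, not part of the driver):

var hrtimeGenerator = {
  // process.hrtime() returns [seconds, nanoseconds]
  current: function() {
    var t = process.hrtime();
    return t[0] * 1e3 + t[1] / 1e6; // milliseconds, with sub-ms precision
  },
  duration: function(start, end) {
    return end - start;
  }
};
// Usage: new Instrumentation(core, { timestampGenerator: hrtimeGenerator }, callback);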
var Instrumentation = function(core, options, callback) {
options = options || {};
// Optional id generators
var operationIdGenerator = options.operationIdGenerator || basicOperationIdGenerator;
// Optional timestamp generator
var timestampGenerator = options.timestampGenerator || basicTimestampGenerator;
// Extend with event emitter functionality
EventEmitter.call(this);
// Contains all the instrumentation overloads
this.overloads = [];
// ---------------------------------------------------------
//
// Instrument prototype
//
// ---------------------------------------------------------
var instrumentPrototype = function(callback) {
var instrumentations = []
// Classes to support
var classes = [GridStore, OrderedBulkOperation, UnorderedBulkOperation,
CommandCursor, AggregationCursor, Cursor, Collection, Db];
// Add instrumentations to the available list
for(var i = 0; i < classes.length; i++) {
if(classes[i].define) {
instrumentations.push(classes[i].define.generate());
}
}
// Return the list of instrumentation points
callback(null, instrumentations);
}
// Did the user want to instrument the prototype
if(typeof callback == 'function') {
instrumentPrototype(callback);
}
// ---------------------------------------------------------
//
// Server
//
// ---------------------------------------------------------
// Reference
var self = this;
// Names of methods we need to wrap
var methods = ['command', 'insert', 'update', 'remove'];
// Prototype
var proto = core.Server.prototype;
// Core server method we are going to wrap
methods.forEach(function(x) {
var func = proto[x];
// Add to overloaded methods
self.overloads.push({proto: proto, name:x, func:func});
// The actual prototype
proto[x] = function() {
var requestId = core.Query.nextRequestId();
// Get the arguments
var args = Array.prototype.slice.call(arguments, 0);
var ns = args[0];
var commandObj = args[1];
var options = args[2] || {};
var keys = Object.keys(commandObj);
var commandName = keys[0];
var db = ns.split('.')[0];
// Do we have a legacy insert/update/remove command
if(x == 'insert' && !this.lastIsMaster().maxWireVersion) {
commandName = 'insert';
// Get the collection
var col = ns.split('.');
col.shift();
col = col.join('.');
// Re-write the command
commandObj = {
insert: col, documents: commandObj
}
if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
commandObj.writeConcern = options.writeConcern;
}
commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if(x == 'update' && !this.lastIsMaster().maxWireVersion) {
commandName = 'update';
// Get the collection
var col = ns.split('.');
col.shift();
col = col.join('.');
// Re-write the command
commandObj = {
update: col, updates: commandObj
}
if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
commandObj.writeConcern = options.writeConcern;
}
commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if(x == 'remove' && !this.lastIsMaster().maxWireVersion) {
commandName = 'delete';
// Get the collection
var col = ns.split('.');
col.shift();
col = col.join('.');
// Re-write the command
commandObj = {
delete: col, deletes: commandObj
}
if(options.writeConcern && Object.keys(options.writeConcern).length > 0) {
commandObj.writeConcern = options.writeConcern;
}
commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if((x == 'insert' || x == 'update' || x == 'remove') && this.lastIsMaster().maxWireVersion >= 2) {
// Skip the insert/update/remove commands as they are executed as actual write commands in 2.6 or higher
return func.apply(this, args);
}
// Get the callback
var callback = args.pop();
// Set current callback operation id from the current context or create
// a new one
var ourOpId = callback.operationId || operationIdGenerator.next();
// Get a connection reference for this server instance
var connection = this.s.pool.get()
// Emit the start event for the command
var command = {
// Returns the command.
command: commandObj,
// Returns the database name.
databaseName: db,
// Returns the command name.
commandName: commandName,
// Returns the driver generated request id.
requestId: requestId,
// Returns the driver generated operation id.
// This is used to link events together such as bulk write operations. OPTIONAL.
operationId: ourOpId,
// Returns the connection id for the command. For languages that do not have this,
// this MUST return the driver equivalent which MUST include the server address and port.
// The name of this field is flexible to match the object that is returned from the driver.
connectionId: connection
};
// Filter out any sensitive commands
if(sensitiveCommands.indexOf(commandName.toLowerCase()) != -1) {
command.command = {};
command.command[commandName] = true;
}
// Emit the started event
self.emit('started', command)
// Start time
var startTime = timestampGenerator.current();
// Push our handler callback
args.push(function(err, r) {
var endTime = timestampGenerator.current();
var command = {
duration: timestampGenerator.duration(startTime, endTime),
commandName: commandName,
requestId: requestId,
operationId: ourOpId,
connectionId: connection
};
// If we have an error
if(err || (r && r.result && r.result.ok == 0)) {
command.failure = err || r.result.writeErrors || r.result;
// Filter out any sensitive commands
if(sensitiveCommands.indexOf(commandName.toLowerCase()) != -1) {
command.failure = {};
}
self.emit('failed', command);
} else if(commandObj && commandObj.writeConcern
&& commandObj.writeConcern.w == 0) {
// If we have write concern 0
command.reply = {ok:1};
self.emit('succeeded', command);
} else {
command.reply = r && r.result ? r.result : r;
// Filter out any sensitive commands
if(sensitiveCommands.indexOf(commandName.toLowerCase()) != -1) {
command.reply = {};
}
self.emit('succeeded', command);
}
// Return to caller
callback(err, r);
});
// Apply the call
func.apply(this, args);
}
});
// ---------------------------------------------------------
//
// Bulk Operations
//
// ---------------------------------------------------------
// Inject ourselves into the Bulk methods
var methods = ['execute'];
var prototypes = [
require('./bulk/ordered').Bulk.prototype,
require('./bulk/unordered').Bulk.prototype
]
prototypes.forEach(function(proto) {
// Core server method we are going to wrap
methods.forEach(function(x) {
var func = proto[x];
// Add to overloaded methods
self.overloads.push({proto: proto, name:x, func:func});
// The actual prototype
proto[x] = function() {
var bulk = this;
// Get the arguments
var args = Array.prototype.slice.call(arguments, 0);
// Set an operation Id on the bulk object
this.operationId = operationIdGenerator.next();
// Get the callback
var callback = args.pop();
// If we have a callback use this
if(typeof callback == 'function') {
args.push(function(err, r) {
// Return to caller
callback(err, r);
});
// Apply the call
func.apply(this, args);
} else {
return func.apply(this, args);
}
}
});
});
// ---------------------------------------------------------
//
// Cursor
//
// ---------------------------------------------------------
// Inject ourselves into the Cursor methods
var methods = ['_find', '_getmore', '_killcursor'];
var prototypes = [
require('./cursor').prototype,
require('./command_cursor').prototype,
require('./aggregation_cursor').prototype
]
// Command name translation
var commandTranslation = {
'_find': 'find', '_getmore': 'getMore', '_killcursor': 'killCursors', '_explain': 'explain'
}
prototypes.forEach(function(proto) {
// Core server method we are going to wrap
methods.forEach(function(x) {
var func = proto[x];
// Add to overloaded methods
self.overloads.push({proto: proto, name:x, func:func});
// The actual prototype
proto[x] = function() {
var cursor = this;
var requestId = core.Query.nextRequestId();
var ourOpId = operationIdGenerator.next();
var parts = this.ns.split('.');
var db = parts[0];
// Get the collection
parts.shift();
var collection = parts.join('.');
// Set the command
var command = this.query;
var cmd = this.s.cmd;
// If we have a find method, set the operationId on the cursor
if(x == '_find') {
cursor.operationId = ourOpId;
}
// Rewrite a legacy getMore into its command form
if(x == '_getmore') {
command = {
getMore: this.cursorState.cursorId,
collection: collection,
batchSize: cmd.batchSize
}
if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
} else if(x == '_killcursor') {
command = {
killCursors: collection,
cursors: [this.cursorState.cursorId]
}
} else if(cmd.find) {
command = {
find: collection, filter: cmd.query
}
if(cmd.sort) command.sort = cmd.sort;
if(cmd.fields) command.projection = cmd.fields;
if(cmd.limit && cmd.limit < 0) {
command.limit = Math.abs(cmd.limit);
command.singleBatch = true;
} else if(cmd.limit) {
command.limit = Math.abs(cmd.limit);
}
// Options
if(cmd.skip) command.skip = cmd.skip;
if(cmd.hint) command.hint = cmd.hint;
if(cmd.batchSize) command.batchSize = cmd.batchSize;
if(typeof cmd.returnKey == 'boolean') command.returnKey = cmd.returnKey;
if(cmd.comment) command.comment = cmd.comment;
if(cmd.min) command.min = cmd.min;
if(cmd.max) command.max = cmd.max;
if(cmd.maxScan) command.maxScan = cmd.maxScan;
if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
// Flags
if(typeof cmd.awaitData == 'boolean') command.awaitData = cmd.awaitData;
if(typeof cmd.snapshot == 'boolean') command.snapshot = cmd.snapshot;
if(typeof cmd.tailable == 'boolean') command.tailable = cmd.tailable;
if(typeof cmd.oplogReplay == 'boolean') command.oplogReplay = cmd.oplogReplay;
if(typeof cmd.noCursorTimeout == 'boolean') command.noCursorTimeout = cmd.noCursorTimeout;
if(typeof cmd.partial == 'boolean') command.partial = cmd.partial;
if(typeof cmd.showDiskLoc == 'boolean') command.showRecordId = cmd.showDiskLoc;
// Read Concern
if(cmd.readConcern) command.readConcern = cmd.readConcern;
// Override method
if(cmd.explain) command.explain = cmd.explain;
if(cmd.exhaust) command.exhaust = cmd.exhaust;
// If we have a explain flag
if(cmd.explain) {
// Create fake explain command
command = {
explain: command,
verbosity: 'allPlansExecution'
}
// Set readConcern on the command if available
if(cmd.readConcern) command.readConcern = cmd.readConcern
// Set up the _explain name for the command
x = '_explain';
}
} else {
command = cmd;
}
// Set up the connection
var connectionId = null;
// Set local connection
if(this.connection) connectionId = this.connection;
if(!connectionId && this.server && this.server.getConnection) connectionId = this.server.getConnection();
// Get the command Name
var commandName = x == '_find' ? Object.keys(command)[0] : commandTranslation[x];
// Emit the start event for the command
var command = {
// Returns the command.
command: command,
// Returns the database name.
databaseName: db,
// Returns the command name.
commandName: commandName,
// Returns the driver generated request id.
requestId: requestId,
// Returns the driver generated operation id.
// This is used to link events together such as bulk write operations. OPTIONAL.
operationId: this.operationId,
// Returns the connection id for the command. For languages that do not have this,
// this MUST return the driver equivalent which MUST include the server address and port.
// The name of this field is flexible to match the object that is returned from the driver.
connectionId: connectionId
};
// Get the arguments
var args = Array.prototype.slice.call(arguments, 0);
// Get the callback
var callback = args.pop();
// We do not have a callback but a Promise
if(typeof callback == 'function' || command.commandName == 'killCursors') {
var startTime = timestampGenerator.current();
// Emit the started event
self.emit('started', command)
// Emit succeeded event with killcursor if we have a legacy protocol
if(command.commandName == 'killCursors'
&& this.server.lastIsMaster()
&& this.server.lastIsMaster().maxWireVersion < 4) {
// Emit the succeeded command
var command = {
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
commandName: commandName,
requestId: requestId,
operationId: cursor.operationId,
connectionId: cursor.server.getConnection(),
reply: [{ok:1}]
};
// Emit the command
return self.emit('succeeded', command)
}
// Add our callback handler
args.push(function(err, r) {
if(err) {
// Command
var command = {
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
commandName: commandName,
requestId: requestId,
operationId: ourOpId,
connectionId: cursor.server.getConnection(),
failure: err };
// Emit the command
self.emit('failed', command)
} else {
// Do we have a getMore
if(commandName.toLowerCase() == 'getmore' && r == null) {
r = {
cursor: {
id: cursor.cursorState.cursorId,
ns: cursor.ns,
nextBatch: cursor.cursorState.documents
}, ok:1
}
} else if(commandName.toLowerCase() == 'find' && r == null) {
r = {
cursor: {
id: cursor.cursorState.cursorId,
ns: cursor.ns,
firstBatch: cursor.cursorState.documents
}, ok:1
}
} else if(commandName.toLowerCase() == 'killcursors' && r == null) {
r = {
cursorsUnknown:[cursor.cursorState.lastCursorId],
ok:1
}
}
// cursor id is zero, we can issue success command
var command = {
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
commandName: commandName,
requestId: requestId,
operationId: cursor.operationId,
connectionId: cursor.server.getConnection(),
reply: r && r.result ? r.result : r
};
// Emit the command
self.emit('succeeded', command)
}
// Return
if(!callback) return;
// Return to caller
callback(err, r);
});
// Apply the call
func.apply(this, args);
} else {
// Assume promise, push back the missing value
args.push(callback);
// Get the promise
var promise = func.apply(this, args);
// Return a new promise
return new cursor.s.promiseLibrary(function(resolve, reject) {
var startTime = timestampGenerator.current();
// Emit the started event
self.emit('started', command)
// Execute the function
promise.then(function(r) {
// cursor id is zero, we can issue success command
var command = {
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
commandName: commandName,
requestId: requestId,
operationId: cursor.operationId,
connectionId: cursor.server.getConnection(),
reply: cursor.cursorState.documents
};
// Emit the command
self.emit('succeeded', command)
}).catch(function(err) {
// Command
var command = {
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
commandName: commandName,
requestId: requestId,
operationId: ourOpId,
connectionId: cursor.server.getConnection(),
failure: err };
// Emit the command
self.emit('failed', command)
// reject the promise
reject(err);
});
});
}
}
});
});
}
inherits(Instrumentation, EventEmitter);
Instrumentation.prototype.uninstrument = function() {
for(var i = 0; i < this.overloads.length; i++) {
var obj = this.overloads[i];
obj.proto[obj.name] = obj.func;
}
// Remove all listeners
this.removeAllListeners('started');
this.removeAllListeners('succeeded');
this.removeAllListeners('failed');
}
module.exports = Instrumentation;
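For orientation, a minimal usage sketch of this instrumentation layer (hedged: the mongodb package normally wires this up for you; constructing it directly requires a reference to the mongodb-core module):

var core = require('mongodb-core');
var Instrumentation = require('./apm');

var listener = new Instrumentation(core, {}, function(err, instrumentations) {
  // instrumentations lists every class/method the prototype layer exposes
});
listener.on('started', function(event) {
  console.log('->', event.commandName, 'request', event.requestId);
});
listener.on('succeeded', function(event) {
  console.log('<-', event.commandName, 'in', event.duration, 'ms');
});
listener.on('failed', function(event) {
  console.error('!!', event.commandName, event.failure);
});
// Later: restore the original prototypes and drop all listeners
listener.uninstrument();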

server/node_modules/mongodb/lib/gridfs-stream/download.js generated vendored Normal file

@@ -0,0 +1,310 @@
var shallowClone = require('../utils').shallowClone;
var stream = require('stream');
var util = require('util');
module.exports = GridFSBucketReadStream;
/**
* A readable stream that enables you to read buffers from GridFS.
*
* Do not instantiate this class directly. Use `openDownloadStream()` instead.
*
* @class
* @param {Collection} chunks Handle for chunks collection
* @param {Collection} files Handle for files collection
* @param {Object} readPreference The read preference to use
* @param {Object} filter The query to use to find the file document
* @param {Object} [options=null] Optional settings.
* @param {Object} [options.sort=null] Optional sort for the file find query
* @param {Number} [options.skip=null] Optional skip for the file find query
* @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
* @fires GridFSBucketReadStream#error
* @fires GridFSBucketReadStream#file
* @return {GridFSBucketReadStream} a GridFSBucketReadStream instance.
*/
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
var _this = this;
this.s = {
bytesRead: 0,
chunks: chunks,
cursor: null,
expected: 0,
files: files,
filter: filter,
init: false,
expectedEnd: 0,
file: null,
options: options,
readPreference: readPreference
};
stream.Readable.call(this);
}
util.inherits(GridFSBucketReadStream, stream.Readable);
/**
* An error occurred
*
* @event GridFSBucketReadStream#error
* @type {Error}
*/
/**
* Fires when the stream loaded the file document corresponding to the
* provided id.
*
* @event GridFSBucketReadStream#file
* @type {object}
*/
/**
* Reads from the cursor and pushes to the stream.
* @method
*/
GridFSBucketReadStream.prototype._read = function() {
var _this = this;
waitForFile(_this, function() {
doRead(_this);
});
};
/**
* Sets the 0-based offset in bytes to start streaming from. Throws
* an error if this stream has entered flowing mode
* (e.g. if you've already called `on('data')`)
* @method
* @param {Number} start Offset in bytes to start reading at
* @return {GridFSBucketReadStream}
*/
GridFSBucketReadStream.prototype.start = function(start) {
throwIfInitialized(this);
this.s.options.start = start;
return this;
};
/**
* Sets the 0-based offset in bytes to stop streaming before. Throws
* an error if this stream has entered flowing mode
* (e.g. if you've already called `on('data')`)
* @method
* @param {Number} end Offset in bytes to stop reading at
* @return {GridFSBucketReadStream}
*/
GridFSBucketReadStream.prototype.end = function(end) {
throwIfInitialized(this);
this.s.options.end = end;
return this;
};
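Together, start() and end() enable byte-range reads, provided they are called before the stream enters flowing mode. A hedged usage sketch (the bucket variable is a GridFSBucket from index.js below; the filename is hypothetical):

var rangeStream = bucket.openDownloadStreamByName('video.mp4')
  .start(1024)   // begin at byte 1024 (0-based, inclusive)
  .end(4096);    // stop before byte 4096 (exclusive)
rangeStream.on('data', function(chunk) {
  // receives only bytes 1024..4095 of the stored file
});
rangeStream.on('error', function(err) {
  // e.g. FileNotFound, ChunkIsMissing
});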
/**
* @ignore
*/
function throwIfInitialized(self) {
if (self.s.init) {
throw new Error('You cannot change options after the stream has entered ' +
'flowing mode!');
}
}
/**
* @ignore
*/
function doRead(_this) {
_this.s.cursor.next(function(error, doc) {
if (error) {
return __handleError(_this, error);
}
if (!doc) {
return _this.push(null);
}
var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
var expectedN = _this.s.expected++;
var expectedLength = Math.min(_this.s.file.chunkSize,
bytesRemaining);
if (doc.n > expectedN) {
var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n +
', expected: ' + expectedN;
return __handleError(_this, new Error(errmsg));
}
if (doc.n < expectedN) {
var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n +
', expected: ' + expectedN;
return __handleError(_this, new Error(errmsg));
}
if (doc.data.length() !== expectedLength) {
if (bytesRemaining <= 0) {
var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
return __handleError(_this, new Error(errmsg));
}
var errmsg = 'ChunkIsWrongSize: Got unexpected length: ' +
doc.data.length() + ', expected: ' + expectedLength;
return __handleError(_this, new Error(errmsg));
}
_this.s.bytesRead += doc.data.length();
if (doc.data.buffer.length === 0) {
return _this.push(null);
}
var sliceStart = null;
var sliceEnd = null;
var buf = doc.data.buffer;
if (_this.s.bytesToSkip != null) {
sliceStart = _this.s.bytesToSkip;
_this.s.bytesToSkip = 0;
}
if (expectedN === _this.s.expectedEnd && _this.s.bytesToTrim != null) {
sliceEnd = _this.s.bytesToTrim;
}
if (sliceStart != null || sliceEnd != null) {
buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
}
_this.push(buf);
});
};
/**
* @ignore
*/
function init(self) {
var findOneOptions = {};
if (self.s.readPreference) {
findOneOptions.readPreference = self.s.readPreference;
}
if (self.s.options && self.s.options.sort) {
findOneOptions.sort = self.s.options.sort;
}
if (self.s.options && self.s.options.skip) {
findOneOptions.skip = self.s.options.skip;
}
self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
if (error) {
return __handleError(self, error);
}
if (!doc) {
var identifier = self.s.filter._id ?
self.s.filter._id.toString() : self.s.filter.filename;
var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
return __handleError(self, new Error(errmsg));
}
// If document is empty, kill the stream immediately and don't
// execute any reads
if (doc.length <= 0) {
self.push(null);
return;
}
self.s.cursor = self.s.chunks.find({ files_id: doc._id }).sort({ n: 1 });
if (self.s.readPreference) {
self.s.cursor.setReadPreference(self.s.readPreference);
}
self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
self.s.file = doc;
self.s.bytesToSkip = handleStartOption(self, doc, self.s.cursor,
self.s.options);
self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor,
self.s.options);
self.emit('file', doc);
});
}
/**
* @ignore
*/
function waitForFile(_this, callback) {
if (_this.s.file) {
return callback();
}
if (!_this.s.init) {
init(_this);
_this.s.init = true;
}
_this.once('file', function() {
callback();
});
};
/**
* @ignore
*/
function handleStartOption(stream, doc, cursor, options) {
if (options && options.start != null) {
if (options.start > doc.length) {
throw new Error('Stream start (' + options.start + ') must not be ' +
'more than the length of the file (' + doc.length +')')
}
if (options.start < 0) {
throw new Error('Stream start (' + options.start + ') must not be ' +
'negative');
}
if (options.end != null && options.end < options.start) {
throw new Error('Stream start (' + options.start + ') must not be ' +
'greater than stream end (' + options.end + ')');
}
cursor.skip(Math.floor(options.start / doc.chunkSize));
stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) *
doc.chunkSize;
stream.s.expected = Math.floor(options.start / doc.chunkSize);
return options.start - stream.s.bytesRead;
}
}
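To make the skip arithmetic concrete, a worked example with the default 255KB chunk size (261120 bytes) and options.start = 300000:

// chunkSize = 261120, start = 300000
// cursor.skip(Math.floor(300000 / 261120))  -> skips 1 whole chunk
// stream.s.bytesRead = 1 * 261120 = 261120  -> bytes covered by skipped chunks
// stream.s.expected  = 1                    -> next expected chunk index n
// return 300000 - 261120 = 38880            -> bytesToSkip, trimmed from the
//                                              front of chunk n = 1 in doRead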
/**
* @ignore
*/
function handleEndOption(stream, doc, cursor, options) {
if (options && options.end != null) {
if (options.end > doc.length) {
throw new Error('Stream end (' + options.end + ') must not be ' +
'more than the length of the file (' + doc.length +')')
}
if (options.end < 0) {
throw new Error('Stream end (' + options.end + ') must not be ' +
'negative');
}
var start = options.start != null ?
Math.floor(options.start / doc.chunkSize) :
0;
cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
return (Math.ceil(options.end / doc.chunkSize) * doc.chunkSize) -
options.end;
}
}
/**
* @ignore
*/
function __handleError(_this, error) {
_this.emit('error', error);
}

server/node_modules/mongodb/lib/gridfs-stream/index.js generated vendored Normal file

@@ -0,0 +1,335 @@
var Emitter = require('events').EventEmitter;
var GridFSBucketReadStream = require('./download');
var GridFSBucketWriteStream = require('./upload');
var shallowClone = require('../utils').shallowClone;
var toError = require('../utils').toError;
var util = require('util');
var DEFAULT_GRIDFS_BUCKET_OPTIONS = {
bucketName: 'fs',
chunkSizeBytes: 255 * 1024
};
module.exports = GridFSBucket;
/**
* Constructor for a streaming GridFS interface
* @class
* @param {Db} db A db handle
* @param {object} [options=null] Optional settings.
* @param {string} [options.bucketName="fs"] The 'files' and 'chunks' collections will be prefixed with the bucket name followed by a dot.
* @param {number} [options.chunkSizeBytes=255 * 1024] Number of bytes stored in each chunk. Defaults to 255KB
* @param {object} [options.writeConcern=null] Optional write concern to be passed to write operations, for instance `{ w: 1 }`
* @param {object} [options.readPreference=null] Optional read preference to be passed to read operations
* @fires GridFSBucket#index
* @return {GridFSBucket}
*/
function GridFSBucket(db, options) {
Emitter.apply(this);
this.setMaxListeners(0);
if (options && typeof options === 'object') {
options = shallowClone(options);
var keys = Object.keys(DEFAULT_GRIDFS_BUCKET_OPTIONS);
for (var i = 0; i < keys.length; ++i) {
if (!options[keys[i]]) {
options[keys[i]] = DEFAULT_GRIDFS_BUCKET_OPTIONS[keys[i]];
}
}
} else {
options = DEFAULT_GRIDFS_BUCKET_OPTIONS;
}
this.s = {
db: db,
options: options,
_chunksCollection: db.collection(options.bucketName + '.chunks'),
_filesCollection: db.collection(options.bucketName + '.files'),
checkedIndexes: false,
calledOpenUploadStream: false,
promiseLibrary: db.s.promiseLibrary ||
(typeof global.Promise == 'function' ? global.Promise : require('es6-promise').Promise)
};
};
util.inherits(GridFSBucket, Emitter);
/**
* When the first call to openUploadStream is made, the upload stream will
* check to see if it needs to create the proper indexes on the chunks and
* files collections. This event is fired either when 1) it determines that
* no index creation is necessary, or 2) when it successfully creates the
* necessary indexes.
*
* @event GridFSBucket#index
* @type {Error}
*/
/**
* Returns a writable stream (GridFSBucketWriteStream) for writing
* buffers to GridFS. The stream's 'id' property contains the resulting
* file's id.
* @method
* @param {string} filename The value of the 'filename' key in the files doc
* @param {object} [options=null] Optional settings.
* @param {number} [options.chunkSizeBytes=null] Optional overwrite this bucket's chunkSizeBytes for this file
* @param {object} [options.metadata=null] Optional object to store in the file document's `metadata` field
* @param {string} [options.contentType=null] Optional string to store in the file document's `contentType` field
* @param {array} [options.aliases=null] Optional array of strings to store in the file document's `aliases` field
* @return {GridFSBucketWriteStream}
*/
GridFSBucket.prototype.openUploadStream = function(filename, options) {
if (options) {
options = shallowClone(options);
} else {
options = {};
}
if (!options.chunkSizeBytes) {
options.chunkSizeBytes = this.s.options.chunkSizeBytes;
}
return new GridFSBucketWriteStream(this, filename, options);
};
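A hedged usage sketch of the upload path (the db handle and file path are hypothetical); piping works because GridFSBucketWriteStream subclasses stream.Writable:

var fs = require('fs');
var bucket = new GridFSBucket(db, { bucketName: 'media' });
var upload = bucket.openUploadStream('photo.png', {
  contentType: 'image/png',
  metadata: { album: 'holiday' }
});
fs.createReadStream('/tmp/photo.png').pipe(upload);
upload.once('finish', function() {
  console.log('stored file with _id', upload.id); // id is assigned up front
});
upload.once('error', function(err) {
  // index creation or a chunk/files insert failed
});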
/**
* Returns a readable stream (GridFSBucketReadStream) for streaming file
* data from GridFS.
* @method
* @param {ObjectId} id The id of the file doc
* @param {Object} [options=null] Optional settings.
* @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
* @return {GridFSBucketReadStream}
*/
GridFSBucket.prototype.openDownloadStream = function(id, options) {
var filter = { _id: id };
options = {
start: options && options.start,
end: options && options.end
};
return new GridFSBucketReadStream(this.s._chunksCollection,
this.s._filesCollection, this.s.options.readPreference, filter, options);
};
/**
* Deletes a file with the given id
* @method
* @param {ObjectId} id The id of the file doc
* @param {Function} callback
*/
GridFSBucket.prototype.delete = function(id, callback) {
if (typeof callback === 'function') {
return _delete(this, id, callback);
}
var _this = this;
return new this.s.promiseLibrary(function(resolve, reject) {
_delete(_this, id, function(error, res) {
if (error) {
reject(error);
} else {
resolve(res);
}
});
});
};
/**
* @ignore
*/
function _delete(_this, id, callback) {
_this.s._filesCollection.deleteOne({ _id: id }, function(error, res) {
if (error) {
return callback(error);
}
_this.s._chunksCollection.deleteMany({ files_id: id }, function(error) {
if (error) {
return callback(error);
}
// Delete orphaned chunks before returning FileNotFound
if (!res.result.n) {
var errmsg = 'FileNotFound: no file with id ' + id + ' found';
return callback(new Error(errmsg));
}
callback();
});
});
}
/**
* Convenience wrapper around find on the files collection
* @method
* @param {Object} filter
* @param {Object} [options=null] Optional settings for cursor
* @param {number} [options.batchSize=null] Optional batch size for cursor
* @param {number} [options.limit=null] Optional limit for cursor
* @param {number} [options.maxTimeMS=null] Optional maxTimeMS for cursor
* @param {boolean} [options.noCursorTimeout=null] Optionally set cursor's `noCursorTimeout` flag
* @param {number} [options.skip=null] Optional skip for cursor
* @param {object} [options.sort=null] Optional sort for cursor
* @return {Cursor}
*/
GridFSBucket.prototype.find = function(filter, options) {
filter = filter || {};
options = options || {};
var cursor = this.s._filesCollection.find(filter);
if (options.batchSize != null) {
cursor.batchSize(options.batchSize);
}
if (options.limit != null) {
cursor.limit(options.limit);
}
if (options.maxTimeMS != null) {
cursor.maxTimeMS(options.maxTimeMS);
}
if (options.noCursorTimeout != null) {
cursor.addCursorFlag('noCursorTimeout', options.noCursorTimeout);
}
if (options.skip != null) {
cursor.skip(options.skip);
}
if (options.sort != null) {
cursor.sort(options.sort);
}
return cursor;
};
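For example, listing matching file documents (a sketch; the field names follow the files-collection schema that upload.js writes):

bucket.find({ 'metadata.album': 'holiday' }, { sort: { uploadDate: -1 }, limit: 5 })
  .toArray(function(err, docs) {
    if (err) throw err;
    docs.forEach(function(doc) {
      console.log(doc._id, doc.filename, doc.length, doc.uploadDate);
    });
  });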
/**
* Returns a readable stream (GridFSBucketReadStream) for streaming the
* file with the given name from GridFS. If there are multiple files with
* the same name, this will stream the most recent file with the given name
* (as determined by the `uploadDate` field). You can set the `revision`
* option to change this behavior.
* @method
* @param {String} filename The name of the file to stream
* @param {Object} [options=null] Optional settings
* @param {number} [options.revision=-1] The revision number relative to the oldest file with the given filename. 0 gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the newest.
* @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
* @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
* @return {GridFSBucketReadStream}
*/
GridFSBucket.prototype.openDownloadStreamByName = function(filename, options) {
var sort = { uploadDate: -1 };
var skip = null;
if (options && options.revision != null) {
if (options.revision >= 0) {
sort = { uploadDate: 1 };
skip = options.revision;
} else {
skip = -options.revision - 1;
}
}
var filter = { filename: filename };
options = {
sort: sort,
skip: skip,
start: options && options.start,
end: options && options.end
};
return new GridFSBucketReadStream(this.s._chunksCollection,
this.s._filesCollection, this.s.options.readPreference, filter, options);
};
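The revision arithmetic in concrete terms: with three stored files named 'report.pdf', revision 0 sorts ascending and skips 0 (the oldest), revision -1 sorts descending and skips 0 (the newest), and revision -2 sorts descending and skips 1. A sketch (names hypothetical):

// Fetch the second-newest revision of a file
bucket.openDownloadStreamByName('report.pdf', { revision: -2 })
  .pipe(require('fs').createWriteStream('/tmp/report-previous.pdf'));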
/**
* Renames the file with the given _id to the given string
* @method
* @param {ObjectId} id the id of the file to rename
* @param {String} filename new name for the file
* @param {GridFSBucket~errorCallback} [callback]
*/
GridFSBucket.prototype.rename = function(id, filename, callback) {
if (typeof callback === 'function') {
return _rename(this, id, filename, callback);
}
var _this = this;
return new this.s.promiseLibrary(function(resolve, reject) {
_rename(_this, id, filename, function(error, res) {
if (error) {
reject(error);
} else {
resolve(res);
}
});
});
};
/**
* @ignore
*/
function _rename(_this, id, filename, callback) {
var filter = { _id: id };
var update = { $set: { filename: filename } };
_this.s._filesCollection.updateOne(filter, update, function(error, res) {
if (error) {
return callback(error);
}
if (!res.result.n) {
return callback(toError('File with id ' + id + ' not found'));
}
callback();
});
}
/**
* Removes this bucket's files collection, followed by its chunks collection.
* @method
* @param {GridFSBucket~errorCallback} [callback]
*/
GridFSBucket.prototype.drop = function(callback) {
if (typeof callback === 'function') {
return _drop(this, callback);
}
var _this = this;
return new this.s.promiseLibrary(function(resolve, reject) {
_drop(_this, function(error, res) {
if (error) {
reject(error);
} else {
resolve(res);
}
});
});
};
/**
* @ignore
*/
function _drop(_this, callback) {
_this.s._filesCollection.drop(function(error) {
if (error) {
return callback(error);
}
_this.s._chunksCollection.drop(function(error) {
if (error) {
return callback(error);
}
return callback();
});
});
}
/**
* Callback format for all GridFSBucket methods that can accept a callback.
* @callback GridFSBucket~errorCallback
* @param {MongoError} error An error instance representing any errors that occurred
*/

server/node_modules/mongodb/lib/gridfs-stream/upload.js generated vendored Normal file

@@ -0,0 +1,450 @@
var core = require('mongodb-core');
var crypto = require('crypto');
var shallowClone = require('../utils').shallowClone;
var stream = require('stream');
var util = require('util');
var ERROR_NAMESPACE_NOT_FOUND = 26;
module.exports = GridFSBucketWriteStream;
/**
* A writable stream that enables you to write buffers to GridFS.
*
* Do not instantiate this class directly. Use `openUploadStream()` instead.
*
* @class
* @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
* @param {string} filename The value of the 'filename' key in the files doc
* @param {object} [options=null] Optional settings.
* @param {number} [options.chunkSizeBytes=null] The chunk size to use, in bytes
* @param {number} [options.w=null] The write concern
* @param {number} [options.wtimeout=null] The write concern timeout
* @param {number} [options.j=null] The journal write concern
* @fires GridFSBucketWriteStream#error
* @fires GridFSBucketWriteStream#finish
* @return {GridFSBucketWriteStream} a GridFSBucketWriteStream instance.
*/
function GridFSBucketWriteStream(bucket, filename, options) {
this.bucket = bucket;
this.chunks = bucket.s._chunksCollection;
this.filename = filename;
this.files = bucket.s._filesCollection;
this.options = options;
this.id = core.BSON.ObjectId();
this.chunkSizeBytes = this.options.chunkSizeBytes;
this.bufToStore = new Buffer(this.chunkSizeBytes);
this.length = 0;
this.md5 = crypto.createHash('md5');
this.n = 0;
this.pos = 0;
this.state = {
streamEnd: false,
outstandingRequests: 0,
errored: false
};
if (!this.bucket.s.calledOpenUploadStream) {
this.bucket.s.calledOpenUploadStream = true;
var _this = this;
checkIndexes(this, function() {
_this.bucket.s.checkedIndexes = true;
_this.bucket.emit('index');
});
}
}
util.inherits(GridFSBucketWriteStream, stream.Writable);
/**
* An error occurred
*
* @event GridFSBucketWriteStream#error
* @type {Error}
*/
/**
* end() was called and the write stream successfully wrote all chunks to
* MongoDB.
*
* @event GridFSBucketWriteStream#finish
* @type {object}
*/
/**
* Write a buffer to the stream.
*
* @method
* @param {Buffer} chunk Buffer to write
* @param {String} encoding Optional encoding for the buffer
* @param {Function} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
* @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
*/
GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
var _this = this;
return waitForIndexes(this, function() {
return doWrite(_this, chunk, encoding, callback);
});
};
/**
* Tells the stream that no more data will be coming in. The stream will
* persist the remaining data to MongoDB, write the files document, and
* then emit a 'finish' event.
*
* @method
* @param {Buffer} chunk Buffer to write
* @param {String} encoding Optional encoding for the buffer
* @param {Function} callback Function to call when all files and chunks have been persisted to MongoDB
*/
GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
var _this = this;
this.state.streamEnd = true;
if (callback) {
this.once('finish', callback);
}
if (!chunk) {
waitForIndexes(this, function() {
writeRemnant(_this);
});
return;
}
this.write(chunk, encoding, function() {
writeRemnant(_this);
});
};
/**
* @ignore
*/
function __handleError(_this, error, callback) {
if (_this.state.errored) {
return;
}
_this.state.errored = true;
if (callback) {
return callback(error);
}
_this.emit('error', error);
}
/**
* @ignore
*/
function createChunkDoc(filesId, n, data) {
return {
_id: core.BSON.ObjectId(),
files_id: filesId,
n: n,
data: data
};
}
/**
* @ignore
*/
function checkChunksIndex(_this, callback) {
_this.chunks.listIndexes().toArray(function(error, indexes) {
if (error) {
// Collection doesn't exist so create index
if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
var index = { files_id: 1, n: 1 };
_this.chunks.createIndex(index, { background: false }, function(error) {
if (error) {
return callback(error);
}
callback();
});
return;
}
return callback(error);
}
var hasChunksIndex = false;
indexes.forEach(function(index) {
if (index.key) {
var keys = Object.keys(index.key);
if (keys.length === 2 && index.key.files_id === 1 &&
index.key.n === 1) {
hasChunksIndex = true;
}
}
});
if (hasChunksIndex) {
callback();
} else {
var index = { files_id: 1, n: 1 };
var indexOptions = getWriteOptions(_this);
indexOptions.background = false;
indexOptions.unique = true;
_this.chunks.createIndex(index, indexOptions, function(error) {
if (error) {
return callback(error);
}
callback();
});
}
});
}
/**
* @ignore
*/
function checkDone(_this, callback) {
if (_this.state.streamEnd &&
_this.state.outstandingRequests === 0 &&
!_this.state.errored) {
var filesDoc = createFilesDoc(_this.id, _this.length, _this.chunkSizeBytes,
_this.md5.digest('hex'), _this.filename, _this.options.contentType,
_this.options.aliases, _this.options.metadata);
_this.files.insert(filesDoc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error, callback);
}
_this.emit('finish', filesDoc);
});
return true;
}
return false;
}
/**
* @ignore
*/
function checkIndexes(_this, callback) {
_this.files.findOne({}, { _id: 1 }, function(error, doc) {
if (error) {
return callback(error);
}
if (doc) {
return callback();
}
_this.files.listIndexes().toArray(function(error, indexes) {
if (error) {
// Collection doesn't exist so create index
if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
var index = { filename: 1, uploadDate: 1 };
_this.files.createIndex(index, { background: false }, function(error) {
if (error) {
return callback(error);
}
checkChunksIndex(_this, callback);
});
return;
}
return callback(error);
}
var hasFileIndex = false;
indexes.forEach(function(index) {
var keys = Object.keys(index.key);
if (keys.length === 2 && index.key.filename === 1 &&
index.key.uploadDate === 1) {
hasFileIndex = true;
}
});
if (hasFileIndex) {
checkChunksIndex(_this, callback);
} else {
var index = { filename: 1, uploadDate: 1 };
var indexOptions = getWriteOptions(_this);
indexOptions.background = false;
_this.files.createIndex(index, indexOptions, function(error) {
if (error) {
return callback(error);
}
checkChunksIndex(_this, callback);
});
}
});
});
}
/**
* @ignore
*/
function createFilesDoc(_id, length, chunkSize, md5, filename, contentType,
aliases, metadata) {
var ret = {
_id: _id,
length: length,
chunkSize: chunkSize,
uploadDate: new Date(),
md5: md5,
filename: filename
};
if (contentType) {
ret.contentType = contentType;
}
if (aliases) {
ret.aliases = aliases;
}
if (metadata) {
ret.metadata = metadata;
}
return ret;
}
/**
* @ignore
*/
function doWrite(_this, chunk, encoding, callback) {
var inputBuf = (Buffer.isBuffer(chunk)) ?
chunk : new Buffer(chunk, encoding);
_this.length += inputBuf.length;
// Input is small enough to fit in our buffer
if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
inputBuf.copy(_this.bufToStore, _this.pos);
_this.pos += inputBuf.length;
callback && callback();
// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// True means client can keep writing.
return true;
}
// Otherwise, buffer is too big for current chunk, so we need to flush
// to MongoDB.
var inputBufRemaining = inputBuf.length;
var spaceRemaining = _this.chunkSizeBytes - _this.pos;
var numToCopy = Math.min(spaceRemaining, inputBuf.length);
var outstandingRequests = 0;
while (inputBufRemaining > 0) {
var inputBufPos = inputBuf.length - inputBufRemaining;
inputBuf.copy(_this.bufToStore, _this.pos,
inputBufPos, inputBufPos + numToCopy);
_this.pos += numToCopy;
spaceRemaining -= numToCopy;
if (spaceRemaining === 0) {
_this.md5.update(_this.bufToStore);
var doc = createChunkDoc(_this.id, _this.n, _this.bufToStore);
++_this.state.outstandingRequests;
++outstandingRequests;
_this.chunks.insert(doc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error);
}
--_this.state.outstandingRequests;
--outstandingRequests;
if (!outstandingRequests) {
_this.emit('drain', doc);
callback && callback();
checkDone(_this);
}
});
spaceRemaining = _this.chunkSizeBytes;
_this.pos = 0;
++_this.n;
}
inputBufRemaining -= numToCopy;
numToCopy = Math.min(spaceRemaining, inputBufRemaining);
}
// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// False means the client should wait for the 'drain' event.
return false;
}
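Because the return value follows stream.Writable's contract (false means a chunk insert is in flight and the caller should wait for 'drain'), hand-rolled write loops must honor it; .pipe() does this bookkeeping automatically. A minimal sketch:

function writeAll(uploadStream, buffers, done) {
  var i = 0;
  uploadStream.once('finish', done);
  (function next() {
    while (i < buffers.length) {
      if (!uploadStream.write(buffers[i++])) {
        // a chunk flush is outstanding; resume once it drains
        return uploadStream.once('drain', next);
      }
    }
    uploadStream.end();
  })();
}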
/**
* @ignore
*/
function getWriteOptions(_this) {
var obj = {};
if (_this.options.writeConcern) {
var concern = _this.options.writeConcern;
obj.w = concern.w;
obj.wtimeout = concern.wtimeout;
obj.j = concern.j;
}
return obj;
}
/**
* @ignore
*/
function waitForIndexes(_this, callback) {
if (_this.bucket.s.checkedIndexes) {
return callback(false);
}
_this.bucket.once('index', function() {
callback(true);
});
return true;
}
/**
* @ignore
*/
function writeRemnant(_this, callback) {
// Buffer is empty, so don't bother to insert
if (_this.pos === 0) {
return checkDone(_this, callback);
}
++_this.state.outstandingRequests;
// Create a new buffer to make sure the buffer isn't bigger than it needs
// to be.
var remnant = new Buffer(_this.pos);
_this.bufToStore.copy(remnant, 0, 0, _this.pos);
_this.md5.update(remnant);
var doc = createChunkDoc(_this.id, _this.n, remnant);
_this.chunks.insert(doc, getWriteOptions(_this), function(error) {
if (error) {
return __handleError(_this, error);
}
--_this.state.outstandingRequests;
checkDone(_this);
});
}

server/node_modules/mongodb/lib/metadata.js generated vendored Normal file

@@ -0,0 +1,64 @@
var f = require('util').format;
var Define = function(name, object, stream) {
this.name = name;
this.object = object;
this.stream = typeof stream == 'boolean' ? stream : false;
this.instrumentations = {};
}
Define.prototype.classMethod = function(name, options) {
var keys = Object.keys(options).sort();
var key = generateKey(keys, options);
// Add a list of instrumentations
if(this.instrumentations[key] == null) {
this.instrumentations[key] = {
methods: [], options: options
}
}
// Push to list of method for this instrumentation
this.instrumentations[key].methods.push(name);
}
var generateKey = function(keys, options) {
var parts = [];
for(var i = 0; i < keys.length; i++) {
parts.push(f('%s=%s', keys[i], options[keys[i]]));
}
return parts.join();
}
Define.prototype.staticMethod = function(name, options) {
options.static = true;
var keys = Object.keys(options).sort();
var key = generateKey(keys, options);
// Add a list of instrumentations
if(this.instrumentations[key] == null) {
this.instrumentations[key] = {
methods: [], options: options
}
}
// Push to list of method for this instrumentation
this.instrumentations[key].methods.push(name);
}
Define.prototype.generate = function() {
// Generate the return object
var object = {
name: this.name, obj: this.object, stream: this.stream,
instrumentations: []
}
for(var name in this.instrumentations) {
object.instrumentations.push(this.instrumentations[name]);
}
return object;
}
module.exports = Define;
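A sketch of how a driver class registers its instrumentation points with Define (the class, method names, and option shapes here are illustrative, not copied from the driver):

var Define = require('./metadata');

// Typically created once per class, keyed off the constructor
var define = new Define('Collection', Collection, false);
define.classMethod('insertOne', { callback: true, promise: true });
define.classMethod('find', { callback: false, promise: false, returns: [Cursor] });
define.staticMethod('connect', { callback: true, promise: true });

// apm.js's instrumentPrototype() looks for classes[i].define and calls
// define.generate() to collect the grouped { methods, options } entries
Collection.define = define;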