If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/ops/insert.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/operation_context_impl.h" #include "mongo/s/d_logic.h" #include "mongo/s/shard_key_pattern.h" namespace mongo { /** * { createIndexes : "bar", indexes : [ { ns : "test.bar", key : { x : 1 }, name: "x_1" } ] } */ class CmdCreateIndex : public Command { public: CmdCreateIndex() : Command( "createIndexes" ){} virtual bool isWriteCommandForConfigServer() const { return false; } virtual bool slaveOk() const { return false; } // TODO: this could be made true... virtual Status checkAuthForCommand(OperationContext* txn, ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj) { ActionSet actions; actions.addAction(ActionType::createIndex); Privilege p(parseResourcePattern(dbname, cmdObj), actions); if (client->getAuthorizationSession()->isAuthorizedForPrivilege(txn, p)) return Status::OK(); return Status(ErrorCodes::Unauthorized, "Unauthorized"); } BSONObj _addNsToSpec( const NamespaceString& ns, const BSONObj& obj ) { BSONObjBuilder b; b.append( "ns", ns ); b.appendElements( obj ); return b.obj(); } virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool fromRepl = false ) { // --- parse NamespaceString ns( dbname, cmdObj[name].String() ); Status status = userAllowedWriteNS( ns ); if ( !status.isOK() ) return appendCommandStatus( result, status ); if ( cmdObj["indexes"].type() != Array ) { errmsg = "indexes has to be an array"; result.append( "cmdObj", cmdObj ); return false; } std::vector specs; { BSONObjIterator i( cmdObj["indexes"].Obj() ); while ( i.more() ) { BSONElement e = i.next(); if ( e.type() != Object ) { errmsg = "everything in indexes has to be an Object"; result.append( "cmdObj", cmdObj ); return false; } specs.push_back( e.Obj() ); } } if ( specs.size() == 0 ) { errmsg = "no indexes to add"; return false; } // check specs for ( size_t i = 0; i < specs.size(); i++ ) { BSONObj spec = specs[i]; if ( spec["ns"].eoo() ) { spec = _addNsToSpec( ns, spec ); specs[i] = spec; } if ( spec["ns"].type() != String ) { errmsg = "spec has no ns"; result.append( "spec", spec ); return false; } if ( ns != spec["ns"].String() ) { errmsg = "namespace mismatch"; result.append( "spec", spec ); return false; } } { // We first take a read lock to see if we need to do anything // as many calls are ensureIndex (and hence no-ops), this is good so its a shared // lock for common calls. We only take write lock if needed. // Note: createIndexes command does not currently respect shard versioning. Client::ReadContext readContext(txn, ns, false /* doVersion */); const Collection* collection = readContext.ctx().db()->getCollection( ns.ns() ); if ( collection ) { for ( size_t i = 0; i < specs.size(); i++ ) { BSONObj spec = specs[i]; StatusWith statusWithSpec = collection->getIndexCatalog()->prepareSpecForCreate( spec ); status = statusWithSpec.getStatus(); if ( status.code() == ErrorCodes::IndexAlreadyExists ) { specs.erase( specs.begin() + i ); i--; continue; } if ( !status.isOK() ) return appendCommandStatus( result, status ); } if ( specs.size() == 0 ) { result.append( "numIndexesBefore", collection->getIndexCatalog()->numIndexesTotal() ); result.append( "note", "all indexes already exist" ); return true; } // need to create index } } // now we know we have to create index(es) // Note: createIndexes command does not currently respect shard versioning. Client::WriteContext writeContext(txn, ns.ns(), false /* doVersion */ ); Database* db = writeContext.ctx().db(); Collection* collection = db->getCollection( txn, ns.ns() ); result.appendBool( "createdCollectionAutomatically", collection == NULL ); if ( !collection ) { collection = db->createCollection( txn, ns.ns() ); invariant( collection ); } result.append( "numIndexesBefore", collection->getIndexCatalog()->numIndexesTotal() ); for ( size_t i = 0; i < specs.size(); i++ ) { BSONObj spec = specs[i]; if ( spec["unique"].trueValue() ) { status = checkUniqueIndexConstraints( ns.ns(), spec["key"].Obj() ); if ( !status.isOK() ) { appendCommandStatus( result, status ); return false; } } status = collection->getIndexCatalog()->createIndex(txn, spec, true); if ( status.code() == ErrorCodes::IndexAlreadyExists ) { if ( !result.hasField( "note" ) ) result.append( "note", "index already exists" ); continue; } if ( !status.isOK() ) { appendCommandStatus( result, status ); return false; } if ( !fromRepl ) { std::string systemIndexes = ns.getSystemIndexesCollection(); repl::logOp(txn, "i", systemIndexes.c_str(), spec); } } result.append( "numIndexesAfter", collection->getIndexCatalog()->numIndexesTotal() ); return true; } private: static Status checkUniqueIndexConstraints(const StringData& ns, const BSONObj& newIdxKey) { Lock::assertWriteLocked( ns ); if ( shardingState.enabled() ) { CollectionMetadataPtr metadata( shardingState.getCollectionMetadata( ns.toString() )); if ( metadata ) { BSONObj shardKey(metadata->getKeyPattern()); if ( !isUniqueIndexCompatible( shardKey, newIdxKey )) { return Status(ErrorCodes::CannotCreateIndex, str::stream() << "cannot create unique index over " << newIdxKey << " with shard key pattern " << shardKey); } } } return Status::OK(); } } cmdCreateIndex; }