// create_indexes.cpp
/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include
#include
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/s/d_state.h"
#include "mongo/s/shard_key_pattern.h"
namespace mongo {
/**
* { createIndexes : "bar", indexes : [ { ns : "test.bar", key : { x : 1 }, name: "x_1" } ] }
*/
class CmdCreateIndex : public Command {
public:
CmdCreateIndex() : Command( "createIndexes" ){}
virtual bool isWriteCommandForConfigServer() const { return false; }
virtual bool slaveOk() const { return false; } // TODO: this could be made true...
virtual Status checkAuthForCommand(ClientBasic* client,
const std::string& dbname,
const BSONObj& cmdObj) {
ActionSet actions;
actions.addAction(ActionType::createIndex);
Privilege p(parseResourcePattern(dbname, cmdObj), actions);
if (client->getAuthorizationSession()->isAuthorizedForPrivilege(p))
return Status::OK();
return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
BSONObj _addNsToSpec( const NamespaceString& ns, const BSONObj& obj ) {
BSONObjBuilder b;
b.append( "ns", ns );
b.appendElements( obj );
return b.obj();
}
virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int options,
string& errmsg, BSONObjBuilder& result,
bool fromRepl = false ) {
// --- parse
NamespaceString ns( dbname, cmdObj[name].String() );
Status status = userAllowedWriteNS( ns );
if ( !status.isOK() )
return appendCommandStatus( result, status );
if ( cmdObj["indexes"].type() != Array ) {
errmsg = "indexes has to be an array";
result.append( "cmdObj", cmdObj );
return false;
}
std::vector specs;
{
BSONObjIterator i( cmdObj["indexes"].Obj() );
while ( i.more() ) {
BSONElement e = i.next();
if ( e.type() != Object ) {
errmsg = "everything in indexes has to be an Object";
result.append( "cmdObj", cmdObj );
return false;
}
specs.push_back( e.Obj() );
}
}
if ( specs.size() == 0 ) {
errmsg = "no indexes to add";
return false;
}
// check specs
for ( size_t i = 0; i < specs.size(); i++ ) {
BSONObj spec = specs[i];
if ( spec["ns"].eoo() ) {
spec = _addNsToSpec( ns, spec );
specs[i] = spec;
}
if ( spec["ns"].type() != String ) {
errmsg = "spec has no ns";
result.append( "spec", spec );
return false;
}
if ( ns != spec["ns"].String() ) {
errmsg = "namespace mismatch";
result.append( "spec", spec );
return false;
}
}
// now we know we have to create index(es)
// Note: createIndexes command does not currently respect shard versioning.
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock dbLock(txn->lockState(), ns.db(), MODE_X);
Database* db = dbHolder().get(txn, ns.db());
if (!db) {
db = dbHolder().openDb(txn, ns.db());
}
Collection* collection = db->getCollection( txn, ns.ns() );
result.appendBool( "createdCollectionAutomatically", collection == NULL );
if ( !collection ) {
WriteUnitOfWork wunit(txn);
collection = db->createCollection( txn, ns.ns() );
invariant( collection );
if (!fromRepl) {
repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), BSON("create" << ns.coll()));
}
wunit.commit();
}
result.append("numIndexesBefore", collection->getIndexCatalog()->numIndexesTotal(txn));
MultiIndexBlock indexer(txn, collection);
indexer.allowBackgroundBuilding();
indexer.allowInterruption();
const size_t origSpecsSize = specs.size();
indexer.removeExistingIndexes(&specs);
if (specs.size() == 0) {
result.append( "note", "all indexes already exist" );
return true;
}
if (specs.size() != origSpecsSize) {
result.append( "note", "index already exists" );
}
for ( size_t i = 0; i < specs.size(); i++ ) {
const BSONObj& spec = specs[i];
if ( spec["unique"].trueValue() ) {
status = checkUniqueIndexConstraints(txn, ns.ns(), spec["key"].Obj());
if ( !status.isOK() ) {
appendCommandStatus( result, status );
return false;
}
}
}
uassertStatusOK(indexer.init(specs));
// If we're a background index, replace exclusive db lock with an intent lock, so that
// other readers and writers can proceed during this phase.
if (indexer.getBuildInBackground()) {
txn->recoveryUnit()->commitAndRestart();
dbLock.relockWithMode(MODE_IX);
}
try {
Lock::CollectionLock colLock(txn->lockState(), ns.ns(), MODE_IX);
uassertStatusOK(indexer.insertAllDocumentsInCollection());
}
catch (const DBException& e) {
// Must have exclusive DB lock before we clean up the index build via the
// destructor of 'indexer'.
if (indexer.getBuildInBackground()) {
try {
// This function cannot throw today, but we will preemptively prepare for
// that day, to avoid data corruption due to lack of index cleanup.
txn->recoveryUnit()->commitAndRestart();
dbLock.relockWithMode(MODE_X);
}
catch (...) {
std::terminate();
}
}
throw;
}
// Need to return db lock back to exclusive, to complete the index build.
if (indexer.getBuildInBackground()) {
txn->recoveryUnit()->commitAndRestart();
dbLock.relockWithMode(MODE_X);
Database* db = dbHolder().get(txn, ns.db());
uassert(28551, "database dropped during index build", db);
uassert(28552, "collection dropped during index build",
db->getCollection(txn, ns.ns()));
}
{
WriteUnitOfWork wunit(txn);
indexer.commit();
if ( !fromRepl ) {
for ( size_t i = 0; i < specs.size(); i++ ) {
std::string systemIndexes = ns.getSystemIndexesCollection();
repl::logOp(txn, "i", systemIndexes.c_str(), specs[i]);
}
}
wunit.commit();
}
result.append( "numIndexesAfter", collection->getIndexCatalog()->numIndexesTotal(txn) );
return true;
}
private:
static Status checkUniqueIndexConstraints(OperationContext* txn,
const StringData& ns,
const BSONObj& newIdxKey) {
invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
if ( shardingState.enabled() ) {
CollectionMetadataPtr metadata(
shardingState.getCollectionMetadata( ns.toString() ));
if ( metadata ) {
ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
if (!shardKeyPattern.isUniqueIndexCompatible(newIdxKey)) {
return Status(ErrorCodes::CannotCreateIndex,
str::stream() << "cannot create unique index over " << newIdxKey
<< " with shard key pattern "
<< shardKeyPattern.toBSON());
}
}
}
return Status::OK();
}
} cmdCreateIndex;
}