diff options
Diffstat (limited to 'src/mongo/db')
50 files changed, 654 insertions, 369 deletions
diff --git a/src/mongo/db/auth/authorization_manager.cpp b/src/mongo/db/auth/authorization_manager.cpp index 16485be8015..2a4ef685acb 100644 --- a/src/mongo/db/auth/authorization_manager.cpp +++ b/src/mongo/db/auth/authorization_manager.cpp @@ -1014,10 +1014,9 @@ namespace { const char* op, const char* ns, const BSONObj& o, - BSONObj* o2, - bool* b) { + BSONObj* o2) { - _externalState->logOp(txn, op, ns, o, o2, b); + _externalState->logOp(txn, op, ns, o, o2); if (appliesToAuthzData(op, ns, o)) { _invalidateRelevantCacheData(op, ns, o, o2); } diff --git a/src/mongo/db/auth/authorization_manager.h b/src/mongo/db/auth/authorization_manager.h index a6b4f4ae068..d430d1e8012 100644 --- a/src/mongo/db/auth/authorization_manager.h +++ b/src/mongo/db/auth/authorization_manager.h @@ -438,8 +438,7 @@ namespace mongo { const char* opstr, const char* ns, const BSONObj& obj, - BSONObj* patt, - bool* b); + BSONObj* patt); private: /** diff --git a/src/mongo/db/auth/authz_manager_external_state.h b/src/mongo/db/auth/authz_manager_external_state.h index 4c54cabc1aa..87c057b25da 100644 --- a/src/mongo/db/auth/authz_manager_external_state.h +++ b/src/mongo/db/auth/authz_manager_external_state.h @@ -233,8 +233,7 @@ namespace mongo { const char* op, const char* ns, const BSONObj& o, - BSONObj* o2, - bool* b) {} + BSONObj* o2) {} protected: diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp index ea843a2d4b0..410d3389986 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp @@ -375,18 +375,14 @@ namespace { const char* op, const char* ns, const BSONObj& o, - const BSONObj* o2, - const bool* b): + const BSONObj* o2): _externalState(externalState), _op(op), _ns(ns), _o(o.getOwned()), _isO2Set(o2 ? true : false), - _o2(_isO2Set ? o2->getOwned() : BSONObj()), - - _isBSet(b ? true : false), - _b(_isBSet ? *b : false) { + _o2(_isO2Set ? o2->getOwned() : BSONObj()) { } @@ -404,8 +400,6 @@ namespace { oplogEntryBuilder << "op" << _op << "ns" << _ns << "o" << _o; if (_isO2Set) oplogEntryBuilder << "o2" << _o2; - if (_isBSet) - oplogEntryBuilder << "b" << _b; error() << "Unsupported modification to roles collection in oplog; " "restart this process to reenable user-defined roles; " << status.reason() << "; Oplog entry: " << oplogEntryBuilder.done(); @@ -438,9 +432,6 @@ namespace { const bool _isO2Set; const BSONObj _o2; - - const bool _isBSet; - const bool _b; }; void AuthzManagerExternalStateLocal::logOp( @@ -448,8 +439,7 @@ namespace { const char* op, const char* ns, const BSONObj& o, - BSONObj* o2, - bool* b) { + BSONObj* o2) { if (ns == AuthorizationManager::rolesCollectionNamespace.ns() || ns == AuthorizationManager::adminCommandNamespace.ns()) { @@ -458,8 +448,7 @@ namespace { op, ns, o, - o2, - b)); + o2)); } } diff --git a/src/mongo/db/auth/authz_manager_external_state_local.h b/src/mongo/db/auth/authz_manager_external_state_local.h index 87648b911c0..d5f852ac85d 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.h +++ b/src/mongo/db/auth/authz_manager_external_state_local.h @@ -69,8 +69,7 @@ namespace mongo { const char* op, const char* ns, const BSONObj& o, - BSONObj* o2, - bool* b); + BSONObj* o2); protected: AuthzManagerExternalStateLocal(); diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp index c031cbc52cc..9d9f0cc0955 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp @@ -159,7 +159,6 @@ namespace { "i", collectionName.ns().c_str(), toInsert, - NULL, NULL); } @@ -200,8 +199,7 @@ namespace { "u", collectionName.ns().c_str(), logObj, - &idQuery, - NULL); + &idQuery); } return Status::OK(); @@ -256,7 +254,6 @@ namespace { "d", collectionName.ns().c_str(), idQuery, - NULL, NULL); } diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp index b556af32321..9252d6417ab 100644 --- a/src/mongo/db/catalog/database.cpp +++ b/src/mongo/db/catalog/database.cpp @@ -53,6 +53,7 @@ #include "mongo/db/index/index_access_method.h" #include "mongo/db/instance.h" #include "mongo/db/introspect.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/top.h" @@ -129,8 +130,8 @@ namespace mongo { void Database::close(OperationContext* txn ) { // XXX? - Do we need to close database under global lock or just DB-lock is sufficient ? invariant(txn->lockState()->isW()); - - repl::oplogCheckCloseDatabase(txn, this); // oplog caches some things, dirty its caches + // oplog caches some things, dirty its caches + repl::oplogCheckCloseDatabase(txn, this); if ( BackgroundOperation::inProgForDb( _name ) ) { log() << "warning: bg op in prog during close db? " << _name << endl; @@ -271,11 +272,8 @@ namespace mongo { continue; } - string cmdNs = _name + ".$cmd"; - repl::logOp( txn, - "c", - cmdNs.c_str(), - BSON( "drop" << nsToCollectionSubstring( ns ) ) ); + getGlobalEnvironment()->getOpObserver()->onDropCollection( + txn, NamespaceString(ns)); wunit.commit(); } catch (const WriteConflictException& exp) { @@ -624,14 +622,9 @@ namespace mongo { invariant( db->createCollection( txn, ns, collectionOptions, true, createDefaultIndexes ) ); if ( logForReplication ) { - if ( options.getField( "create" ).eoo() ) { - BSONObjBuilder b; - b << "create" << nsToCollectionSubstring( ns ); - b.appendElements( options ); - options = b.obj(); - } - string logNs = nsToDatabase(ns) + ".$cmd"; - repl::logOp(txn, "c", logNs.c_str(), options); + getGlobalEnvironment()->getOpObserver()->onCreateCollection(txn, + NamespaceString(ns), + collectionOptions); } return Status::OK(); diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index 82d621260a1..c7a2536c3bc 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -46,7 +46,6 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/query/internal_plans.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 90f13bb0094..d22663f1f98 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -50,11 +50,12 @@ #include "mongo/db/commands/rename_collection.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/index_builder.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/isself.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage_options.h" @@ -150,10 +151,10 @@ namespace mongo { verify(collection); if (logForRepl) { - repl::logOp(txn, - "c", - (_dbName + ".$cmd").c_str(), - BSON("create" << to_collection.coll())); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + txn, + to_collection, + CollectionOptions()); } wunit.commit(); } @@ -232,7 +233,7 @@ namespace mongo { } uassertStatusOK( loc.getStatus() ); if (logForRepl) - repl::logOp(txn, "i", to_collection.ns().c_str(), js); + getGlobalEnvironment()->getOpObserver()->onInsert(txn, to_collection.ns(), js); wunit.commit(); @@ -339,10 +340,9 @@ namespace mongo { collection = db->createCollection( txn, to_collection.ns() ); invariant(collection); if (logForRepl) { - repl::logOp(txn, - "c", - (toDBName + ".$cmd").c_str(), - BSON("create" << to_collection.coll())); + getGlobalEnvironment()->getOpObserver()->onCreateCollection(txn, + to_collection, + CollectionOptions()); } wunit.commit(); } @@ -371,7 +371,7 @@ namespace mongo { const char* createIndexNs = targetSystemIndexesCollectionName.c_str(); for (vector<BSONObj>::const_iterator it = indexesToBuild.begin(); it != indexesToBuild.end(); ++it) { - repl::logOp(txn, "i", createIndexNs, *it); + getGlobalEnvironment()->getOpObserver()->onInsert(txn, createIndexNs, *it); } } wunit.commit(); @@ -650,7 +650,9 @@ namespace mongo { c->deleteDocument(txn, *it, true, true, opts.logForRepl ? &id : NULL); if (opts.logForRepl) - repl::logOp(txn, "d", c->ns().ns().c_str(), id); + getGlobalEnvironment()->getOpObserver()->onDelete(txn, + c->ns().ns(), + id); wunit.commit(); } @@ -661,10 +663,10 @@ namespace mongo { WriteUnitOfWork wunit(txn); indexer.commit(); if (opts.logForRepl) { - repl::logOp(txn, - "i", - c->ns().getSystemIndexesCollection().c_str(), - c->getIndexCatalog()->getDefaultIdIndexSpec()); + getGlobalEnvironment()->getOpObserver()->onCreateIndex( + txn, + c->ns().getSystemIndexesCollection().c_str(), + c->getIndexCatalog()->getDefaultIdIndexSpec()); } wunit.commit(); } diff --git a/src/mongo/db/commands/apply_ops.cpp b/src/mongo/db/commands/apply_ops.cpp index dc8e09c22d6..69641f16c43 100644 --- a/src/mongo/db/commands/apply_ops.cpp +++ b/src/mongo/db/commands/apply_ops.cpp @@ -44,9 +44,11 @@ #include "mongo/db/commands/dbhash.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/util/log.h" @@ -221,7 +223,9 @@ namespace mongo { while (true) { try { WriteUnitOfWork wunit(txn); - repl::logOp(txn, "c", tempNS.c_str(), cmdRewritten); + getGlobalEnvironment()->getOpObserver()->onApplyOps(txn, + tempNS, + cmdRewritten); wunit.commit(); break; } diff --git a/src/mongo/db/commands/clone.cpp b/src/mongo/db/commands/clone.cpp index 63b8ae2679d..00efcfcf5c1 100644 --- a/src/mongo/db/commands/clone.cpp +++ b/src/mongo/db/commands/clone.cpp @@ -45,7 +45,6 @@ #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" diff --git a/src/mongo/db/commands/clone_collection.cpp b/src/mongo/db/commands/clone_collection.cpp index afb048a7d4a..952044bc747 100644 --- a/src/mongo/db/commands/clone_collection.cpp +++ b/src/mongo/db/commands/clone_collection.cpp @@ -49,7 +49,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/isself.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index 14e817840fa..9d15d3df794 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -35,10 +35,11 @@ #include "mongo/db/background.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/index_builder.h" +#include "mongo/db/op_observer.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/find.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context_impl.h" @@ -123,8 +124,9 @@ namespace { WriteUnitOfWork wunit(txn); toCollection->insertDocument( txn, obj, true ); - if ( logForReplication ) - repl::logOp(txn, "i", toNs.c_str(), obj); + if ( logForReplication ) { + getGlobalEnvironment()->getOpObserver()->onInsert(txn, toNs, obj); + } wunit.commit(); } } @@ -296,8 +298,12 @@ namespace { if ( !status.isOK() ) return appendCommandStatus( result, status ); - if (!fromRepl) - repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), jsobj); + if (!fromRepl) { + getGlobalEnvironment()->getOpObserver()->onConvertToCapped( + txn, + NamespaceString(longSource), + size); + } wunit.commit(); return true; diff --git a/src/mongo/db/commands/copydb.cpp b/src/mongo/db/commands/copydb.cpp index 29f28e305f0..d043f717ddd 100644 --- a/src/mongo/db/commands/copydb.cpp +++ b/src/mongo/db/commands/copydb.cpp @@ -49,7 +49,6 @@ #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" diff --git a/src/mongo/db/commands/copydb_start_commands.cpp b/src/mongo/db/commands/copydb_start_commands.cpp index 56f430a5fb1..18979fa6022 100644 --- a/src/mongo/db/commands/copydb_start_commands.cpp +++ b/src/mongo/db/commands/copydb_start_commands.cpp @@ -50,7 +50,6 @@ #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 7da1cc30dd0..eed2eb96a13 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -42,9 +42,10 @@ #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" @@ -161,8 +162,10 @@ namespace mongo { collection = db->createCollection( txn, ns.ns() ); invariant( collection ); if (!fromRepl) { - repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), - BSON("create" << ns.coll())); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + txn, + ns, + CollectionOptions()); } wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createIndexes", ns.ns()); @@ -271,7 +274,9 @@ namespace mongo { if ( !fromRepl ) { for ( size_t i = 0; i < specs.size(); i++ ) { std::string systemIndexes = ns.getSystemIndexesCollection(); - repl::logOp(txn, "i", systemIndexes.c_str(), specs[i]); + getGlobalEnvironment()->getOpObserver()->onCreateIndex(txn, + systemIndexes, + specs[i]); } } diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp index c2c2f05d6c1..e861204c1c4 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes.cpp @@ -40,6 +40,7 @@ #include "mongo/db/commands.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/index_builder.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/catalog/collection.h" @@ -49,7 +50,7 @@ #include "mongo/db/catalog/index_create.h" #include "mongo/db/catalog/index_key_validate.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/repl/oplog.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context_impl.h" #include "mongo/util/log.h" @@ -130,7 +131,9 @@ namespace mongo { return false; } if (!fromRepl) { - repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), jsobj); + getGlobalEnvironment()->getOpObserver()->onDropIndex(txn, + dbname + ".$cmd", + jsobj); } wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "dropIndexes", dbname); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index acd9dbed9b1..2cba4a3960f 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -39,12 +39,13 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/update.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/projection.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/util/log.h" @@ -430,7 +431,9 @@ namespace mongo { // This is the last thing we do before the WriteUnitOfWork commits (except // for some BSON manipulation). - repl::logOp(txn, "i", collection->ns().ns().c_str(), newDoc); + getGlobalEnvironment()->getOpObserver()->onInsert(txn, + collection->ns().ns(), + newDoc); // Must commit the write and logOp() before doing anything that could throw. wuow.commit(); diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index cd0be7674c7..c3349c814eb 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -44,12 +44,13 @@ #include "mongo/db/commands.h" #include "mongo/db/db.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/instance.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/matcher/matcher.h" +#include "mongo/db/op_observer.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/range_preserver.h" #include "mongo/db/namespace_string.h" @@ -439,7 +440,10 @@ namespace mongo { b.append( "create", nsToCollectionSubstring( _config.tempNamespace )); b.appendElements( options.toBSON() ); string logNs = nsToDatabase( _config.tempNamespace ) + ".$cmd"; - repl::logOp(_txn, "c", logNs.c_str(), b.obj()); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + _txn, + NamespaceString(_config.tempNamespace), + options); for ( vector<BSONObj>::iterator it = indexesToInsert.begin(); it != indexesToInsert.end(); ++it ) { @@ -453,7 +457,7 @@ namespace mongo { } // Log the createIndex operation. string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes"; - repl::logOp(_txn, "i", logNs.c_str(), *it); + getGlobalEnvironment()->getOpObserver()->onInsert(_txn, logNs, *it); } wuow.commit(); } @@ -690,7 +694,7 @@ namespace mongo { BSONObj bo = b.obj(); uassertStatusOK( coll->insertDocument( _txn, bo, true ).getStatus() ); - repl::logOp(_txn, "i", ns.c_str(), bo); + getGlobalEnvironment()->getOpObserver()->onInsert(_txn, ns, bo); wuow.commit(); } diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp index 00b34ead0da..bc41c2a1409 100644 --- a/src/mongo/db/commands/oplog_note.cpp +++ b/src/mongo/db/commands/oplog_note.cpp @@ -32,9 +32,11 @@ #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/resource_pattern.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -82,7 +84,7 @@ namespace mongo { Lock::GlobalWrite globalWrite(txn->lockState()); WriteUnitOfWork wuow(txn); - repl::logOpComment(txn, dataElement.Obj()); + getGlobalEnvironment()->getOpObserver()->onOpMessage(txn, dataElement.Obj()); wuow.commit(); return true; } diff --git a/src/mongo/db/commands/rename_collection.cpp b/src/mongo/db/commands/rename_collection.cpp index fdbf6ce220e..a9da366eb74 100644 --- a/src/mongo/db/commands/rename_collection.cpp +++ b/src/mongo/db/commands/rename_collection.cpp @@ -38,12 +38,13 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/rename_collection.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builder.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/util/scopeguard.h" @@ -245,7 +246,12 @@ namespace mongo { } if (!fromRepl) { - repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), cmdObj); + getGlobalEnvironment()->getOpObserver()->onRenameCollection( + txn, + NamespaceString(source), + NamespaceString(target), + cmdObj["dropTarget"].trueValue(), + cmdObj["stayTemp"].trueValue()); } wunit.commit(); @@ -344,7 +350,12 @@ namespace mongo { indexer.commit(); if (!fromRepl) { - repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), cmdObj); + getGlobalEnvironment()->getOpObserver()->onRenameCollection( + txn, + NamespaceString(source), + NamespaceString(target), + cmdObj["dropTarget"].trueValue(), + cmdObj["stayTemp"].trueValue()); } wunit.commit(); diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp index de121a73002..34151c96303 100644 --- a/src/mongo/db/commands/test_commands.cpp +++ b/src/mongo/db/commands/test_commands.cpp @@ -36,10 +36,11 @@ #include "mongo/base/initializer_context.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/index_builder.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/repl/oplog.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context_impl.h" #include "mongo/util/log.h" @@ -233,7 +234,7 @@ namespace mongo { IndexBuilder::restoreIndexes(txn, indexes); if (!fromRepl) { - repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), cmdObj); + getGlobalEnvironment()->getOpObserver()->onEmptyCapped(txn, collection->ns()); } wuow.commit(); diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 34f4a448021..ae0c16e30f6 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/instance.h" #include "mongo/db/introspect.h" #include "mongo/db/lasterror.h" @@ -48,6 +49,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/exec/delete.h" #include "mongo/db/exec/update.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/delete_request.h" #include "mongo/db/ops/parsed_delete.h" #include "mongo/db/ops/parsed_update.h" @@ -57,6 +59,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" @@ -1037,10 +1040,10 @@ namespace mongo { request->getTargetingNS()))); return false; } - repl::logOp(txn, - "c", - (database->name() + ".$cmd").c_str(), - BSON("create" << nsToCollectionSubstring(request->getTargetingNS()))); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + txn, + NamespaceString(request->getTargetingNS()), + CollectionOptions()); wunit.commit(); } return true; @@ -1156,7 +1159,7 @@ namespace mongo { result->setError(toWriteError(status.getStatus())); } else { - repl::logOp( txn, "i", insertNS.c_str(), docToInsert ); + getGlobalEnvironment()->getOpObserver()->onInsert(txn, insertNS, docToInsert); result->getStats().n = 1; wunit.commit(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 981c927b486..2fbee0bc850 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -71,10 +71,12 @@ #include "mongo/db/log_process_details.h" #include "mongo/db/mongod_options.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repair_database.h" #include "mongo/db/repl/network_interface_impl.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -462,6 +464,7 @@ namespace mongo { } getGlobalEnvironment()->setGlobalStorageEngine(storageGlobalParams.engine); + getGlobalEnvironment()->setOpObserver(std::unique_ptr<OpObserver>(new OpObserver)); const repl::ReplSettings& replSettings = repl::getGlobalReplicationCoordinator()->getSettings(); @@ -801,6 +804,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnviro new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), static_cast<int64_t>(curTimeMillis64())); repl::setGlobalReplicationCoordinator(replCoord); + repl::setOplogCollectionName(); getGlobalEnvironment()->registerKillOpListener(replCoord); return Status::OK(); } diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index d5e246a43aa..add67a02143 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -58,6 +58,7 @@ #include "mongo/db/db.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/global_environment_d.h" #include "mongo/db/index_builder.h" #include "mongo/db/instance.h" @@ -66,6 +67,7 @@ #include "mongo/db/json.h" #include "mongo/db/lasterror.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" @@ -73,7 +75,6 @@ #include "mongo/db/repair_database.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/mmap_v1/dur_stats.h" #include "mongo/db/write_concern.h" @@ -217,7 +218,7 @@ namespace mongo { WriteUnitOfWork wunit(txn); if (!fromRepl) { - repl::logOp(txn, "c", (dbname + ".$cmd").c_str(), cmdObj); + getGlobalEnvironment()->getOpObserver()->onDropDatabase(txn, dbname + ".$cmd"); } wunit.commit(); @@ -527,7 +528,9 @@ namespace mongo { } if ( !fromRepl ) { - repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), cmdObj); + getGlobalEnvironment()->getOpObserver()->onDropCollection( + txn, + NamespaceString(nsToDrop)); } wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "drop", nsToDrop); @@ -1166,7 +1169,9 @@ namespace mongo { } if (!fromRepl) { - repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), jsobj); + getGlobalEnvironment()->getOpObserver()->onCollMod(txn, + (dbname + ".$cmd").c_str(), + jsobj); } wunit.commit(); diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index da3adfbc961..92f82960e3e 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -41,9 +41,11 @@ #include "mongo/db/catalog/index_create.h" #include "mongo/db/db.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/json.h" #include "mongo/db/keypattern.h" #include "mongo/db/index/btree_access_method.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" @@ -53,7 +55,6 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/range_arithmetic.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" @@ -451,7 +452,7 @@ namespace mongo { BSONObj deletedId; collection->deleteDocument( txn, rloc, false, false, &deletedId ); // The above throws on failure, and so is not logged - repl::logOp(txn, "d", ns.c_str(), deletedId, 0, 0, fromMigrate); + getGlobalEnvironment()->getOpObserver()->onDelete(txn, ns, deletedId, fromMigrate); wuow.commit(); numDeleted++; } diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index a2184a87b41..a6d8a967704 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -37,8 +37,8 @@ #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/op_observer.h" #include "mongo/db/query/canonical_query.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -164,9 +164,11 @@ namespace mongo { << ", not logging."; } else { - bool replJustOne = true; - repl::logOp(_txn, "d", _collection->ns().ns().c_str(), deletedDoc, 0, - &replJustOne, _params.fromMigrate); + getGlobalEnvironment()->getOpObserver()->onDelete( + _txn, + _collection->ns().ns(), + deletedDoc, + _params.fromMigrate); } } diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 04d7813cb47..0da9981261f 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -37,10 +37,10 @@ #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/update_lifecycle.h" #include "mongo/db/query/explain.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/oplog.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -590,13 +590,12 @@ namespace mongo { // Call logOp if requested, and we're not an explain. if (request->shouldCallLogOp() && !logObj.isEmpty() && !request->isExplain()) { BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti()); - repl::logOp(_txn, - "u", - request->getNamespaceString().ns().c_str(), - logObj, - &idQuery, - NULL, - request->isFromMigration()); + getGlobalEnvironment()->getOpObserver()->onUpdate( + _txn, + request->getNamespaceString().ns().c_str(), + logObj, + idQuery, + request->isFromMigration()); } invariant(oldObj.snapshotId() == _txn->recoveryUnit()->getSnapshotId()); @@ -743,13 +742,10 @@ namespace mongo { !request->isGod()/*enforceQuota*/); uassertStatusOK(newLoc.getStatus()); if (request->shouldCallLogOp()) { - repl::logOp(_txn, - "i", - request->getNamespaceString().ns().c_str(), - newObj, - NULL, - NULL, - request->isFromMigration()); + getGlobalEnvironment()->getOpObserver()->onInsert(_txn, + request->getNamespaceString().ns(), + newObj, + request->isFromMigration()); } // Technically, we should save/restore state here, but since we are going to return EOF diff --git a/src/mongo/db/global_environment_d.cpp b/src/mongo/db/global_environment_d.cpp index 6f94d2a0f11..420f9c0779f 100644 --- a/src/mongo/db/global_environment_d.cpp +++ b/src/mongo/db/global_environment_d.cpp @@ -35,6 +35,7 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_engine_lock_file.h" #include "mongo/db/storage/storage_engine_metadata.h" @@ -261,4 +262,12 @@ namespace mongo { return new OperationContextImpl(); } + void GlobalEnvironmentMongoD::setOpObserver(std::unique_ptr<OpObserver> opObserver) { + _opObserver.reset(opObserver.get()); + } + + OpObserver* GlobalEnvironmentMongoD::getOpObserver() { + return _opObserver.get(); + } + } // namespace mongo diff --git a/src/mongo/db/global_environment_d.h b/src/mongo/db/global_environment_d.h index d1bb679eee5..6c94098b03d 100644 --- a/src/mongo/db/global_environment_d.h +++ b/src/mongo/db/global_environment_d.h @@ -75,6 +75,9 @@ namespace mongo { OperationContext* newOpCtx(); + void setOpObserver(std::unique_ptr<OpObserver> opObserver); + + OpObserver* getOpObserver(); private: @@ -90,6 +93,9 @@ namespace mongo { // logically owned here, but never deleted by anyone. StorageEngine* _storageEngine; + // logically owned here. + std::unique_ptr<OpObserver> _opObserver; + // All possible storage engines are registered here through MONGO_INIT. FactoryMap _storageFactories; }; diff --git a/src/mongo/db/global_environment_experiment.h b/src/mongo/db/global_environment_experiment.h index b32fcce9f73..3c72a235aff 100644 --- a/src/mongo/db/global_environment_experiment.h +++ b/src/mongo/db/global_environment_experiment.h @@ -35,6 +35,7 @@ namespace mongo { class OperationContext; + class OpObserver; /** * Classes that implement this interface can receive notification on killOp. @@ -157,6 +158,20 @@ namespace mongo { */ virtual OperationContext* newOpCtx() = 0; + // + // Global OpObserver. + // + + /** + * Set the OpObserver. + */ + virtual void setOpObserver(std::unique_ptr<OpObserver> opObserver) = 0; + + /** + * Return the OpObserver instance we're using. + */ + virtual OpObserver* getOpObserver() = 0; + protected: GlobalEnvironmentExperiment() { } }; diff --git a/src/mongo/db/global_environment_noop.cpp b/src/mongo/db/global_environment_noop.cpp index 498df1ff0e1..2170420a00e 100644 --- a/src/mongo/db/global_environment_noop.cpp +++ b/src/mongo/db/global_environment_noop.cpp @@ -31,6 +31,7 @@ #include "mongo/db/global_environment_noop.h" #include "mongo/db/operation_context_noop.h" +#include "mongo/db/op_observer.h" namespace mongo { @@ -84,4 +85,10 @@ namespace mongo { return new OperationContextNoop(); } + void GlobalEnvironmentNoop::setOpObserver(std::unique_ptr<OpObserver> opObserver) { + } + + OpObserver* GlobalEnvironmentNoop::getOpObserver() { + return nullptr; + } } // namespace mongo diff --git a/src/mongo/db/global_environment_noop.h b/src/mongo/db/global_environment_noop.h index 0bacdf8b009..dc946f28b7d 100644 --- a/src/mongo/db/global_environment_noop.h +++ b/src/mongo/db/global_environment_noop.h @@ -58,6 +58,10 @@ namespace mongo { void registerKillOpListener(KillOpListenerInterface* listener); OperationContext* newOpCtx(); + + void setOpObserver(std::unique_ptr<OpObserver> opObserver); + + OpObserver* getOpObserver(); }; } // namespace mongo diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 96a9fa496b8..aa1cf74a859 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -67,6 +67,7 @@ #include "mongo/db/catalog/index_create.h" #include "mongo/db/exec/delete.h" #include "mongo/db/exec/update.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/delete_request.h" #include "mongo/db/ops/insert.h" #include "mongo/db/ops/parsed_delete.h" @@ -880,15 +881,15 @@ namespace mongo { if ( !collection ) { collection = ctx.db()->createCollection( txn, ns ); verify( collection ); - repl::logOp(txn, - "c", - (ctx.db()->name() + ".$cmd").c_str(), - BSON("create" << nsToCollectionSubstring(ns))); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + txn, + NamespaceString(ns), + CollectionOptions()); } StatusWith<RecordId> status = collection->insertDocument( txn, js, true ); uassertStatusOK( status.getStatus() ); - repl::logOp(txn, "i", ns, js); + getGlobalEnvironment()->getOpObserver()->onInsert(txn, std::string(ns), js); wunit.commit(); break; } diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp new file mode 100644 index 00000000000..25aa20b35a7 --- /dev/null +++ b/src/mongo/db/op_observer.cpp @@ -0,0 +1,235 @@ +/** +* Copyright (C) 2008-2014 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#include "mongo/platform/basic.h" + +#include "mongo/db/op_observer.h" + +#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/commands/dbhash.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/s/d_state.h" +#include "mongo/scripting/engine.h" + +namespace mongo { + + void OpObserver::onCreateIndex(OperationContext* txn, + const std::string& ns, + BSONObj indexDoc, + bool fromMigrate) { + if (repl::getGlobalReplicationCoordinator()->isReplEnabled()) { + repl::_logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); + } + + getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), indexDoc, nullptr); + logOpForSharding(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); + logOpForDbHash(txn, ns.c_str()); + } + + void OpObserver::onInsert(OperationContext* txn, + const std::string& ns, + BSONObj doc, + bool fromMigrate) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "i", ns.c_str(), doc, nullptr, fromMigrate); + } + + getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), doc, nullptr); + logOpForSharding(txn, "i", ns.c_str(), doc, nullptr, fromMigrate); + logOpForDbHash(txn, ns.c_str()); + if ( strstr( ns.c_str(), ".system.js" ) ) { + Scope::storedFuncMod(txn); + } + } + + void OpObserver::onUpdate(OperationContext* txn, + const std::string& ns, + const BSONObj& update, + BSONObj& criteria, + bool fromMigrate) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "u", ns.c_str(), update, &criteria, fromMigrate); + } + + getGlobalAuthorizationManager()->logOp(txn, "u", ns.c_str(), update, &criteria); + logOpForSharding(txn, "u", ns.c_str(), update, &criteria, fromMigrate); + logOpForDbHash(txn, ns.c_str()); + if ( strstr( ns.c_str(), ".system.js" ) ) { + Scope::storedFuncMod(txn); + } + } + + void OpObserver::onDelete(OperationContext* txn, + const std::string& ns, + const BSONObj& idDoc, + bool fromMigrate) { + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); + } + + getGlobalAuthorizationManager()->logOp(txn, "d", ns.c_str(), idDoc, nullptr); + logOpForSharding(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); + logOpForDbHash(txn, ns.c_str()); + if ( strstr( ns.c_str(), ".system.js" ) ) { + Scope::storedFuncMod(txn); + } + } + + void OpObserver::onOpMessage(OperationContext* txn, const BSONObj& msgObj) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "n", "", msgObj, nullptr, false); + } + } + + void OpObserver::onCreateCollection(OperationContext* txn, + const NamespaceString& collectionName, + const CollectionOptions& options) { + std::string dbName = collectionName.db().toString() + ".$cmd"; + BSONObjBuilder b; + b.append("create", collectionName.coll().toString()); + b.appendElements(options.toBSON()); + BSONObj cmdObj = b.obj(); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onCollMod(OperationContext* txn, + const std::string& dbName, + const BSONObj& collModCmd) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), collModCmd, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), collModCmd, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onDropDatabase(OperationContext* txn, + const std::string& dbName) { + BSONObj cmdObj = BSON("dropDatabase" << 1); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onDropCollection(OperationContext* txn, + const NamespaceString& collectionName) { + std::string dbName = collectionName.db().toString() + ".$cmd"; + BSONObj cmdObj = BSON("drop" << collectionName.coll().toString()); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onDropIndex(OperationContext* txn, + const std::string& dbName, + const BSONObj& idxDescriptor) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onRenameCollection(OperationContext* txn, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + bool dropTarget, + bool stayTemp) { + std::string dbName = fromCollection.db().toString() + ".$cmd"; + BSONObj cmdObj = BSON("renameCollection" << fromCollection << + "to" << toCollection << + "stayTemp" << stayTemp << + "dropTarget" << dropTarget); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onApplyOps(OperationContext* txn, + const std::string& dbName, + const BSONObj& applyOpCmd) { + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onConvertToCapped(OperationContext* txn, + const NamespaceString& collectionName, + double size) { + std::string dbName = collectionName.db().toString() + ".$cmd"; + BSONObj cmdObj = BSON("convertToCapped" << collectionName.coll() << "size" << size); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + + void OpObserver::onEmptyCapped(OperationContext* txn, const NamespaceString& collectionName) { + std::string dbName = collectionName.db().toString() + ".$cmd"; + BSONObj cmdObj = BSON("emptycapped" << collectionName.coll()); + + if ( repl::getGlobalReplicationCoordinator()->isReplEnabled() ) { + repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + } + + getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); + logOpForDbHash(txn, dbName.c_str()); + } + +} // namespace mongo diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h new file mode 100644 index 00000000000..ebc296acf13 --- /dev/null +++ b/src/mongo/db/op_observer.h @@ -0,0 +1,88 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" + +namespace mongo { + class BSONObj; + struct CollectionOptions; + class NamespaceString; + class OperationContext; + + class OpObserver { + MONGO_DISALLOW_COPYING(OpObserver); + + public: + OpObserver() {} + ~OpObserver() {} + void onCreateIndex(OperationContext* txn, + const std::string& ns, + BSONObj indexDoc, + bool fromMigrate = false); + void onInsert(OperationContext* txn, + const std::string& ns, + BSONObj doc, + bool fromMigrate = false); + void onUpdate(OperationContext* txn, + const std::string& ns, + const BSONObj& update, + BSONObj& criteria, + bool fromMigrate); + void onDelete(OperationContext* txn, + const std::string& ns, + const BSONObj& idDoc, + bool fromMigrate = false); + void onOpMessage(OperationContext* txn, const BSONObj& msgObj); + void onCreateCollection(OperationContext* txn, + const NamespaceString& collectionName, + const CollectionOptions& options); + void onCollMod(OperationContext* txn, const std::string& dbName, const BSONObj& collModCmd); + void onDropDatabase(OperationContext* txn, const std::string& dbName); + void onDropCollection(OperationContext* txn, const NamespaceString& collectionName); + void onDropIndex(OperationContext* txn, + const std::string& dbName, + const BSONObj& idxDescriptor); + void onRenameCollection(OperationContext* txn, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + bool dropTarget, + bool stayTemp); + void onApplyOps(OperationContext* txn, + const std::string& dbName, + const BSONObj& applyOpCmd); + void onEmptyCapped(OperationContext* txn, const NamespaceString& collectionName); + void onConvertToCapped(OperationContext* txn, + const NamespaceString& collectionName, + double size); + }; + +} // namespace mongo diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 9aeb9946a45..b92aa821bb3 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -40,12 +40,13 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/exec/update.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/update_driver.h" #include "mongo/db/ops/update_lifecycle.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/update_index_data.h" #include "mongo/util/log.h" @@ -90,10 +91,10 @@ namespace mongo { invariant(collection); if (!request.isFromReplication()) { - repl::logOp(txn, - "c", - (db->name() + ".$cmd").c_str(), - BSON("create" << (nsString.coll()))); + getGlobalEnvironment()->getOpObserver()->onCreateCollection( + txn, + NamespaceString(nsString), + CollectionOptions()); } wuow.commit(); } diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 81a1a373264..ad2833df368 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -251,7 +251,7 @@ namespace { _replCoord->signalUpstreamUpdater(); } - _syncSourceReader.tailingQueryGTE(rsoplog, lastOpTimeFetched); + _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -396,7 +396,7 @@ namespace { if (!r.more()) { try { - BSONObj theirLastOp = r.getLastOp(rsoplog); + BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str()); if (theirLastOp.isEmpty()) { error() << "empty query result from " << hn << " oplog"; sleepsecs(2); @@ -497,7 +497,7 @@ namespace { try { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock lk(txn->lockState(), "local", MODE_X); - bool success = Helpers::getLast(txn, rsoplog, oplogEntry); + bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry); if (!success) { // This can happen when we are to do an initial sync. lastHash will be set // after the initial sync is complete. @@ -505,18 +505,18 @@ namespace { } } catch (const DBException& ex) { - severe() << "Problem reading " << rsoplog << ": " << ex.toStatus(); + severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus(); fassertFailed(18904); } BSONElement hashElement = oplogEntry[hashFieldName]; if (hashElement.eoo()) { - severe() << "Most recent entry in " << rsoplog << " missing \"" << hashFieldName << + severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName << "\" field"; fassertFailed(18902); } if (hashElement.type() != NumberLong) { severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << - rsoplog << " entry to have type NumberLong, but found " << + rsOplogName << " entry to have type NumberLong, but found " << typeName(hashElement.type()); fassertFailed(18903); } diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index 2f976f5ce5d..7e48453ae4e 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -33,7 +33,6 @@ #include "mongo/db/repl/initial_sync.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replset_commands.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index e2873a7ea86..2782061bce3 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -55,6 +55,8 @@ #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/handshake_args.h" @@ -1281,7 +1283,7 @@ namespace repl { try { WriteUnitOfWork wuow(&txn); - logKeepalive(&txn); + getGlobalEnvironment()->getOpObserver()->onOpMessage(&txn, BSONObj()); wuow.commit(); } catch (...) { @@ -1326,8 +1328,6 @@ namespace repl { void startMasterSlave(OperationContext* txn) { - oldRepl(); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); if( !replSettings.slave && !replSettings.master ) return; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index abda0997a35..76ba3f7ec20 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -53,6 +53,7 @@ #include "mongo/db/global_optime.h" #include "mongo/db/index_builder.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/delete.h" @@ -79,18 +80,22 @@ namespace mongo { using std::stringstream; namespace repl { + std::string rsOplogName = "local.oplog.rs"; + std::string masterSlaveOplogName = "local.oplog.$main"; + int OPLOG_VERSION = 2; namespace { // cached copies of these...so don't rename them, drop them, etc.!!! - Database* localDB = NULL; - Collection* localOplogMainCollection = 0; - Collection* localOplogRSCollection = 0; + Database* _localDB = nullptr; + Collection* _localOplogCollection = nullptr; // Synchronizes the section where a new OpTime is generated and when it actually // appears in the oplog. mongo::mutex newOpMutex("oplogNewOp"); boost::condition newOptimeNotifier; + static std::string _oplogCollectionName; + // so we can fail the same way void checkOplogInsert( StatusWith<RecordId> result ) { massert( 17322, @@ -98,7 +103,6 @@ namespace { result.isOK() ); } - /** * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to * reflect that new optime. Returns the new optime and the correct value of the "h" field for @@ -189,6 +193,18 @@ namespace { BSONObj _oField; }; +} // namespace + + void setOplogCollectionName() { + if (getGlobalReplicationCoordinator()->getReplicationMode() == + ReplicationCoordinator::modeReplSet) { + _oplogCollectionName = rsOplogName; + } + else { + _oplogCollectionName = masterSlaveOplogName; + } + } + /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp @@ -208,40 +224,41 @@ namespace { */ - void _logOpRS(OperationContext* txn, - const char *opstr, - const char *ns, - const char *logNS, - const BSONObj& obj, - BSONObj *o2, - bool *bb, - bool fromMigrate ) { + void _logOp(OperationContext* txn, + const char *opstr, + const char *ns, + const BSONObj& obj, + BSONObj *o2, + bool fromMigrate) { if ( strncmp(ns, "local.", 6) == 0 ) { return; } Lock::DBLock lk(txn->lockState(), "local", MODE_IX); - Lock::OplogIntentWriteLock oplogLk(txn->lockState()); - DEV verify( logNS == 0 ); // check this was never a master/slave master + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if ( localOplogRSCollection == 0 ) { - Client::Context ctx(txn, rsoplog); - localDB = ctx.db(); - invariant( localDB ); - localOplogRSCollection = localDB->getCollection( rsoplog ); - massert(13347, "local.oplog.rs missing. did you drop it? if so restart server", localOplogRSCollection); + if (ns[0] && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesForDatabase(nsToDatabaseSubstring(ns))) { + severe() << "logOp() but can't accept write to collection " << ns; + fassertFailed(17405); } - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (ns[0] && !replCoord->canAcceptWritesForDatabase(nsToDatabaseSubstring(ns))) { - severe() << "logOp() but can't accept write to collection " << ns; - fassertFailed(17405); + Lock::CollectionLock lk2(txn->lockState(), _oplogCollectionName, MODE_IX); + + + if (_localOplogCollection == nullptr) { + Client::Context ctx(txn, _oplogCollectionName); + _localDB = ctx.db(); + invariant(_localDB); + _localOplogCollection = _localDB->getCollection(_oplogCollectionName); + massert(13347, + "the oplog collection " + _oplogCollectionName + + " missing. did you drop it? if so, restart the server", + _localOplogCollection); } - oplogLk.serializeIfNeeded(); std::pair<OpTime, long long> slot = getNextOpTime(txn, - localOplogRSCollection, + _localOplogCollection, ns, replCoord, opstr); @@ -258,129 +275,16 @@ namespace { b.append("ns", ns); if (fromMigrate) b.appendBool("fromMigrate", true); - if ( bb ) - b.appendBool("b", *bb); if ( o2 ) b.append("o2", *o2); BSONObj partial = b.done(); OplogDocWriter writer( partial, obj ); - checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) ); + checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) ); txn->getClient()->setLastOp( slot.first ); } - void _logOpOld(OperationContext* txn, - const char *opstr, - const char *ns, - const char *logNS, - const BSONObj& obj, - BSONObj *o2, - bool *bb, - bool fromMigrate ) { - - - if ( strncmp(ns, "local.", 6) == 0 ) { - return; - } - - Lock::DBLock lk(txn->lockState(), "local", MODE_IX); - - if( logNS == 0 ) { - logNS = "local.oplog.$main"; - } - - Lock::CollectionLock lk2(txn->lockState(), logNS, MODE_IX); - - if (localOplogMainCollection == 0) { - Client::Context ctx(txn, logNS); - localDB = ctx.db(); - invariant(localDB); - localOplogMainCollection = localDB->getCollection(logNS); - invariant(localOplogMainCollection); - } - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - std::pair<OpTime,long long> slot = getNextOpTime(txn, - localOplogMainCollection, - ns, - replCoord, - opstr); - - /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- - instead we do a single copy to the destination position in the memory mapped file. - */ - - BSONObjBuilder b(256); - b.appendTimestamp("ts", slot.first.asDate()); - b.append("op", opstr); - b.append("ns", ns); - if (fromMigrate) - b.appendBool("fromMigrate", true); - if ( bb ) - b.appendBool("b", *bb); - if ( o2 ) - b.append("o2", *o2); - BSONObj partial = b.done(); // partial is everything except the o:... part. - - OplogDocWriter writer( partial, obj ); - checkOplogInsert( localOplogMainCollection->insertDocument( txn, &writer, false ) ); - - txn->getClient()->setLastOp(slot.first); - } - - void (*_logOp)(OperationContext* txn, - const char *opstr, - const char *ns, - const char *logNS, - const BSONObj& obj, - BSONObj *o2, - bool *bb, - bool fromMigrate ) = _logOpRS; -} // namespace - - void oldRepl() { _logOp = _logOpOld; } - - void logKeepalive(OperationContext* txn) { - _logOp(txn, "n", "", 0, BSONObj(), 0, 0, false); - } - void logOpComment(OperationContext* txn, const BSONObj& obj) { - _logOp(txn, "n", "", 0, obj, 0, 0, false); - } - void logOpInitiate(OperationContext* txn, const BSONObj& obj) { - _logOpRS(txn, "n", "", 0, obj, 0, 0, false); - } - - /*@ @param opstr: - c userCreateNS - i insert - n no-op / keepalive - d delete / remove - u update - */ - void logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool* b, - bool fromMigrate) { - - if ( getGlobalReplicationCoordinator()->isReplEnabled() ) { - _logOp(txn, opstr, ns, 0, obj, patt, b, fromMigrate); - } - - // - // rollback-safe logOp listeners - // - getGlobalAuthorizationManager()->logOp(txn, opstr, ns, obj, patt, b); - logOpForSharding(txn, opstr, ns, obj, patt, fromMigrate); - logOpForDbHash(txn, ns); - if ( strstr( ns, ".system.js" ) ) { - Scope::storedFuncMod(txn); - } - } - OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); OpTime lastOptime = replCoord->getMyLastOptime(); @@ -391,18 +295,18 @@ namespace { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock lk(txn->lockState(), "local", MODE_X); - if ( localOplogRSCollection == 0 ) { - Client::Context ctx(txn, rsoplog); + if ( _localOplogCollection == 0 ) { + Client::Context ctx(txn, rsOplogName); - localDB = ctx.db(); - verify( localDB ); - localOplogRSCollection = localDB->getCollection(rsoplog); + _localDB = ctx.db(); + verify( _localDB ); + _localOplogCollection = _localDB->getCollection(rsOplogName); massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", - localOplogRSCollection); + _localOplogCollection); } - Client::Context ctx(txn, rsoplog, localDB); + Client::Context ctx(txn, rsOplogName, _localDB); WriteUnitOfWork wunit(txn); for (std::deque<BSONObj>::const_iterator it = ops.begin(); @@ -411,7 +315,7 @@ namespace { const BSONObj& op = *it; const OpTime ts = op["ts"]._opTime(); - checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); + checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); if (!(lastOptime < ts)) { severe() << "replication oplog stream went back in time. " @@ -445,15 +349,11 @@ namespace { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); - const char * ns = "local.oplog.$main"; - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); bool rs = !replSettings.replSet.empty(); - if( rs ) - ns = rsoplog; - Client::Context ctx(txn, ns); - Collection* collection = ctx.db()->getCollection( ns ); + Client::Context ctx(txn, _oplogCollectionName); + Collection* collection = ctx.db()->getCollection( _oplogCollectionName ); if ( collection ) { @@ -472,7 +372,7 @@ namespace { } if ( !rs ) - initOpTimeFromOplog(txn, ns); + initOpTimeFromOplog(txn, _oplogCollectionName); return; } @@ -512,9 +412,9 @@ namespace { options.autoIndexId = CollectionOptions::NO; WriteUnitOfWork uow( txn ); - invariant(ctx.db()->createCollection(txn, ns, options)); + invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options)); if( !rs ) - logOp(txn, "n", "", BSONObj() ); + getGlobalEnvironment()->getOpObserver()->onOpMessage(txn, BSONObj()); uow.commit(); /* sync here so we don't get any surprising lag later when we try to sync */ @@ -570,7 +470,7 @@ namespace { } } Collection* collection = db->getCollection( ns ); - IndexCatalog* indexCatalog = collection == NULL ? NULL : collection->getIndexCatalog(); + IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); // operation type -- see logOp() comments for types const char *opType = fieldOp.valuestrsafe(); @@ -758,8 +658,7 @@ namespace { opType, ns, o, - fieldO2.isABSONObj() ? &o2 : NULL, - !fieldB.eoo() ? &valueB : NULL ); + fieldO2.isABSONObj() ? &o2 : NULL); wuow.commit(); return Status::OK(); @@ -797,9 +696,8 @@ namespace { void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { invariant(txn->lockState()->isW()); - localDB = NULL; - localOplogMainCollection = NULL; - localOplogRSCollection = NULL; + _localDB = nullptr; + _localOplogCollection = nullptr; } } // namespace repl diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index c84dfefb3d8..f2705bb2673 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -33,18 +33,22 @@ #include <string> #include "mongo/base/status.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/util/concurrency/mutex.h" namespace mongo { class BSONObj; + class Collection; + struct CollectionOptions; class Database; + class NamespaceString; class OperationContext; class OpTime; + class RecordId; namespace repl { - - // Redefines the function for logOp() to master/slave. - void oldRepl(); // master-slave - + class ReplicationCoordinator; + // Create a new capped collection for the oplog if it doesn't yet exist. // This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave) // If the collection already exists, set the 'last' OpTime if master/slave (side effect!) @@ -56,44 +60,30 @@ namespace repl { // Returns the optime for the last op inserted. OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops); - const char rsoplog[] = "local.oplog.rs"; - static const int OPLOG_VERSION = 2; - - /** Log an operation to the local oplog - - @param opstr - "i" insert - "u" update - "d" delete - "c" db cmd - "n" no-op - "db" declares presence of a database (ns is set to the db name + '.') + extern std::string rsOplogName; + extern std::string masterSlaveOplogName; - For 'u' records, 'obj' captures the mutation made to the object but not - the object itself. In that case, we provide also 'fullObj' which is the - image of the object _after_ the mutation logged here was applied. + extern int OPLOG_VERSION; - See _logOp() in oplog.cpp for more details. - */ - void logOp( OperationContext* txn, + /** Log an operation to the local oplog + * + * @param opstr + * "i" insert + * "u" update + * "d" delete + * "c" db cmd + * "n" no-op + * "db" declares presence of a database (ns is set to the db name + '.') + * + * For 'u' records, 'obj' captures the mutation made to the object but not + * the object itself. 'o2' captures the the criteria for the object that will be modified. + */ + void _logOp(OperationContext* txn, const char *opstr, const char *ns, const BSONObj& obj, - BSONObj *patt = NULL, - bool *b = NULL, - bool fromMigrate = false); - - // Log an empty no-op operation to the local oplog - void logKeepalive(OperationContext* txn); - - /** puts obj in the oplog as a comment (a no-op). Just for diags. - convention is - { msg : "text", ... } - */ - void logOpComment(OperationContext* txn, const BSONObj& obj); - - // Same as logOpComment, except only works for replsets - void logOpInitiate(OperationContext* txn, const BSONObj& obj); + BSONObj *o2, + bool fromMigrate); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. @@ -126,5 +116,10 @@ namespace repl { * Sets the global OpTime to be 'newTime'. */ void setNewOptime(const OpTime& newTime); + + /** + * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. + */ + void setOplogCollectionName(); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 90c8e449c08..9746d975fda 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -188,7 +188,7 @@ namespace repl { } // Read the first (oldest) op and confirm that it's not newer than our last // fetched op. Otherwise, we have fallen off the back of that source's oplog. - BSONObj remoteOldestOp(findOne(rsoplog, Query())); + BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); BSONElement tsElem(remoteOldestOp["ts"]); if (tsElem.type() != Timestamp) { // This member's got a bad op in its oplog. diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 527452fd5fc..d8639ee8530 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -77,8 +77,8 @@ namespace repl { BSONObj findOne(const char *ns, const Query& q) { return conn()->findOne(ns, q, 0, QueryOption_SlaveOk); } - BSONObj getLastOp(const char *ns) { - return findOne(ns, Query().sort(reverseNaturalObj)); + BSONObj getLastOp(const std::string& ns) { + return findOne(ns.c_str(), Query().sort(reverseNaturalObj)); } /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c1e3ac4b10d..9d5ba123213 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -45,6 +45,7 @@ #include "mongo/db/global_environment_experiment.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/master_slave.h" @@ -115,7 +116,7 @@ namespace { Lock::GlobalWrite globalWrite(txn->lockState()); WriteUnitOfWork wuow(txn); - logOpInitiate(txn, BSON("msg" << "initiating set")); + getGlobalEnvironment()->getOpObserver()->onOpMessage(txn, BSON("msg" << "initiating set")); wuow.commit(); } @@ -193,23 +194,23 @@ namespace { try { BSONObj oplogEntry; - if (!Helpers::getLast(txn, rsoplog, oplogEntry)) { + if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { return StatusWith<OpTime>( ErrorCodes::NoMatchingDocument, - str::stream() << "Did not find any entries in " << rsoplog); + str::stream() << "Did not find any entries in " << rsOplogName); } BSONElement tsElement = oplogEntry[tsFieldName]; if (tsElement.eoo()) { return StatusWith<OpTime>( ErrorCodes::NoSuchKey, - str::stream() << "Most recent entry in " << rsoplog << " missing \"" << + str::stream() << "Most recent entry in " << rsOplogName << " missing \"" << tsFieldName << "\" field"); } if (tsElement.type() != Timestamp) { return StatusWith<OpTime>( ErrorCodes::TypeMismatch, str::stream() << "Expected type of \"" << tsFieldName << - "\" in most recent " << rsoplog << + "\" in most recent " << rsOplogName << " entry to have type Timestamp, but found " << typeName(tsElement.type())); } return StatusWith<OpTime>(tsElement._opTime()); diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index cf7ff0faabb..a836ef84cc6 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -178,13 +178,13 @@ namespace repl { BSONObjBuilder result; result.append("latestOptime", replCoord->getMyLastOptime()); - const char* oplogNS = + const std::string& oplogNS = replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet ? - rsoplog : "local.oplog.$main"; + rsOplogName : masterSlaveOplogName; BSONObj o; uassert(17347, "Problem reading earliest entry from oplog", - Helpers::getSingleton(txn, oplogNS, o)); + Helpers::getSingleton(txn, oplogNS.c_str(), o)); result.append("earliestOptime", o["ts"]._opTime()); return result.obj(); } diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index a67b8ca6480..f3ad62ead42 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -41,6 +41,7 @@ #include "mongo/db/commands.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" @@ -318,8 +319,10 @@ namespace { WriteUnitOfWork wuow(txn); if (status.isOK() && !parsedArgs.force) { - logOpInitiate(txn, BSON("msg" << "Reconfig set" << - "version" << parsedArgs.newConfigObj["version"])); + getGlobalEnvironment()->getOpObserver()->onOpMessage( + txn, + BSON("msg" << "Reconfig set" << + "version" << parsedArgs.newConfigObj["version"])); } wuow.commit(); @@ -553,7 +556,7 @@ namespace { // we have a local database. return true if oplog isn't empty BSONObj o; - if (Helpers::getSingleton(txn, repl::rsoplog, o)) { + if (Helpers::getSingleton(txn, repl::rsOplogName.c_str(), o)) { return true; } } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 47b30ee5543..1d3f17b1536 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -39,8 +39,10 @@ #include "mongo/db/client.h" #include "mongo/db/cloner.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/initial_sync.h" #include "mongo/db/repl/minvalid.h" @@ -70,7 +72,7 @@ namespace { BackgroundSync* bgsync) { AutoGetDb autoDb(txn, "local", MODE_X); massert(28585, "no local database found", autoDb.getDb()); - invariant(txn->lockState()->isCollectionLockedForMode(rsoplog, MODE_X)); + invariant(txn->lockState()->isCollectionLockedForMode(rsOplogName, MODE_X)); // Note: the following order is important. // The bgsync thread uses an empty optime as a sentinel to know to wait // for initial sync; thus, we must @@ -83,7 +85,7 @@ namespace { replCoord->clearSyncSourceBlacklist(); // Truncate the oplog in case there was a prior initial sync that failed. - Collection* collection = autoDb.getDb()->getCollection(rsoplog); + Collection* collection = autoDb.getDb()->getCollection(rsOplogName); fassert(28565, collection); WriteUnitOfWork wunit(txn); Status status = collection->truncate(txn); @@ -153,7 +155,7 @@ namespace { // A common problem is that TCP keepalives are set too infrequent, and thus // our connection here is terminated by a firewall due to inactivity. // Solution is to increase the TCP keepalive frequency. - lastOp = r->getLastOp(rsoplog); + lastOp = r->getLastOp(rsOplogName); } catch ( SocketException & ) { HostAndPort host = r->getHost(); log() << "connection lost to " << host.toString() << @@ -163,7 +165,7 @@ namespace { throw; } // retry - lastOp = r->getLastOp(rsoplog); + lastOp = r->getLastOp(rsOplogName); } if (lastOp.isEmpty()) { @@ -292,7 +294,7 @@ namespace { InitialSync init(bgsync); init.setHostname(r.getHost().toString()); - BSONObj lastOp = r.getLastOp(rsoplog); + BSONObj lastOp = r.getLastOp(rsOplogName); if ( lastOp.isEmpty() ) { std::string msg = "initial sync couldn't read remote oplog"; log() << msg; diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index d1444ab6ac4..aba19c1e1f8 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -259,12 +259,12 @@ namespace { void syncRollbackFindCommonPoint(OperationContext* txn, DBClientConnection* them, FixUpInfo& fixUpInfo) { - Client::Context ctx(txn, rsoplog); + Client::Context ctx(txn, rsOplogName); boost::scoped_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, - rsoplog, - ctx.db()->getCollection(rsoplog), + rsOplogName, + ctx.db()->getCollection(rsOplogName), InternalPlanner::BACKWARD)); BSONObj ourObj; @@ -277,10 +277,10 @@ namespace { const Query query = Query().sort(reverseNaturalObj); const BSONObj fields = BSON("ts" << 1 << "h" << 1); - //auto_ptr<DBClientCursor> u = us->query(rsoplog, query, 0, 0, &fields, 0, 0); + //auto_ptr<DBClientCursor> u = us->query(rsOplogName, query, 0, 0, &fields, 0, 0); fixUpInfo.rbid = getRBID(them); - auto_ptr<DBClientCursor> oplogCursor = them->query(rsoplog, query, 0, 0, &fields, 0, 0); + auto_ptr<DBClientCursor> oplogCursor = them->query(rsOplogName, query, 0, 0, &fields, 0, 0); if (oplogCursor.get() == NULL || !oplogCursor->more()) throw RSFatalException("remote oplog empty or unreadable"); @@ -418,7 +418,7 @@ namespace { goodVersions.push_back(pair<DocID, BSONObj>(doc,good)); } } - newMinValid = oplogreader->getLastOp(rsoplog); + newMinValid = oplogreader->getLastOp(rsOplogName); if (newMinValid.isEmpty()) { error() << "rollback error newMinValid empty?"; return; @@ -493,7 +493,7 @@ namespace { string err; try { - newMinValid = oplogreader->getLastOp(rsoplog); + newMinValid = oplogreader->getLastOp(rsOplogName); if (newMinValid.isEmpty()) { err = "can't get minvalid from sync source"; } @@ -539,10 +539,10 @@ namespace { } log() << "rollback 4.7"; - Client::Context ctx(txn, rsoplog); - Collection* oplogCollection = ctx.db()->getCollection(rsoplog); + Client::Context ctx(txn, rsOplogName); + Collection* oplogCollection = ctx.db()->getCollection(rsOplogName); uassert(13423, - str::stream() << "replSet error in rollback can't find " << rsoplog, + str::stream() << "replSet error in rollback can't find " << rsOplogName, oplogCollection); map<string,shared_ptr<Helpers::RemoveSaver> > removeSavers; diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index ca95eec0442..174147fb078 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -46,7 +46,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/minvalid.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/rs_initialsync.h" |