diff options
author | Martin Bligh <mbligh@mongodb.com> | 2015-09-25 09:15:47 -0400 |
---|---|---|
committer | Martin Bligh <mbligh@mongodb.com> | 2015-09-25 09:16:40 -0400 |
commit | 0c001db908b6811c0e665e15150d1d30c47b8b5c (patch) | |
tree | 50a7c8e8b40f71a1e20c25086653f2fcd0548a05 /src/mongo/db | |
parent | 4898cb582633fd686c3057824ce0d1713284d15d (diff) | |
download | mongo-0c001db908b6811c0e665e15150d1d30c47b8b5c.tar.gz |
SERVER-20438: logOp refactor prior to vectorizing
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/op_observer.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 105 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 12 |
3 files changed, 77 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 2d1ce157b43..0563482a99a 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -47,7 +47,7 @@ void OpObserver::onCreateIndex(OperationContext* txn, const std::string& ns, BSONObj indexDoc, bool fromMigrate) { - repl::_logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); + repl::logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), indexDoc, nullptr); logOpForSharding(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); @@ -58,7 +58,7 @@ void OpObserver::onInsert(OperationContext* txn, const NamespaceString& ns, BSONObj doc, bool fromMigrate) { - repl::_logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); + repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr); logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); @@ -69,7 +69,7 @@ void OpObserver::onInsert(OperationContext* txn, } void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) { - repl::_logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate); + repl::logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate); getGlobalAuthorizationManager()->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria); logOpForSharding(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate); @@ -83,7 +83,7 @@ void OpObserver::onDelete(OperationContext* txn, const std::string& ns, const BSONObj& idDoc, bool fromMigrate) { - repl::_logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); + repl::logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); getGlobalAuthorizationManager()->logOp(txn, "d", ns.c_str(), idDoc, nullptr); logOpForSharding(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); @@ -94,7 +94,7 @@ void OpObserver::onDelete(OperationContext* txn, } void OpObserver::onOpMessage(OperationContext* txn, const BSONObj& msgObj) { - repl::_logOp(txn, "n", "", msgObj, nullptr, false); + repl::logOp(txn, "n", "", msgObj, nullptr, false); } void OpObserver::onCreateCollection(OperationContext* txn, @@ -108,7 +108,7 @@ void OpObserver::onCreateCollection(OperationContext* txn, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); } getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); @@ -123,7 +123,7 @@ void OpObserver::onCollMod(OperationContext* txn, if (!NamespaceString(NamespaceString(dbName).db(), coll).isSystemDotProfile()) { // do not replicate system.profile modifications - repl::_logOp(txn, "c", dbName.c_str(), collModCmd, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), collModCmd, nullptr, false); } getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), collModCmd, nullptr); @@ -133,7 +133,7 @@ void OpObserver::onCollMod(OperationContext* txn, void OpObserver::onDropDatabase(OperationContext* txn, const std::string& dbName) { BSONObj cmdObj = BSON("dropDatabase" << 1); - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); logOpForDbHash(txn, dbName.c_str()); @@ -145,7 +145,7 @@ void OpObserver::onDropCollection(OperationContext* txn, const NamespaceString& if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); } getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); @@ -155,7 +155,7 @@ void OpObserver::onDropCollection(OperationContext* txn, const NamespaceString& void OpObserver::onDropIndex(OperationContext* txn, const std::string& dbName, const BSONObj& idxDescriptor) { - repl::_logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr, false); getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr); logOpForDbHash(txn, dbName.c_str()); @@ -171,7 +171,7 @@ void OpObserver::onRenameCollection(OperationContext* txn, BSON("renameCollection" << fromCollection.ns() << "to" << toCollection.ns() << "stayTemp" << stayTemp << "dropTarget" << dropTarget); - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); logOpForDbHash(txn, dbName.c_str()); @@ -180,7 +180,7 @@ void OpObserver::onRenameCollection(OperationContext* txn, void OpObserver::onApplyOps(OperationContext* txn, const std::string& dbName, const BSONObj& applyOpCmd) { - repl::_logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr, false); getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr); logOpForDbHash(txn, dbName.c_str()); @@ -194,7 +194,7 @@ void OpObserver::onConvertToCapped(OperationContext* txn, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); } getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); @@ -207,7 +207,7 @@ void OpObserver::onEmptyCapped(OperationContext* txn, const NamespaceString& col if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); + repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false); } getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 90ce1efa98e..ffa20206be2 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -99,6 +99,7 @@ namespace mongo { using std::endl; using std::string; using std::stringstream; +using std::unique_ptr; namespace repl { std::string rsOplogName = "local.oplog.rs"; @@ -138,7 +139,6 @@ void checkOplogInsert(Status result) { */ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, Collection* oplog, - const char* ns, ReplicationCoordinator* replCoord, const char* opstr, ReplicationCoordinator::Mode replicationMode) { @@ -235,6 +235,42 @@ void setOplogCollectionName() { } } +namespace { +void createOplog(OperationContext* txn, const std::string& oplogCollectionName) { + Lock::DBLock lk(txn->lockState(), "local", MODE_IX); + Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); + + OldClientContext ctx(txn, oplogCollectionName); + _localDB = ctx.db(); + invariant(_localDB); + _localOplogCollection = _localDB->getCollection(oplogCollectionName); + massert(13347, + "the oplog collection " + oplogCollectionName + + " missing. did you drop it? if so, restart the server", + _localOplogCollection); +} + +bool oplogDisabled(OperationContext* txn, + ReplicationCoordinator::Mode replicationMode, + NamespaceString& nss) { + if (replicationMode == ReplicationCoordinator::modeNone) + return true; + + if (nss.db() == "local") + return true; + + if (nss.isSystemDotProfile()) + return true; + + if (!txn->writesAreReplicated()) + return true; + + fassert(28626, txn->recoveryUnit()); + + return false; +} +} // end anon namespace + /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp @@ -251,9 +287,7 @@ void setOplogCollectionName() { bb param: if not null, specifies a boolean to pass along to the other side as b: param. used for "justOne" or "upsert" flags on 'd', 'u' - */ - void _logOp(OperationContext* txn, const char* opstr, const char* ns, @@ -264,61 +298,36 @@ void _logOp(OperationContext* txn, ReplicationCoordinator::Mode replicationMode, bool updateReplOpTime) { NamespaceString nss(ns); - if (nss.db() == "local") { + if (oplogDisabled(txn, replicationMode, nss)) return; - } - if (nss.isSystemDotProfile()) { - return; - } - - if (replicationMode == ReplicationCoordinator::modeNone) { - return; - } - - if (!txn->writesAreReplicated()) { - return; - } - - fassert(28626, txn->recoveryUnit()); + if (_localOplogCollection == nullptr) + createOplog(txn, oplogCollectionName); Lock::DBLock lk(txn->lockState(), "local", MODE_IX); ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (ns[0] && replicationMode == ReplicationCoordinator::modeReplSet && + if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet && !replCoord->canAcceptWritesFor(nss)) { - severe() << "logOp() but can't accept write to collection " << ns; + severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); } Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); + auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode); + OpTime optime = slot.first; + long long hashNew = slot.second; - if (_localOplogCollection == nullptr) { - OldClientContext ctx(txn, oplogCollectionName); - _localDB = ctx.db(); - invariant(_localDB); - _localOplogCollection = _localDB->getCollection(oplogCollectionName); - massert(13347, - "the oplog collection " + oplogCollectionName + - " missing. did you drop it? if so, restart the server", - _localOplogCollection); - } - - std::pair<OpTime, long long> slot = - getNextOpTime(txn, _localOplogCollection, ns, replCoord, opstr, replicationMode); - - /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- - instead we do a single copy to the destination position in the memory mapped file. - */ - + // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- + // instead we do a single copy to the destination in the record store. BSONObjBuilder b(256); - b.append("ts", slot.first.getTimestamp()); - if (slot.first.getTerm() != -1) { - b.append("t", slot.first.getTerm()); + b.append("ts", optime.getTimestamp()); + if (optime.getTerm() != -1) { + b.append("t", optime.getTerm()); } - b.append("h", slot.second); + b.append("h", hashNew); b.append("v", OPLOG_VERSION); b.append("op", opstr); b.append("ns", ns); @@ -343,12 +352,12 @@ void _logOp(OperationContext* txn, ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first); } -void _logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* o2, - bool fromMigrate) { +void logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* o2, + bool fromMigrate) { _logOp(txn, opstr, ns, diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index f17d7916f99..cfde2ce8ccd 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -102,12 +102,12 @@ void _logOp(OperationContext* txn, ReplicationCoordinator::Mode replicationMode, bool updateReplOpTime); -void _logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* o2, - bool fromMigrate); +void logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* o2, + bool fromMigrate); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. |