summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorMartin Bligh <mbligh@mongodb.com>2015-09-25 09:15:47 -0400
committerMartin Bligh <mbligh@mongodb.com>2015-09-25 09:16:40 -0400
commit0c001db908b6811c0e665e15150d1d30c47b8b5c (patch)
tree50a7c8e8b40f71a1e20c25086653f2fcd0548a05 /src/mongo/db
parent4898cb582633fd686c3057824ce0d1713284d15d (diff)
downloadmongo-0c001db908b6811c0e665e15150d1d30c47b8b5c.tar.gz
SERVER-20438: logOp refactor prior to vectorizing
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/op_observer.cpp28
-rw-r--r--src/mongo/db/repl/oplog.cpp105
-rw-r--r--src/mongo/db/repl/oplog.h12
3 files changed, 77 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 2d1ce157b43..0563482a99a 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -47,7 +47,7 @@ void OpObserver::onCreateIndex(OperationContext* txn,
const std::string& ns,
BSONObj indexDoc,
bool fromMigrate) {
- repl::_logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate);
+ repl::logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate);
getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), indexDoc, nullptr);
logOpForSharding(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate);
@@ -58,7 +58,7 @@ void OpObserver::onInsert(OperationContext* txn,
const NamespaceString& ns,
BSONObj doc,
bool fromMigrate) {
- repl::_logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
+ repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr);
logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
@@ -69,7 +69,7 @@ void OpObserver::onInsert(OperationContext* txn,
}
void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) {
- repl::_logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate);
+ repl::logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate);
getGlobalAuthorizationManager()->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria);
logOpForSharding(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate);
@@ -83,7 +83,7 @@ void OpObserver::onDelete(OperationContext* txn,
const std::string& ns,
const BSONObj& idDoc,
bool fromMigrate) {
- repl::_logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate);
+ repl::logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate);
getGlobalAuthorizationManager()->logOp(txn, "d", ns.c_str(), idDoc, nullptr);
logOpForSharding(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate);
@@ -94,7 +94,7 @@ void OpObserver::onDelete(OperationContext* txn,
}
void OpObserver::onOpMessage(OperationContext* txn, const BSONObj& msgObj) {
- repl::_logOp(txn, "n", "", msgObj, nullptr, false);
+ repl::logOp(txn, "n", "", msgObj, nullptr, false);
}
void OpObserver::onCreateCollection(OperationContext* txn,
@@ -108,7 +108,7 @@ void OpObserver::onCreateCollection(OperationContext* txn,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
}
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
@@ -123,7 +123,7 @@ void OpObserver::onCollMod(OperationContext* txn,
if (!NamespaceString(NamespaceString(dbName).db(), coll).isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::_logOp(txn, "c", dbName.c_str(), collModCmd, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), collModCmd, nullptr, false);
}
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), collModCmd, nullptr);
@@ -133,7 +133,7 @@ void OpObserver::onCollMod(OperationContext* txn,
void OpObserver::onDropDatabase(OperationContext* txn, const std::string& dbName) {
BSONObj cmdObj = BSON("dropDatabase" << 1);
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
logOpForDbHash(txn, dbName.c_str());
@@ -145,7 +145,7 @@ void OpObserver::onDropCollection(OperationContext* txn, const NamespaceString&
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
}
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
@@ -155,7 +155,7 @@ void OpObserver::onDropCollection(OperationContext* txn, const NamespaceString&
void OpObserver::onDropIndex(OperationContext* txn,
const std::string& dbName,
const BSONObj& idxDescriptor) {
- repl::_logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr, false);
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), idxDescriptor, nullptr);
logOpForDbHash(txn, dbName.c_str());
@@ -171,7 +171,7 @@ void OpObserver::onRenameCollection(OperationContext* txn,
BSON("renameCollection" << fromCollection.ns() << "to" << toCollection.ns() << "stayTemp"
<< stayTemp << "dropTarget" << dropTarget);
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
logOpForDbHash(txn, dbName.c_str());
@@ -180,7 +180,7 @@ void OpObserver::onRenameCollection(OperationContext* txn,
void OpObserver::onApplyOps(OperationContext* txn,
const std::string& dbName,
const BSONObj& applyOpCmd) {
- repl::_logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr, false);
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), applyOpCmd, nullptr);
logOpForDbHash(txn, dbName.c_str());
@@ -194,7 +194,7 @@ void OpObserver::onConvertToCapped(OperationContext* txn,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
}
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
@@ -207,7 +207,7 @@ void OpObserver::onEmptyCapped(OperationContext* txn, const NamespaceString& col
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::_logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
+ repl::logOp(txn, "c", dbName.c_str(), cmdObj, nullptr, false);
}
getGlobalAuthorizationManager()->logOp(txn, "c", dbName.c_str(), cmdObj, nullptr);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 90ce1efa98e..ffa20206be2 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -99,6 +99,7 @@ namespace mongo {
using std::endl;
using std::string;
using std::stringstream;
+using std::unique_ptr;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
@@ -138,7 +139,6 @@ void checkOplogInsert(Status result) {
*/
std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
Collection* oplog,
- const char* ns,
ReplicationCoordinator* replCoord,
const char* opstr,
ReplicationCoordinator::Mode replicationMode) {
@@ -235,6 +235,42 @@ void setOplogCollectionName() {
}
}
+namespace {
+void createOplog(OperationContext* txn, const std::string& oplogCollectionName) {
+ Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
+
+ OldClientContext ctx(txn, oplogCollectionName);
+ _localDB = ctx.db();
+ invariant(_localDB);
+ _localOplogCollection = _localDB->getCollection(oplogCollectionName);
+ massert(13347,
+ "the oplog collection " + oplogCollectionName +
+ " missing. did you drop it? if so, restart the server",
+ _localOplogCollection);
+}
+
+bool oplogDisabled(OperationContext* txn,
+ ReplicationCoordinator::Mode replicationMode,
+ NamespaceString& nss) {
+ if (replicationMode == ReplicationCoordinator::modeNone)
+ return true;
+
+ if (nss.db() == "local")
+ return true;
+
+ if (nss.isSystemDotProfile())
+ return true;
+
+ if (!txn->writesAreReplicated())
+ return true;
+
+ fassert(28626, txn->recoveryUnit());
+
+ return false;
+}
+} // end anon namespace
+
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
@@ -251,9 +287,7 @@ void setOplogCollectionName() {
bb param:
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
-
*/
-
void _logOp(OperationContext* txn,
const char* opstr,
const char* ns,
@@ -264,61 +298,36 @@ void _logOp(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
bool updateReplOpTime) {
NamespaceString nss(ns);
- if (nss.db() == "local") {
+ if (oplogDisabled(txn, replicationMode, nss))
return;
- }
- if (nss.isSystemDotProfile()) {
- return;
- }
-
- if (replicationMode == ReplicationCoordinator::modeNone) {
- return;
- }
-
- if (!txn->writesAreReplicated()) {
- return;
- }
-
- fassert(28626, txn->recoveryUnit());
+ if (_localOplogCollection == nullptr)
+ createOplog(txn, oplogCollectionName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- if (ns[0] && replicationMode == ReplicationCoordinator::modeReplSet &&
+ if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(nss)) {
- severe() << "logOp() but can't accept write to collection " << ns;
+ severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
+ auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode);
+ OpTime optime = slot.first;
+ long long hashNew = slot.second;
- if (_localOplogCollection == nullptr) {
- OldClientContext ctx(txn, oplogCollectionName);
- _localDB = ctx.db();
- invariant(_localDB);
- _localOplogCollection = _localDB->getCollection(oplogCollectionName);
- massert(13347,
- "the oplog collection " + oplogCollectionName +
- " missing. did you drop it? if so, restart the server",
- _localOplogCollection);
- }
-
- std::pair<OpTime, long long> slot =
- getNextOpTime(txn, _localOplogCollection, ns, replCoord, opstr, replicationMode);
-
- /* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
- instead we do a single copy to the destination position in the memory mapped file.
- */
-
+ // we jump through a bunch of hoops here to avoid copying the obj buffer twice --
+ // instead we do a single copy to the destination in the record store.
BSONObjBuilder b(256);
- b.append("ts", slot.first.getTimestamp());
- if (slot.first.getTerm() != -1) {
- b.append("t", slot.first.getTerm());
+ b.append("ts", optime.getTimestamp());
+ if (optime.getTerm() != -1) {
+ b.append("t", optime.getTerm());
}
- b.append("h", slot.second);
+ b.append("h", hashNew);
b.append("v", OPLOG_VERSION);
b.append("op", opstr);
b.append("ns", ns);
@@ -343,12 +352,12 @@ void _logOp(OperationContext* txn,
ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first);
}
-void _logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* o2,
- bool fromMigrate) {
+void logOp(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* o2,
+ bool fromMigrate) {
_logOp(txn,
opstr,
ns,
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index f17d7916f99..cfde2ce8ccd 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -102,12 +102,12 @@ void _logOp(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
bool updateReplOpTime);
-void _logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* o2,
- bool fromMigrate);
+void logOp(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* o2,
+ bool fromMigrate);
// Flush out the cached pointers to the local database and oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.