summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMartin Bligh <mbligh@mongodb.com>2015-10-27 10:30:34 -0400
committerMartin Bligh <mbligh@mongodb.com>2015-10-27 10:30:46 -0400
commit45bfa34b8b99237c16a839e5afa0df44c5fde15d (patch)
tree591d23194a65a667232708d1e29e2eee6fa11a09 /src/mongo
parent90607c08ebb7dc7d6730b2d932ff06b3cef01c12 (diff)
downloadmongo-45bfa34b8b99237c16a839e5afa0df44c5fde15d.tar.gz
Revert "SERVER-21031: Vectorize logOps"
This reverts commit bc0704481b68db6d3116eb2692088f73759d776a.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/catalog/collection.cpp10
-rw-r--r--src/mongo/db/op_observer.cpp25
-rw-r--r--src/mongo/db/op_observer.h9
-rw-r--r--src/mongo/db/repl/oplog.cpp134
-rw-r--r--src/mongo/db/repl/oplog.h14
5 files changed, 67 insertions, 125 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 2c45a6057b7..a7e2087f643 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -360,7 +360,11 @@ Status Collection::insertDocuments(OperationContext* txn,
return status;
invariant(sid == txn->recoveryUnit()->getSnapshotId());
- getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), begin, end, fromMigrate);
+ int inserted = 0;
+ for (vector<BSONObj>::iterator it = begin; it != end; it++) { // TODO: vectorize
+ getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), *it, fromMigrate);
+ inserted++;
+ }
txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
@@ -401,9 +405,7 @@ Status Collection::insertDocument(OperationContext* txn,
if (!status.isOK())
return status;
- vector<BSONObj> docs;
- docs.push_back(doc);
- getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), docs.begin(), docs.end());
+ getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc);
txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 720d26ca9f2..0563482a99a 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -43,8 +43,6 @@
namespace mongo {
-using std::vector;
-
void OpObserver::onCreateIndex(OperationContext* txn,
const std::string& ns,
BSONObj indexDoc,
@@ -56,21 +54,16 @@ void OpObserver::onCreateIndex(OperationContext* txn,
logOpForDbHash(txn, ns.c_str());
}
-void OpObserver::onInserts(OperationContext* txn,
- const NamespaceString& nss,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end,
- bool fromMigrate) {
- repl::logOps(txn, "i", nss, begin, end, fromMigrate);
-
- const char* ns = nss.ns().c_str();
- for (auto it = begin; it != end; it++) {
- getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr);
- logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate);
- }
+void OpObserver::onInsert(OperationContext* txn,
+ const NamespaceString& ns,
+ BSONObj doc,
+ bool fromMigrate) {
+ repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
- logOpForDbHash(txn, ns);
- if (strstr(ns, ".system.js")) {
+ getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr);
+ logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
+ logOpForDbHash(txn, ns.ns().c_str());
+ if (strstr(ns.ns().c_str(), ".system.js")) {
Scope::storedFuncMod(txn);
}
}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 2ef3a191f0a..4ecf7df20d0 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -55,11 +55,10 @@ public:
const std::string& ns,
BSONObj indexDoc,
bool fromMigrate = false);
- void onInserts(OperationContext* txn,
- const NamespaceString& ns,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
- bool fromMigrate = false);
+ void onInsert(OperationContext* txn,
+ const NamespaceString& ns,
+ BSONObj doc,
+ bool fromMigrate = false);
void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args);
void onDelete(OperationContext* txn,
const std::string& ns,
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 2a29126bdbd..629a2e91f41 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -100,7 +100,6 @@ using std::endl;
using std::string;
using std::stringstream;
using std::unique_ptr;
-using std::vector;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
@@ -128,11 +127,6 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-struct OplogSlot {
- OpTime opTime;
- int64_t hash;
-};
-
/**
* Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
* reflect that new optime. Returns the new optime and the correct value of the "h" field for
@@ -143,14 +137,14 @@ struct OplogSlot {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
-OplogSlot getNextOpTime(OperationContext* txn,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- ReplicationCoordinator::Mode replicationMode) {
+std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
+ Collection* oplog,
+ ReplicationCoordinator* replCoord,
+ const char* opstr,
+ ReplicationCoordinator::Mode replicationMode) {
synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
- OplogSlot slot;
- slot.hash = 0;
+ long long hashNew = 0;
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
@@ -168,11 +162,11 @@ OplogSlot getNextOpTime(OperationContext* txn,
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
if (replicationMode == ReplicationCoordinator::modeReplSet) {
- slot.hash = hashGenerator.nextInt64();
+ hashNew = hashGenerator.nextInt64();
}
- slot.opTime = OpTime(ts, term);
- return slot;
+ OpTime opTime(ts, term);
+ return std::pair<OpTime, long long>(opTime, hashNew);
}
/**
@@ -240,10 +234,7 @@ void setOplogCollectionName() {
}
namespace {
-
-Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
- if (_localOplogCollection)
- return _localOplogCollection;
+void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
@@ -255,12 +246,11 @@ Collection* getLocalOplogCollection(OperationContext* txn, const std::string& op
"the oplog collection " + oplogCollectionName +
" missing. did you drop it? if so, restart the server",
_localOplogCollection);
- return _localOplogCollection;
}
bool oplogDisabled(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
- const NamespaceString& nss) {
+ NamespaceString& nss) {
if (replicationMode == ReplicationCoordinator::modeNone)
return true;
@@ -280,7 +270,7 @@ bool oplogDisabled(OperationContext* txn,
OplogDocWriter _logOpWriter(OperationContext* txn,
const char* opstr,
- const NamespaceString& nss,
+ NamespaceString& nss,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate,
@@ -349,7 +339,6 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
}
}
-
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
@@ -367,15 +356,22 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
*/
-void _logOpsInner(OperationContext* txn,
- const char* opstr,
- const NamespaceString& nss,
- vector<OplogDocWriter> writers,
- bool fromMigrate,
- Collection* oplogCollection,
- ReplicationCoordinator::Mode replicationMode,
- bool updateReplOpTime,
- OpTime finalOpTime) {
+void _logOp(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* o2,
+ bool fromMigrate,
+ const std::string& oplogCollectionName,
+ ReplicationCoordinator::Mode replicationMode,
+ bool updateReplOpTime) {
+ NamespaceString nss(ns);
+ if (oplogDisabled(txn, replicationMode, nss))
+ return;
+
+ if (_localOplogCollection == nullptr)
+ cacheOplogCollection(txn, oplogCollectionName);
+
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
@@ -385,68 +381,21 @@ void _logOpsInner(OperationContext* txn,
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
+ Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
+ auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode);
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
- for (auto it = writers.begin(); it != writers.end(); it++)
- checkOplogInsert(oplogCollection->insertDocument(txn, &(*it), false));
+ auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.first, slot.second);
+ checkOplogInsert(_localOplogCollection->insertDocument(txn, &writer, false));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- if (updateReplOpTime)
- txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord));
-
- ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
-}
-
-void _logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* o2,
- bool fromMigrate,
- const std::string& oplogName,
- ReplicationCoordinator::Mode replMode,
- bool updateOpTime) {
- NamespaceString nss(ns);
- if (oplogDisabled(txn, replMode, nss))
- return;
-
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- vector<OplogDocWriter> writers;
- Collection* oplog = getLocalOplogCollection(txn, oplogName);
- Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX);
- auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
- auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
- writers.push_back(writer);
- _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime);
-}
-
-void logOps(OperationContext* txn,
- const char* opstr,
- const NamespaceString& nss,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
- bool fromMigrate) {
- ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
-
- invariant(begin != end);
- if (oplogDisabled(txn, replMode, nss))
- return;
-
- vector<OplogDocWriter> writers;
- OpTime finalOpTime;
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
- Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
- for (auto it = begin; it != end; it++) {
- auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
- finalOpTime = slot.opTime;
- auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash);
- writers.emplace_back(writer);
+ if (updateReplOpTime) {
+ txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(slot.first, replCoord));
}
- _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime);
-}
+ ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first);
+}
void logOp(OperationContext* txn,
const char* opstr,
@@ -454,8 +403,15 @@ void logOp(OperationContext* txn,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate) {
- ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
- _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true);
+ _logOp(txn,
+ opstr,
+ ns,
+ obj,
+ o2,
+ fromMigrate,
+ _oplogCollectionName,
+ ReplicationCoordinator::get(txn)->getReplicationMode(),
+ true);
}
OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 613c02edbcf..bd47fbc0a1b 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -81,7 +81,7 @@ extern std::string masterSlaveOplogName;
extern int OPLOG_VERSION;
-/* Log operation(s) to the local oplog
+/** Log an operation to the local oplog
*
* @param opstr
* "i" insert
@@ -90,16 +90,8 @@ extern int OPLOG_VERSION;
* "c" db cmd
* "n" no-op
* "db" declares presence of a database (ns is set to the db name + '.')
- */
-
-void logOps(OperationContext* txn,
- const char* opstr,
- const NamespaceString& nss,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
- bool fromMigrate);
-
-/* For 'u' records, 'obj' captures the mutation made to the object but not
+ *
+ * For 'u' records, 'obj' captures the mutation made to the object but not
* the object itself. 'o2' captures the the criteria for the object that will be modified.
*
* Sets replCoord last optime if 'updateReplOpTime' is true.