diff options
author | Martin Bligh <mbligh@mongodb.com> | 2015-10-27 10:30:34 -0400 |
---|---|---|
committer | Martin Bligh <mbligh@mongodb.com> | 2015-10-27 10:30:46 -0400 |
commit | 45bfa34b8b99237c16a839e5afa0df44c5fde15d (patch) | |
tree | 591d23194a65a667232708d1e29e2eee6fa11a09 /src/mongo | |
parent | 90607c08ebb7dc7d6730b2d932ff06b3cef01c12 (diff) | |
download | mongo-45bfa34b8b99237c16a839e5afa0df44c5fde15d.tar.gz |
Revert "SERVER-21031: Vectorize logOps"
This reverts commit bc0704481b68db6d3116eb2692088f73759d776a.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/op_observer.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 14 |
5 files changed, 67 insertions, 125 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 2c45a6057b7..a7e2087f643 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -360,7 +360,11 @@ Status Collection::insertDocuments(OperationContext* txn, return status; invariant(sid == txn->recoveryUnit()->getSnapshotId()); - getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), begin, end, fromMigrate); + int inserted = 0; + for (vector<BSONObj>::iterator it = begin; it != end; it++) { // TODO: vectorize + getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), *it, fromMigrate); + inserted++; + } txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); @@ -401,9 +405,7 @@ Status Collection::insertDocument(OperationContext* txn, if (!status.isOK()) return status; - vector<BSONObj> docs; - docs.push_back(doc); - getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), docs.begin(), docs.end()); + getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc); txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 720d26ca9f2..0563482a99a 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -43,8 +43,6 @@ namespace mongo { -using std::vector; - void OpObserver::onCreateIndex(OperationContext* txn, const std::string& ns, BSONObj indexDoc, @@ -56,21 +54,16 @@ void OpObserver::onCreateIndex(OperationContext* txn, logOpForDbHash(txn, ns.c_str()); } -void OpObserver::onInserts(OperationContext* txn, - const NamespaceString& nss, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end, - bool fromMigrate) { - repl::logOps(txn, "i", nss, begin, end, fromMigrate); - - const char* ns = nss.ns().c_str(); - for (auto it = begin; it != end; it++) { - getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr); - logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate); - } +void OpObserver::onInsert(OperationContext* txn, + const NamespaceString& ns, + BSONObj doc, + bool fromMigrate) { + repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); - logOpForDbHash(txn, ns); - if (strstr(ns, ".system.js")) { + getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr); + logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); + logOpForDbHash(txn, ns.ns().c_str()); + if (strstr(ns.ns().c_str(), ".system.js")) { Scope::storedFuncMod(txn); } } diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 2ef3a191f0a..4ecf7df20d0 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -55,11 +55,10 @@ public: const std::string& ns, BSONObj indexDoc, bool fromMigrate = false); - void onInserts(OperationContext* txn, - const NamespaceString& ns, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, - bool fromMigrate = false); + void onInsert(OperationContext* txn, + const NamespaceString& ns, + BSONObj doc, + bool fromMigrate = false); void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args); void onDelete(OperationContext* txn, const std::string& ns, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2a29126bdbd..629a2e91f41 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -100,7 +100,6 @@ using std::endl; using std::string; using std::stringstream; using std::unique_ptr; -using std::vector; namespace repl { std::string rsOplogName = "local.oplog.rs"; @@ -128,11 +127,6 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -struct OplogSlot { - OpTime opTime; - int64_t hash; -}; - /** * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to * reflect that new optime. Returns the new optime and the correct value of the "h" field for @@ -143,14 +137,14 @@ struct OplogSlot { * function registers the new optime with the storage system and the replication coordinator, * and provides no facility to revert those registrations on rollback. */ -OplogSlot getNextOpTime(OperationContext* txn, - Collection* oplog, - ReplicationCoordinator* replCoord, - ReplicationCoordinator::Mode replicationMode) { +std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, + Collection* oplog, + ReplicationCoordinator* replCoord, + const char* opstr, + ReplicationCoordinator::Mode replicationMode) { synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns()); - OplogSlot slot; - slot.hash = 0; + long long hashNew = 0; long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. @@ -168,11 +162,11 @@ OplogSlot getNextOpTime(OperationContext* txn, // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. if (replicationMode == ReplicationCoordinator::modeReplSet) { - slot.hash = hashGenerator.nextInt64(); + hashNew = hashGenerator.nextInt64(); } - slot.opTime = OpTime(ts, term); - return slot; + OpTime opTime(ts, term); + return std::pair<OpTime, long long>(opTime, hashNew); } /** @@ -240,10 +234,7 @@ void setOplogCollectionName() { } namespace { - -Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) { - if (_localOplogCollection) - return _localOplogCollection; +void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) { Lock::DBLock lk(txn->lockState(), "local", MODE_IX); Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); @@ -255,12 +246,11 @@ Collection* getLocalOplogCollection(OperationContext* txn, const std::string& op "the oplog collection " + oplogCollectionName + " missing. did you drop it? if so, restart the server", _localOplogCollection); - return _localOplogCollection; } bool oplogDisabled(OperationContext* txn, ReplicationCoordinator::Mode replicationMode, - const NamespaceString& nss) { + NamespaceString& nss) { if (replicationMode == ReplicationCoordinator::modeNone) return true; @@ -280,7 +270,7 @@ bool oplogDisabled(OperationContext* txn, OplogDocWriter _logOpWriter(OperationContext* txn, const char* opstr, - const NamespaceString& nss, + NamespaceString& nss, const BSONObj& obj, BSONObj* o2, bool fromMigrate, @@ -349,7 +339,6 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false); } } - /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp @@ -367,15 +356,22 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { if not null, specifies a boolean to pass along to the other side as b: param. used for "justOne" or "upsert" flags on 'd', 'u' */ -void _logOpsInner(OperationContext* txn, - const char* opstr, - const NamespaceString& nss, - vector<OplogDocWriter> writers, - bool fromMigrate, - Collection* oplogCollection, - ReplicationCoordinator::Mode replicationMode, - bool updateReplOpTime, - OpTime finalOpTime) { +void _logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* o2, + bool fromMigrate, + const std::string& oplogCollectionName, + ReplicationCoordinator::Mode replicationMode, + bool updateReplOpTime) { + NamespaceString nss(ns); + if (oplogDisabled(txn, replicationMode, nss)) + return; + + if (_localOplogCollection == nullptr) + cacheOplogCollection(txn, oplogCollectionName); + Lock::DBLock lk(txn->lockState(), "local", MODE_IX); ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); @@ -385,68 +381,21 @@ void _logOpsInner(OperationContext* txn, severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); } + Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); + auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode); // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- // instead we do a single copy to the destination in the record store. - for (auto it = writers.begin(); it != writers.end(); it++) - checkOplogInsert(oplogCollection->insertDocument(txn, &(*it), false)); + auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.first, slot.second); + checkOplogInsert(_localOplogCollection->insertDocument(txn, &writer, false)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - if (updateReplOpTime) - txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord)); - - ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime); -} - -void _logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* o2, - bool fromMigrate, - const std::string& oplogName, - ReplicationCoordinator::Mode replMode, - bool updateOpTime) { - NamespaceString nss(ns); - if (oplogDisabled(txn, replMode, nss)) - return; - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - vector<OplogDocWriter> writers; - Collection* oplog = getLocalOplogCollection(txn, oplogName); - Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX); - auto slot = getNextOpTime(txn, oplog, replCoord, replMode); - auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash); - writers.push_back(writer); - _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime); -} - -void logOps(OperationContext* txn, - const char* opstr, - const NamespaceString& nss, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, - bool fromMigrate) { - ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); - - invariant(begin != end); - if (oplogDisabled(txn, replMode, nss)) - return; - - vector<OplogDocWriter> writers; - OpTime finalOpTime; - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName); - Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX); - for (auto it = begin; it != end; it++) { - auto slot = getNextOpTime(txn, oplog, replCoord, replMode); - finalOpTime = slot.opTime; - auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash); - writers.emplace_back(writer); + if (updateReplOpTime) { + txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(slot.first, replCoord)); } - _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime); -} + ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first); +} void logOp(OperationContext* txn, const char* opstr, @@ -454,8 +403,15 @@ void logOp(OperationContext* txn, const BSONObj& obj, BSONObj* o2, bool fromMigrate) { - ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); - _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true); + _logOp(txn, + opstr, + ns, + obj, + o2, + fromMigrate, + _oplogCollectionName, + ReplicationCoordinator::get(txn)->getReplicationMode(), + true); } OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 613c02edbcf..bd47fbc0a1b 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -81,7 +81,7 @@ extern std::string masterSlaveOplogName; extern int OPLOG_VERSION; -/* Log operation(s) to the local oplog +/** Log an operation to the local oplog * * @param opstr * "i" insert @@ -90,16 +90,8 @@ extern int OPLOG_VERSION; * "c" db cmd * "n" no-op * "db" declares presence of a database (ns is set to the db name + '.') - */ - -void logOps(OperationContext* txn, - const char* opstr, - const NamespaceString& nss, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, - bool fromMigrate); - -/* For 'u' records, 'obj' captures the mutation made to the object but not + * + * For 'u' records, 'obj' captures the mutation made to the object but not * the object itself. 'o2' captures the the criteria for the object that will be modified. * * Sets replCoord last optime if 'updateReplOpTime' is true. |