diff options
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/op_observer.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 14 |
5 files changed, 125 insertions, 67 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index a7e2087f643..2c45a6057b7 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -360,11 +360,7 @@ Status Collection::insertDocuments(OperationContext* txn, return status; invariant(sid == txn->recoveryUnit()->getSnapshotId()); - int inserted = 0; - for (vector<BSONObj>::iterator it = begin; it != end; it++) { // TODO: vectorize - getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), *it, fromMigrate); - inserted++; - } + getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), begin, end, fromMigrate); txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); @@ -405,7 +401,9 @@ Status Collection::insertDocument(OperationContext* txn, if (!status.isOK()) return status; - getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc); + vector<BSONObj> docs; + docs.push_back(doc); + getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), docs.begin(), docs.end()); txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 0563482a99a..720d26ca9f2 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -43,6 +43,8 @@ namespace mongo { +using std::vector; + void OpObserver::onCreateIndex(OperationContext* txn, const std::string& ns, BSONObj indexDoc, @@ -54,16 +56,21 @@ void OpObserver::onCreateIndex(OperationContext* txn, logOpForDbHash(txn, ns.c_str()); } -void OpObserver::onInsert(OperationContext* txn, - const NamespaceString& ns, - BSONObj doc, - bool fromMigrate) { - repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); +void OpObserver::onInserts(OperationContext* txn, + const NamespaceString& nss, + vector<BSONObj>::iterator begin, + vector<BSONObj>::iterator end, + bool fromMigrate) { + repl::logOps(txn, "i", nss, begin, end, fromMigrate); + + const char* ns = nss.ns().c_str(); + for (auto it = begin; it != end; it++) { + getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr); + logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate); + } - getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr); - logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate); - logOpForDbHash(txn, ns.ns().c_str()); - if (strstr(ns.ns().c_str(), ".system.js")) { + logOpForDbHash(txn, ns); + if (strstr(ns, ".system.js")) { Scope::storedFuncMod(txn); } } diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 4ecf7df20d0..2ef3a191f0a 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -55,10 +55,11 @@ public: const std::string& ns, BSONObj indexDoc, bool fromMigrate = false); - void onInsert(OperationContext* txn, - const NamespaceString& ns, - BSONObj doc, - bool fromMigrate = false); + void onInserts(OperationContext* txn, + const NamespaceString& ns, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, + bool fromMigrate = false); void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args); void onDelete(OperationContext* txn, const std::string& ns, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 629a2e91f41..2a29126bdbd 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -100,6 +100,7 @@ using std::endl; using std::string; using std::stringstream; using std::unique_ptr; +using std::vector; namespace repl { std::string rsOplogName = "local.oplog.rs"; @@ -127,6 +128,11 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } +struct OplogSlot { + OpTime opTime; + int64_t hash; +}; + /** * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to * reflect that new optime. Returns the new optime and the correct value of the "h" field for @@ -137,14 +143,14 @@ void checkOplogInsert(Status result) { * function registers the new optime with the storage system and the replication coordinator, * and provides no facility to revert those registrations on rollback. */ -std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, - Collection* oplog, - ReplicationCoordinator* replCoord, - const char* opstr, - ReplicationCoordinator::Mode replicationMode) { +OplogSlot getNextOpTime(OperationContext* txn, + Collection* oplog, + ReplicationCoordinator* replCoord, + ReplicationCoordinator::Mode replicationMode) { synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns()); - long long hashNew = 0; + OplogSlot slot; + slot.hash = 0; long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. @@ -162,11 +168,11 @@ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. if (replicationMode == ReplicationCoordinator::modeReplSet) { - hashNew = hashGenerator.nextInt64(); + slot.hash = hashGenerator.nextInt64(); } - OpTime opTime(ts, term); - return std::pair<OpTime, long long>(opTime, hashNew); + slot.opTime = OpTime(ts, term); + return slot; } /** @@ -234,7 +240,10 @@ void setOplogCollectionName() { } namespace { -void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) { + +Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) { + if (_localOplogCollection) + return _localOplogCollection; Lock::DBLock lk(txn->lockState(), "local", MODE_IX); Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); @@ -246,11 +255,12 @@ void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollect "the oplog collection " + oplogCollectionName + " missing. did you drop it? if so, restart the server", _localOplogCollection); + return _localOplogCollection; } bool oplogDisabled(OperationContext* txn, ReplicationCoordinator::Mode replicationMode, - NamespaceString& nss) { + const NamespaceString& nss) { if (replicationMode == ReplicationCoordinator::modeNone) return true; @@ -270,7 +280,7 @@ bool oplogDisabled(OperationContext* txn, OplogDocWriter _logOpWriter(OperationContext* txn, const char* opstr, - NamespaceString& nss, + const NamespaceString& nss, const BSONObj& obj, BSONObj* o2, bool fromMigrate, @@ -339,6 +349,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false); } } + /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp @@ -356,22 +367,15 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { if not null, specifies a boolean to pass along to the other side as b: param. used for "justOne" or "upsert" flags on 'd', 'u' */ -void _logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* o2, - bool fromMigrate, - const std::string& oplogCollectionName, - ReplicationCoordinator::Mode replicationMode, - bool updateReplOpTime) { - NamespaceString nss(ns); - if (oplogDisabled(txn, replicationMode, nss)) - return; - - if (_localOplogCollection == nullptr) - cacheOplogCollection(txn, oplogCollectionName); - +void _logOpsInner(OperationContext* txn, + const char* opstr, + const NamespaceString& nss, + vector<OplogDocWriter> writers, + bool fromMigrate, + Collection* oplogCollection, + ReplicationCoordinator::Mode replicationMode, + bool updateReplOpTime, + OpTime finalOpTime) { Lock::DBLock lk(txn->lockState(), "local", MODE_IX); ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); @@ -381,37 +385,77 @@ void _logOp(OperationContext* txn, severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); } - Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX); - auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode); // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- // instead we do a single copy to the destination in the record store. - auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.first, slot.second); - checkOplogInsert(_localOplogCollection->insertDocument(txn, &writer, false)); + for (auto it = writers.begin(); it != writers.end(); it++) + checkOplogInsert(oplogCollection->insertDocument(txn, &(*it), false)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - if (updateReplOpTime) { - txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(slot.first, replCoord)); - } + if (updateReplOpTime) + txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord)); + + ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime); +} - ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first); +void _logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* o2, + bool fromMigrate, + const std::string& oplogName, + ReplicationCoordinator::Mode replMode, + bool updateOpTime) { + NamespaceString nss(ns); + if (oplogDisabled(txn, replMode, nss)) + return; + + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + vector<OplogDocWriter> writers; + Collection* oplog = getLocalOplogCollection(txn, oplogName); + Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX); + auto slot = getNextOpTime(txn, oplog, replCoord, replMode); + auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash); + writers.push_back(writer); + _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime); } +void logOps(OperationContext* txn, + const char* opstr, + const NamespaceString& nss, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, + bool fromMigrate) { + ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); + + invariant(begin != end); + if (oplogDisabled(txn, replMode, nss)) + return; + + vector<OplogDocWriter> writers; + OpTime finalOpTime; + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName); + Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX); + for (auto it = begin; it != end; it++) { + auto slot = getNextOpTime(txn, oplog, replCoord, replMode); + finalOpTime = slot.opTime; + auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash); + writers.emplace_back(writer); + } + _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime); +} + + void logOp(OperationContext* txn, const char* opstr, const char* ns, const BSONObj& obj, BSONObj* o2, bool fromMigrate) { - _logOp(txn, - opstr, - ns, - obj, - o2, - fromMigrate, - _oplogCollectionName, - ReplicationCoordinator::get(txn)->getReplicationMode(), - true); + ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); + _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true); } OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index bd47fbc0a1b..613c02edbcf 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -81,7 +81,7 @@ extern std::string masterSlaveOplogName; extern int OPLOG_VERSION; -/** Log an operation to the local oplog +/* Log operation(s) to the local oplog * * @param opstr * "i" insert @@ -90,8 +90,16 @@ extern int OPLOG_VERSION; * "c" db cmd * "n" no-op * "db" declares presence of a database (ns is set to the db name + '.') - * - * For 'u' records, 'obj' captures the mutation made to the object but not + */ + +void logOps(OperationContext* txn, + const char* opstr, + const NamespaceString& nss, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, + bool fromMigrate); + +/* For 'u' records, 'obj' captures the mutation made to the object but not * the object itself. 'o2' captures the the criteria for the object that will be modified. * * Sets replCoord last optime if 'updateReplOpTime' is true. |