author     Mathias Stearn <mathias@10gen.com>   2016-05-05 15:56:55 -0400
committer  Mathias Stearn <mathias@10gen.com>   2016-05-09 17:52:12 -0400
commit     955304d50df6c94bb10e757736e531ce91627d23 (patch)
tree       0220a3476219d5dcdb20b15b5ca836a19c24b9eb /src/mongo
parent     0257f34483b0dee3691b8bd4a6715ac33c250b67 (diff)
download   mongo-955304d50df6c94bb10e757736e531ce91627d23.tar.gz
SERVER-24005 Allow getNextOpTime() to hand out batches of OpTimes
This allows a lot of expensive work that is done while holding a mutex to be done
once per batch rather than once per document.
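
The win comes from batch reservation: a single atomic fetch-and-add claims a
contiguous run of `count` values, so the per-document step needs no further
synchronization. A minimal standalone sketch of that idea (illustrative only,
not the MongoDB code; all names here are made up):

// Standalone sketch of batch reservation (hypothetical names, not MongoDB
// source). One fetch_add reserves `count` consecutive values, so no
// per-element work happens inside a critical section.
#include <atomic>
#include <cstdint>
#include <vector>

std::atomic<uint64_t> nextValue{1};

// Reserve [first, first + count) in one atomic step and return first.
uint64_t reserveBatch(unsigned count) {
    return nextValue.fetch_add(count);
}

std::vector<uint64_t> assignBatch(unsigned count) {
    const uint64_t first = reserveBatch(count);
    std::vector<uint64_t> values;
    values.reserve(count);
    for (unsigned i = 0; i < count; i++)
        values.push_back(first + i);  // each caller owns its run exclusively
    return values;
}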
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/global_timestamp.cpp   47
-rw-r--r--  src/mongo/db/global_timestamp.h      3
-rw-r--r--  src/mongo/db/repl/oplog.cpp        117

3 files changed, 82 insertions, 85 deletions
diff --git a/src/mongo/db/global_timestamp.cpp b/src/mongo/db/global_timestamp.cpp
index d380e2a4f05..bd082cb30d1 100644
--- a/src/mongo/db/global_timestamp.cpp
+++ b/src/mongo/db/global_timestamp.cpp
@@ -37,49 +37,54 @@
 namespace mongo {
 namespace {
+// This is the value of the next timestamp to handed out.
 AtomicUInt64 globalTimestamp(0);
 }  // namespace

 void setGlobalTimestamp(const Timestamp& newTime) {
-    globalTimestamp.store(newTime.asULL());
+    globalTimestamp.store(newTime.asULL() + 1);
 }

 Timestamp getLastSetTimestamp() {
-    return Timestamp(globalTimestamp.load());
+    return Timestamp(globalTimestamp.load() - 1);
 }

-Timestamp getNextGlobalTimestamp() {
+Timestamp getNextGlobalTimestamp(unsigned count) {
     const unsigned now = durationCount<Seconds>(
         getGlobalServiceContext()->getFastClockSource()->now().toDurationSinceEpoch());
+    invariant(now != 0);  // This is a sentinel value for null Timestamps.
+    invariant(count != 0);

     // Optimistic approach: just increment the timestamp, assuming the seconds still match.
-    auto next = globalTimestamp.addAndFetch(1);
-    unsigned globalSecs = Timestamp(next).getSecs();
+    auto first = globalTimestamp.fetchAndAdd(count);
+    auto currentTimestamp = first + count;  // What we just set it to.
+    unsigned globalSecs = Timestamp(currentTimestamp).getSecs();

     // Fail if time is not moving forward for 2**31 calls to getNextGlobalTimestamp.
-    if (globalSecs > now && Timestamp(next).getInc() >= 1U << 31) {
-        mongo::warning() << "clock skew detected, prev: " << Timestamp(next).getSecs()
-                         << " now: " << now << std::endl;
+    if (MONGO_unlikely(globalSecs > now) && Timestamp(currentTimestamp).getInc() >= 1U << 31) {
+        mongo::severe() << "clock skew detected, prev: " << globalSecs << " now: " << now;
         fassertFailed(17449);
     }

-    // While the seconds need to be updated, try to do it.
-    while (globalSecs < now) {
-        const auto expected = next;
-        const auto desired = Timestamp(now, 1).asULL();
+    // If the seconds need to be updated, try to do it. This can happen at most once per second.
+    if (MONGO_unlikely(globalSecs < now)) {
+        // First fix the seconds portion.
+        while (globalSecs < now) {
+            const auto desired = Timestamp(now, 1).asULL();

-        // If the compareAndSwap was not successful, assume someone else updated the seconds.
-        auto actual = globalTimestamp.compareAndSwap(expected, desired);
-        if (actual == expected) {
-            next = desired;
-        } else {
-            next = globalTimestamp.addAndFetch(1);
+            auto actual = globalTimestamp.compareAndSwap(currentTimestamp, desired);
+            if (actual == currentTimestamp)
+                break;  // We successfully set the secs, so we're done here.
+
+            // We raced with someone else. Try again, unless they fixed the secs field for us.
+            currentTimestamp = actual;
+            globalSecs = Timestamp(currentTimestamp).getSecs();
         }

-        // Either way, the seconds should no longer be less than now, but repeat if we raced.
-        globalSecs = Timestamp(next).getSecs();
+        // Now reserve our timestamps with the new value of secs.
+        first = globalTimestamp.fetchAndAdd(count);
     }

-    return Timestamp(next);
+    return Timestamp(first);
 }
 }  // namespace mongo
diff --git a/src/mongo/db/global_timestamp.h b/src/mongo/db/global_timestamp.h
index 64d1ce551e6..9c106147b6a 100644
--- a/src/mongo/db/global_timestamp.h
+++ b/src/mongo/db/global_timestamp.h
@@ -40,6 +40,7 @@ Timestamp getLastSetTimestamp();

 /**
  * Generates a new and unique Timestamp.
+ * If count > 1 that many unique Timestamps are reserved starting with the returned value.
  */
-Timestamp getNextGlobalTimestamp();
+Timestamp getNextGlobalTimestamp(unsigned count = 1);
 }
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ca0620ab203..2566562b1da 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -129,7 +129,7 @@ void checkOplogInsert(Status result) {

 struct OplogSlot {
     OpTime opTime;
-    int64_t hash;
+    int64_t hash = 0;
 };

 /**
@@ -142,14 +142,13 @@ struct OplogSlot {
  * function registers the new optime with the storage system and the replication coordinator,
  * and provides no facility to revert those registrations on rollback.
  */
-OplogSlot getNextOpTime(OperationContext* txn,
-                        Collection* oplog,
-                        ReplicationCoordinator* replCoord,
-                        ReplicationCoordinator::Mode replicationMode) {
+void getNextOpTime(OperationContext* txn,
+                   Collection* oplog,
+                   ReplicationCoordinator* replCoord,
+                   ReplicationCoordinator::Mode replicationMode,
+                   unsigned count,
+                   OplogSlot* slotsOut) {
     synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
-
-    OplogSlot slot;
-    slot.hash = 0;

     long long term = OpTime::kUninitializedTerm;

     // Fetch term out of the newOpMutex.
@@ -160,18 +159,19 @@ OplogSlot getNextOpTime(OperationContext* txn,
     }

     stdx::lock_guard<stdx::mutex> lk(newOpMutex);
-    Timestamp ts = getNextGlobalTimestamp();
+    Timestamp ts = getNextGlobalTimestamp(count);
     newTimestampNotifier.notify_all();

     fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));

     // Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
-    if (replicationMode == ReplicationCoordinator::modeReplSet) {
-        slot.hash = hashGenerator.nextInt64();
+    const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
+    for (unsigned i = 0; i < count; i++) {
+        slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
+        if (needHash) {
+            slotsOut[i].hash = hashGenerator.nextInt64();
+        }
     }
-
-    slot.opTime = OpTime(ts, term);
-    return slot;
 }

 /**
@@ -180,9 +180,10 @@ OplogSlot getNextOpTime(OperationContext* txn,
  * which can be very large
  * TODO: can have this build the entire doc
  */
-class OplogDocWriter : public DocWriter {
+class OplogDocWriter final : public DocWriter {
 public:
-    OplogDocWriter(const BSONObj& frame, const BSONObj& oField) : _frame(frame), _oField(oField) {}
+    OplogDocWriter(BSONObj frame, BSONObj oField)
+        : _frame(std::move(frame)), _oField(std::move(oField)) {}

     void writeDocument(char* start) const {
         char* buf = start;
@@ -211,22 +212,6 @@ private:
     BSONObj _oField;
 };

-class UpdateReplOpTimeChange : public RecoveryUnit::Change {
-public:
-    UpdateReplOpTimeChange(OpTime newOpTime, ReplicationCoordinator* replCoord)
-        : _newOpTime(newOpTime), _replCoord(replCoord) {}
-
-    virtual void commit() {
-        _replCoord->setMyLastAppliedOpTimeForward(_newOpTime);
-    }
-
-    virtual void rollback() {}
-
-private:
-    const OpTime _newOpTime;
-    ReplicationCoordinator* _replCoord;
-};
-
 }  // namespace

 void setOplogCollectionName() {
@@ -277,14 +262,14 @@ bool oplogDisabled(OperationContext* txn,
     return false;
 }

-unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
-                                        const char* opstr,
-                                        const NamespaceString& nss,
-                                        const BSONObj& obj,
-                                        const BSONObj* o2,
-                                        bool fromMigrate,
-                                        OpTime optime,
-                                        long long hashNew) {
+OplogDocWriter _logOpWriter(OperationContext* txn,
+                            const char* opstr,
+                            const NamespaceString& nss,
+                            const BSONObj& obj,
+                            const BSONObj* o2,
+                            bool fromMigrate,
+                            OpTime optime,
+                            long long hashNew) {
     BSONObjBuilder b(256);

     b.append("ts", optime.getTimestamp());
@@ -299,7 +284,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
     if (o2)
         b.append("o2", *o2);

-    return stdx::make_unique<OplogDocWriter>(OplogDocWriter(b.obj(), obj));
+    return OplogDocWriter(OplogDocWriter(b.obj(), obj));
 }
 }  // end anon namespace

@@ -373,10 +358,9 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
    used for "justOne" or "upsert" flags on 'd', 'u'
 */
 void _logOpsInner(OperationContext* txn,
-                  const char* opstr,
                   const NamespaceString& nss,
-                  const vector<unique_ptr<OplogDocWriter>>& writers,
-                  bool fromMigrate,
+                  const DocWriter* const* writers,
+                  size_t nWriters,
                   Collection* oplogCollection,
                   ReplicationCoordinator::Mode replicationMode,
                   bool updateReplOpTime,
@@ -391,12 +375,14 @@ void _logOpsInner(OperationContext* txn,

     // we jump through a bunch of hoops here to avoid copying the obj buffer twice --
     // instead we do a single copy to the destination in the record store.
-    for (auto it = writers.begin(); it != writers.end(); it++)
-        checkOplogInsert(oplogCollection->insertDocument(txn, it->get(), false));
+    for (size_t i = 0; i < nWriters; i++)
+        checkOplogInsert(oplogCollection->insertDocument(txn, writers[i], false));

     // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
-    if (updateReplOpTime)
-        txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord));
+    if (updateReplOpTime) {
+        txn->recoveryUnit()->onCommit(
+            [replCoord, finalOpTime] { replCoord->setMyLastAppliedOpTimeForward(finalOpTime); });
+    }

     ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
 }
@@ -415,14 +401,14 @@ void _logOp(OperationContext* txn,
         return;

     ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
-    vector<unique_ptr<OplogDocWriter>> writers;
     Collection* oplog = getLocalOplogCollection(txn, oplogName);

     Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
     Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX);
-    auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
+    OplogSlot slot;
+    getNextOpTime(txn, oplog, replCoord, replMode, 1, &slot);
     auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
-    writers.emplace_back(std::move(writer));
-    _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime);
+    const DocWriter* basePtr = &writer;
+    _logOpsInner(txn, nss, &basePtr, 1, oplog, replMode, updateOpTime, slot.opTime);
 }

 void logOps(OperationContext* txn,
@@ -431,26 +417,31 @@ void logOps(OperationContext* txn,
             std::vector<BSONObj>::const_iterator begin,
             std::vector<BSONObj>::const_iterator end,
             bool fromMigrate) {
-    ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
+    ReplicationCoordinator* replCoord = ReplicationCoordinator::get(txn);
+    ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();

     invariant(begin != end);
     if (oplogDisabled(txn, replMode, nss))
         return;

-    vector<unique_ptr<OplogDocWriter>> writers;
-    writers.reserve(end - begin);
-    OpTime finalOpTime;
-    ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+    const size_t count = end - begin;
+    std::vector<OplogDocWriter> writers;
+    writers.reserve(count);
     Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);

     Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
     Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
-    for (auto it = begin; it != end; it++) {
-        auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
-        finalOpTime = slot.opTime;
-        auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash);
-        writers.emplace_back(std::move(writer));
+    std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
+    getNextOpTime(txn, oplog, replCoord, replMode, count, slots.get());
+    for (size_t i = 0; i < count; i++) {
+        writers.emplace_back(_logOpWriter(
+            txn, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash));
+    }
+
+    std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const* [count]);
+    for (size_t i = 0; i < count; i++) {
+        basePtrs[i] = &writers[i];
     }
-    _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime);
+    _logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, true, slots[count - 1].opTime);
 }
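
For orientation, below is a simplified, self-contained model of the new
getNextGlobalTimestamp() logic in the first hunk above: a Timestamp is packed
as (secs << 32) | inc, std::atomic stands in for MongoDB's AtomicUInt64, and
the clock-skew fassert path is omitted. All names are hypothetical; this is a
sketch of the algorithm, not the actual implementation.

#include <atomic>
#include <cstdint>

std::atomic<uint64_t> globalTs{0};

inline uint64_t pack(uint32_t secs, uint32_t inc) {
    return (uint64_t(secs) << 32) | inc;
}
inline uint32_t secsOf(uint64_t ts) {
    return uint32_t(ts >> 32);
}

// Returns the first of `count` reserved timestamps; the caller owns
// [first, first + count) exclusively.
uint64_t nextGlobalTimestamp(uint32_t now, unsigned count) {
    // Optimistic path: assume the seconds field is already current and
    // reserve the whole batch with one fetch_add.
    uint64_t first = globalTs.fetch_add(count);
    uint64_t current = first + count;  // the value we just stored

    if (secsOf(current) < now) {
        // The seconds field is stale; the optimistic reservation is simply
        // abandoned. CAS the counter up to (now, 1), retrying until the
        // stored seconds reach `now` (another thread may fix them for us).
        while (secsOf(current) < now) {
            const uint64_t desired = pack(now, 1);
            if (globalTs.compare_exchange_strong(current, desired))
                break;  // on failure, `current` is reloaded automatically
        }
        // Re-reserve the batch under the corrected seconds value.
        first = globalTs.fetch_add(count);
    }
    return first;
}

A caller that needs n oplog entries asks for one batch and stamps entry i with
first + i, which mirrors how the batched getNextOpTime() above fills in
slotsOut[i].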