diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-24 18:09:11 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-24 18:09:25 -0400 |
commit | dd01e3b6f14f52205ccb9ff738d712531665355b (patch) | |
tree | 235691ef948f5611e20bb8881156ab1b5089cb46 /src | |
parent | a5c727a27bd9b3a2b1f43190b9cb8e08812bd2f9 (diff) | |
download | mongo-dd01e3b6f14f52205ccb9ff738d712531665355b.tar.gz |
SERVER-40809 clean up LocalOplogInfo in oplog.cpp
remove persist argument from _getNextOpTimes()
add LocalOplogInfo::get()
move _getNextOpTimes() into LocalOplogInfo
make LocalOplogInfo::oplog and oplogName private and provide accessors/mutators
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 152 |
1 files changed, 107 insertions, 45 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2a3dcd239d0..89995c53a80 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -121,37 +121,94 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeLogOpAdvancesLastApplied); /** * This structure contains per-service-context state related to the oplog. */ -struct LocalOplogInfo { +class LocalOplogInfo { +public: + static LocalOplogInfo* get(ServiceContext& service); + static LocalOplogInfo* get(ServiceContext* service); + static LocalOplogInfo* get(OperationContext* opCtx); + LocalOplogInfo(const LocalOplogInfo&) = delete; LocalOplogInfo& operator=(const LocalOplogInfo&) = delete; LocalOplogInfo() = default; + /** + * Returns namespace of the local oplog collection. + */ + const NamespaceString& getOplogCollectionName() const; + + /** + * Detects the current replication mode and sets the "_oplogName" accordingly. + */ + void setOplogCollectionName(ServiceContext* service); + + Collection* getCollection() const; + void setCollection(Collection* oplog); + void resetCollection(); + + /** + * Sets the global Timestamp to be 'newTime'. + */ + void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime); + + /** + * Allocates optimes for new entries in the oplog. Stores results in 'slotsOut', which + * contain the new optimes along with their terms and newly calculated hash fields. + */ + std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count); + +private: // Name of the oplog collection. - NamespaceString oplogName; + NamespaceString _oplogName; // The "oplog" pointer is always valid (or null) because an operation must take the global // exclusive lock to set the pointer to null when the Collection instance is destroyed. See // "oplogCheckCloseDatabase". - Collection* oplog = nullptr; + Collection* _oplog = nullptr; // Synchronizes the section where a new Timestamp is generated and when it is registered in the // storage engine. - stdx::mutex newOpMutex; + mutable stdx::mutex _newOpMutex; }; const auto localOplogInfo = ServiceContext::declareDecoration<LocalOplogInfo>(); +// static +LocalOplogInfo* LocalOplogInfo::get(ServiceContext& service) { + return get(&service); +} + +// static +LocalOplogInfo* LocalOplogInfo::get(ServiceContext* service) { + return &localOplogInfo(service); +} + +// static +LocalOplogInfo* LocalOplogInfo::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +const NamespaceString& LocalOplogInfo::getOplogCollectionName() const { + return _oplogName; +} + +Collection* LocalOplogInfo::getCollection() const { + return _oplog; +} + +void LocalOplogInfo::setCollection(Collection* oplog) { + _oplog = oplog; +} + +void LocalOplogInfo::resetCollection() { + _oplog = nullptr; +} + // so we can fail the same way void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -void _getNextOpTimes(OperationContext* opCtx, - Collection* oplog, - std::size_t count, - OplogSlot* slotsOut, - bool persist = true) { - auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); +std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, std::size_t count) { auto replCoord = ReplicationCoordinator::get(opCtx); long long term = OpTime::kUninitializedTerm; @@ -172,18 +229,22 @@ void _getNextOpTimes(OperationContext* opCtx, // Allow the storage engine to start the transaction outside the critical section. opCtx->recoveryUnit()->preallocateSnapshot(); - stdx::lock_guard<stdx::mutex> lk(oplogInfo.newOpMutex); + stdx::lock_guard<stdx::mutex> lk(_newOpMutex); ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); const bool orderedCommit = false; - if (persist) { - fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); - } + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. + invariant(_oplog); + fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + std::vector<OplogSlot> oplogSlots(count); for (std::size_t i = 0; i < count; i++) { - slotsOut[i] = {Timestamp(ts.asULL() + i), term}; + oplogSlots[i] = {Timestamp(ts.asULL() + i), term}; } + return oplogSlots; } /** @@ -256,10 +317,10 @@ bool shouldBuildInForeground(OperationContext* opCtx, } // namespace -void setOplogCollectionName(ServiceContext* service) { +void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) { switch (ReplicationCoordinator::get(service)->getReplicationMode()) { case ReplicationCoordinator::modeReplSet: - localOplogInfo(service).oplogName = NamespaceString::kRsOplogNamespace; + _oplogName = NamespaceString::kRsOplogNamespace; break; case ReplicationCoordinator::modeNone: // leave empty. @@ -267,6 +328,10 @@ void setOplogCollectionName(ServiceContext* service) { } } +void setOplogCollectionName(ServiceContext* service) { + LocalOplogInfo::get(service)->setOplogCollectionName(service); +} + /** * Parse the given BSON array of BSON into a vector of BSON. */ @@ -593,25 +658,25 @@ OpTime logOp(OperationContext* opCtx, return {}; } - const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); + auto oplogInfo = LocalOplogInfo::get(opCtx); // Obtain Collection exclusive intent write lock for non-document-locking storage engines. boost::optional<Lock::DBLock> dbWriteLock; boost::optional<Lock::CollectionLock> collWriteLock; if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX); - collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX); + collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX); } - auto const oplog = oplogInfo.oplog; OplogSlot slot; WriteUnitOfWork wuow(opCtx); if (oplogSlot.isNull()) { - _getNextOpTimes(opCtx, oplog, 1, &slot); + slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; } else { slot = oplogSlot; } + auto oplog = oplogInfo->getCollection(); auto writer = _logOpWriter(opCtx, opstr, nss, @@ -654,17 +719,16 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); + auto oplogInfo = LocalOplogInfo::get(opCtx); // Obtain Collection exclusive intent write lock for non-document-locking storage engines. boost::optional<Lock::DBLock> dbWriteLock; boost::optional<Lock::CollectionLock> collWriteLock; if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX); - collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX); + collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX); } - auto oplog = oplogInfo.oplog; WriteUnitOfWork wuow(opCtx); OperationSessionInfo sessionInfo; @@ -684,7 +748,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, auto insertStatementOplogSlot = begin[i].oplogSlot; // Fetch optime now, if not already fetched. if (insertStatementOplogSlot.isNull()) { - _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); + insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; } // Only 'applyOps' oplog entries can be prepared. constexpr bool prepare = false; @@ -723,6 +787,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, invariant(!opTimes.empty()); auto lastOpTime = opTimes.back(); invariant(!lastOpTime.isNull()); + auto oplog = oplogInfo->getCollection(); _logOpsInner( opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime); wuow.commit(); @@ -837,7 +902,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName void createOplog(OperationContext* opCtx) { const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() == ReplicationCoordinator::modeReplSet; - createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName.ns(), isReplSet); + createOplog(opCtx, LocalOplogInfo::get(opCtx)->getOplogCollectionName().ns(), isReplSet); } OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { @@ -851,15 +916,7 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes) (OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> { - // The local oplog collection pointer must already be established by this point. - // We can't establish it here because that would require locking the local database, which would - // be a lock order violation. - auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; - invariant(oplog); - std::vector<OplogSlot> oplogSlots(count); - auto oplogSlot = oplogSlots.begin(); - _getNextOpTimes(opCtx, oplog, count, &(*oplogSlot)); - return oplogSlots; + return LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, count); } // ------------------------------------- @@ -2072,11 +2129,15 @@ Status applyCommand_inlock(OperationContext* opCtx, return Status::OK(); } -void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { - stdx::lock_guard<stdx::mutex> lk(localOplogInfo(service).newOpMutex); +void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { + stdx::lock_guard<stdx::mutex> lk(_newOpMutex); LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime)); } +void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { + LocalOplogInfo::get(service)->setNewTimestamp(service, newTime); +} + void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { DBDirectClient c(opCtx); static const BSONObj reverseNaturalObj = BSON("$natural" << -1); @@ -2092,30 +2153,31 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) void oplogCheckCloseDatabase(OperationContext* opCtx, const Database* db) { invariant(opCtx->lockState()->isW()); if (db->name() == "local") { - localOplogInfo(opCtx->getServiceContext()).oplog = nullptr; + LocalOplogInfo::get(opCtx)->resetCollection(); } } void clearLocalOplogPtr() { - localOplogInfo(getGlobalServiceContext()).oplog = nullptr; + LocalOplogInfo::get(getGlobalServiceContext())->resetCollection(); } void acquireOplogCollectionForLogging(OperationContext* opCtx) { - auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); - if (!oplogInfo.oplogName.isEmpty()) { - AutoGetCollection autoColl(opCtx, oplogInfo.oplogName, MODE_IX); - oplogInfo.oplog = autoColl.getCollection(); + auto oplogInfo = LocalOplogInfo::get(opCtx); + const auto& nss = oplogInfo->getOplogCollectionName(); + if (!nss.isEmpty()) { + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + LocalOplogInfo::get(opCtx)->setCollection(autoColl.getCollection()); } } void establishOplogCollectionForLogging(OperationContext* opCtx, Collection* oplog) { invariant(opCtx->lockState()->isW()); invariant(oplog); - localOplogInfo(opCtx->getServiceContext()).oplog = oplog; + LocalOplogInfo::get(opCtx)->setCollection(oplog); } void signalOplogWaiters() { - auto oplog = localOplogInfo(getGlobalServiceContext()).oplog; + auto oplog = LocalOplogInfo::get(getGlobalServiceContext())->getCollection(); if (oplog) { oplog->getCappedCallback()->notifyCappedWaitersIfNeeded(); } |