diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-24 18:56:12 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-24 18:56:12 -0400 |
commit | a663aa73d03b1f2f8c20b01490c1faf58d9afa1f (patch) | |
tree | 5434294f5d10f2f0169b6ff797238f4ee40660e1 /src/mongo | |
parent | dd01e3b6f14f52205ccb9ff738d712531665355b (diff) | |
download | mongo-a663aa73d03b1f2f8c20b01490c1faf58d9afa1f.tar.gz |
Revert "SERVER-40809 clean up LocalOplogInfo in oplog.cpp"
This reverts commit dd01e3b6f14f52205ccb9ff738d712531665355b.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 152 |
1 file changed, 45 insertions, 107 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 89995c53a80..2a3dcd239d0 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -121,94 +121,37 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeLogOpAdvancesLastApplied); /** * This structure contains per-service-context state related to the oplog. */ -class LocalOplogInfo { -public: - static LocalOplogInfo* get(ServiceContext& service); - static LocalOplogInfo* get(ServiceContext* service); - static LocalOplogInfo* get(OperationContext* opCtx); - +struct LocalOplogInfo { LocalOplogInfo(const LocalOplogInfo&) = delete; LocalOplogInfo& operator=(const LocalOplogInfo&) = delete; LocalOplogInfo() = default; - /** - * Returns namespace of the local oplog collection. - */ - const NamespaceString& getOplogCollectionName() const; - - /** - * Detects the current replication mode and sets the "_oplogName" accordingly. - */ - void setOplogCollectionName(ServiceContext* service); - - Collection* getCollection() const; - void setCollection(Collection* oplog); - void resetCollection(); - - /** - * Sets the global Timestamp to be 'newTime'. - */ - void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime); - - /** - * Allocates optimes for new entries in the oplog. Stores results in 'slotsOut', which - * contain the new optimes along with their terms and newly calculated hash fields. - */ - std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count); - -private: // Name of the oplog collection. - NamespaceString _oplogName; + NamespaceString oplogName; // The "oplog" pointer is always valid (or null) because an operation must take the global // exclusive lock to set the pointer to null when the Collection instance is destroyed. See // "oplogCheckCloseDatabase". - Collection* _oplog = nullptr; + Collection* oplog = nullptr; // Synchronizes the section where a new Timestamp is generated and when it is registered in the // storage engine. 
- mutable stdx::mutex _newOpMutex; + stdx::mutex newOpMutex; }; const auto localOplogInfo = ServiceContext::declareDecoration<LocalOplogInfo>(); -// static -LocalOplogInfo* LocalOplogInfo::get(ServiceContext& service) { - return get(&service); -} - -// static -LocalOplogInfo* LocalOplogInfo::get(ServiceContext* service) { - return &localOplogInfo(service); -} - -// static -LocalOplogInfo* LocalOplogInfo::get(OperationContext* opCtx) { - return get(opCtx->getServiceContext()); -} - -const NamespaceString& LocalOplogInfo::getOplogCollectionName() const { - return _oplogName; -} - -Collection* LocalOplogInfo::getCollection() const { - return _oplog; -} - -void LocalOplogInfo::setCollection(Collection* oplog) { - _oplog = oplog; -} - -void LocalOplogInfo::resetCollection() { - _oplog = nullptr; -} - // so we can fail the same way void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, std::size_t count) { +void _getNextOpTimes(OperationContext* opCtx, + Collection* oplog, + std::size_t count, + OplogSlot* slotsOut, + bool persist = true) { + auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); auto replCoord = ReplicationCoordinator::get(opCtx); long long term = OpTime::kUninitializedTerm; @@ -229,22 +172,18 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s // Allow the storage engine to start the transaction outside the critical section. opCtx->recoveryUnit()->preallocateSnapshot(); - stdx::lock_guard<stdx::mutex> lk(_newOpMutex); + stdx::lock_guard<stdx::mutex> lk(oplogInfo.newOpMutex); ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); const bool orderedCommit = false; - // The local oplog collection pointer must already be established by this point. 
- // We can't establish it here because that would require locking the local database, which would - // be a lock order violation. - invariant(_oplog); - fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + if (persist) { + fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); + } - std::vector<OplogSlot> oplogSlots(count); for (std::size_t i = 0; i < count; i++) { - oplogSlots[i] = {Timestamp(ts.asULL() + i), term}; + slotsOut[i] = {Timestamp(ts.asULL() + i), term}; } - return oplogSlots; } /** @@ -317,10 +256,10 @@ bool shouldBuildInForeground(OperationContext* opCtx, } // namespace -void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) { +void setOplogCollectionName(ServiceContext* service) { switch (ReplicationCoordinator::get(service)->getReplicationMode()) { case ReplicationCoordinator::modeReplSet: - _oplogName = NamespaceString::kRsOplogNamespace; + localOplogInfo(service).oplogName = NamespaceString::kRsOplogNamespace; break; case ReplicationCoordinator::modeNone: // leave empty. @@ -328,10 +267,6 @@ void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) { } } -void setOplogCollectionName(ServiceContext* service) { - LocalOplogInfo::get(service)->setOplogCollectionName(service); -} - /** * Parse the given BSON array of BSON into a vector of BSON. */ @@ -658,25 +593,25 @@ OpTime logOp(OperationContext* opCtx, return {}; } - auto oplogInfo = LocalOplogInfo::get(opCtx); + const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); // Obtain Collection exclusive intent write lock for non-document-locking storage engines. 
boost::optional<Lock::DBLock> dbWriteLock; boost::optional<Lock::CollectionLock> collWriteLock; if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX); - collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX); + collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX); } + auto const oplog = oplogInfo.oplog; OplogSlot slot; WriteUnitOfWork wuow(opCtx); if (oplogSlot.isNull()) { - slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; + _getNextOpTimes(opCtx, oplog, 1, &slot); } else { slot = oplogSlot; } - auto oplog = oplogInfo->getCollection(); auto writer = _logOpWriter(opCtx, opstr, nss, @@ -719,16 +654,17 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - auto oplogInfo = LocalOplogInfo::get(opCtx); + const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); // Obtain Collection exclusive intent write lock for non-document-locking storage engines. boost::optional<Lock::DBLock> dbWriteLock; boost::optional<Lock::CollectionLock> collWriteLock; if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) { dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX); - collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX); + collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX); } + auto oplog = oplogInfo.oplog; WriteUnitOfWork wuow(opCtx); OperationSessionInfo sessionInfo; @@ -748,7 +684,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, auto insertStatementOplogSlot = begin[i].oplogSlot; // Fetch optime now, if not already fetched. if (insertStatementOplogSlot.isNull()) { - insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; + _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); } // Only 'applyOps' oplog entries can be prepared. 
constexpr bool prepare = false; @@ -787,7 +723,6 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, invariant(!opTimes.empty()); auto lastOpTime = opTimes.back(); invariant(!lastOpTime.isNull()); - auto oplog = oplogInfo->getCollection(); _logOpsInner( opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime); wuow.commit(); @@ -902,7 +837,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName void createOplog(OperationContext* opCtx) { const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() == ReplicationCoordinator::modeReplSet; - createOplog(opCtx, LocalOplogInfo::get(opCtx)->getOplogCollectionName().ns(), isReplSet); + createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName.ns(), isReplSet); } OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { @@ -916,7 +851,15 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes) (OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> { - return LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, count); + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. 
+ auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; + invariant(oplog); + std::vector<OplogSlot> oplogSlots(count); + auto oplogSlot = oplogSlots.begin(); + _getNextOpTimes(opCtx, oplog, count, &(*oplogSlot)); + return oplogSlots; } // ------------------------------------- @@ -2129,13 +2072,9 @@ Status applyCommand_inlock(OperationContext* opCtx, return Status::OK(); } -void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { - stdx::lock_guard<stdx::mutex> lk(_newOpMutex); - LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime)); -} - void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { - LocalOplogInfo::get(service)->setNewTimestamp(service, newTime); + stdx::lock_guard<stdx::mutex> lk(localOplogInfo(service).newOpMutex); + LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime)); } void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { @@ -2153,31 +2092,30 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) void oplogCheckCloseDatabase(OperationContext* opCtx, const Database* db) { invariant(opCtx->lockState()->isW()); if (db->name() == "local") { - LocalOplogInfo::get(opCtx)->resetCollection(); + localOplogInfo(opCtx->getServiceContext()).oplog = nullptr; } } void clearLocalOplogPtr() { - LocalOplogInfo::get(getGlobalServiceContext())->resetCollection(); + localOplogInfo(getGlobalServiceContext()).oplog = nullptr; } void acquireOplogCollectionForLogging(OperationContext* opCtx) { - auto oplogInfo = LocalOplogInfo::get(opCtx); - const auto& nss = oplogInfo->getOplogCollectionName(); - if (!nss.isEmpty()) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX); - LocalOplogInfo::get(opCtx)->setCollection(autoColl.getCollection()); + auto& oplogInfo = localOplogInfo(opCtx->getServiceContext()); + if (!oplogInfo.oplogName.isEmpty()) { + AutoGetCollection autoColl(opCtx, oplogInfo.oplogName, 
MODE_IX); + oplogInfo.oplog = autoColl.getCollection(); } } void establishOplogCollectionForLogging(OperationContext* opCtx, Collection* oplog) { invariant(opCtx->lockState()->isW()); invariant(oplog); - LocalOplogInfo::get(opCtx)->setCollection(oplog); + localOplogInfo(opCtx->getServiceContext()).oplog = oplog; } void signalOplogWaiters() { - auto oplog = LocalOplogInfo::get(getGlobalServiceContext())->getCollection(); + auto oplog = localOplogInfo(getGlobalServiceContext()).oplog; if (oplog) { oplog->getCappedCallback()->notifyCappedWaitersIfNeeded(); } |