summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-24 18:56:12 -0400
committerBenety Goh <benety@mongodb.com>2019-04-24 18:56:12 -0400
commita663aa73d03b1f2f8c20b01490c1faf58d9afa1f (patch)
tree5434294f5d10f2f0169b6ff797238f4ee40660e1 /src/mongo
parentdd01e3b6f14f52205ccb9ff738d712531665355b (diff)
downloadmongo-a663aa73d03b1f2f8c20b01490c1faf58d9afa1f.tar.gz
Revert "SERVER-40809 clean up LocalOplogInfo in oplog.cpp"
This reverts commit dd01e3b6f14f52205ccb9ff738d712531665355b.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/oplog.cpp152
1 files changed, 45 insertions, 107 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 89995c53a80..2a3dcd239d0 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -121,94 +121,37 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeLogOpAdvancesLastApplied);
/**
* This structure contains per-service-context state related to the oplog.
*/
-class LocalOplogInfo {
-public:
- static LocalOplogInfo* get(ServiceContext& service);
- static LocalOplogInfo* get(ServiceContext* service);
- static LocalOplogInfo* get(OperationContext* opCtx);
-
+struct LocalOplogInfo {
LocalOplogInfo(const LocalOplogInfo&) = delete;
LocalOplogInfo& operator=(const LocalOplogInfo&) = delete;
LocalOplogInfo() = default;
- /**
- * Returns namespace of the local oplog collection.
- */
- const NamespaceString& getOplogCollectionName() const;
-
- /**
- * Detects the current replication mode and sets the "_oplogName" accordingly.
- */
- void setOplogCollectionName(ServiceContext* service);
-
- Collection* getCollection() const;
- void setCollection(Collection* oplog);
- void resetCollection();
-
- /**
- * Sets the global Timestamp to be 'newTime'.
- */
- void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime);
-
- /**
- * Allocates optimes for new entries in the oplog. Stores results in 'slotsOut', which
- * contain the new optimes along with their terms and newly calculated hash fields.
- */
- std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count);
-
-private:
// Name of the oplog collection.
- NamespaceString _oplogName;
+ NamespaceString oplogName;
// The "oplog" pointer is always valid (or null) because an operation must take the global
// exclusive lock to set the pointer to null when the Collection instance is destroyed. See
// "oplogCheckCloseDatabase".
- Collection* _oplog = nullptr;
+ Collection* oplog = nullptr;
// Synchronizes the section where a new Timestamp is generated and when it is registered in the
// storage engine.
- mutable stdx::mutex _newOpMutex;
+ stdx::mutex newOpMutex;
};
const auto localOplogInfo = ServiceContext::declareDecoration<LocalOplogInfo>();
-// static
-LocalOplogInfo* LocalOplogInfo::get(ServiceContext& service) {
- return get(&service);
-}
-
-// static
-LocalOplogInfo* LocalOplogInfo::get(ServiceContext* service) {
- return &localOplogInfo(service);
-}
-
-// static
-LocalOplogInfo* LocalOplogInfo::get(OperationContext* opCtx) {
- return get(opCtx->getServiceContext());
-}
-
-const NamespaceString& LocalOplogInfo::getOplogCollectionName() const {
- return _oplogName;
-}
-
-Collection* LocalOplogInfo::getCollection() const {
- return _oplog;
-}
-
-void LocalOplogInfo::setCollection(Collection* oplog) {
- _oplog = oplog;
-}
-
-void LocalOplogInfo::resetCollection() {
- _oplog = nullptr;
-}
-
// so we can fail the same way
void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, std::size_t count) {
+void _getNextOpTimes(OperationContext* opCtx,
+ Collection* oplog,
+ std::size_t count,
+ OplogSlot* slotsOut,
+ bool persist = true) {
+ auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
auto replCoord = ReplicationCoordinator::get(opCtx);
long long term = OpTime::kUninitializedTerm;
@@ -229,22 +172,18 @@ std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, s
// Allow the storage engine to start the transaction outside the critical section.
opCtx->recoveryUnit()->preallocateSnapshot();
- stdx::lock_guard<stdx::mutex> lk(_newOpMutex);
+ stdx::lock_guard<stdx::mutex> lk(oplogInfo.newOpMutex);
ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
const bool orderedCommit = false;
- // The local oplog collection pointer must already be established by this point.
- // We can't establish it here because that would require locking the local database, which would
- // be a lock order violation.
- invariant(_oplog);
- fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+ if (persist) {
+ fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+ }
- std::vector<OplogSlot> oplogSlots(count);
for (std::size_t i = 0; i < count; i++) {
- oplogSlots[i] = {Timestamp(ts.asULL() + i), term};
+ slotsOut[i] = {Timestamp(ts.asULL() + i), term};
}
- return oplogSlots;
}
/**
@@ -317,10 +256,10 @@ bool shouldBuildInForeground(OperationContext* opCtx,
} // namespace
-void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) {
+void setOplogCollectionName(ServiceContext* service) {
switch (ReplicationCoordinator::get(service)->getReplicationMode()) {
case ReplicationCoordinator::modeReplSet:
- _oplogName = NamespaceString::kRsOplogNamespace;
+ localOplogInfo(service).oplogName = NamespaceString::kRsOplogNamespace;
break;
case ReplicationCoordinator::modeNone:
// leave empty.
@@ -328,10 +267,6 @@ void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) {
}
}
-void setOplogCollectionName(ServiceContext* service) {
- LocalOplogInfo::get(service)->setOplogCollectionName(service);
-}
-
/**
* Parse the given BSON array of BSON into a vector of BSON.
*/
@@ -658,25 +593,25 @@ OpTime logOp(OperationContext* opCtx,
return {};
}
- auto oplogInfo = LocalOplogInfo::get(opCtx);
+ const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
boost::optional<Lock::DBLock> dbWriteLock;
boost::optional<Lock::CollectionLock> collWriteLock;
if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX);
- collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
+ collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX);
}
+ auto const oplog = oplogInfo.oplog;
OplogSlot slot;
WriteUnitOfWork wuow(opCtx);
if (oplogSlot.isNull()) {
- slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
+ _getNextOpTimes(opCtx, oplog, 1, &slot);
} else {
slot = oplogSlot;
}
- auto oplog = oplogInfo->getCollection();
auto writer = _logOpWriter(opCtx,
opstr,
nss,
@@ -719,16 +654,17 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- auto oplogInfo = LocalOplogInfo::get(opCtx);
+ const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
boost::optional<Lock::DBLock> dbWriteLock;
boost::optional<Lock::CollectionLock> collWriteLock;
if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX);
- collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
+ collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX);
}
+ auto oplog = oplogInfo.oplog;
WriteUnitOfWork wuow(opCtx);
OperationSessionInfo sessionInfo;
@@ -748,7 +684,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
auto insertStatementOplogSlot = begin[i].oplogSlot;
// Fetch optime now, if not already fetched.
if (insertStatementOplogSlot.isNull()) {
- insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
+ _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
}
// Only 'applyOps' oplog entries can be prepared.
constexpr bool prepare = false;
@@ -787,7 +723,6 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
invariant(!opTimes.empty());
auto lastOpTime = opTimes.back();
invariant(!lastOpTime.isNull());
- auto oplog = oplogInfo->getCollection();
_logOpsInner(
opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime);
wuow.commit();
@@ -902,7 +837,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
void createOplog(OperationContext* opCtx) {
const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() ==
ReplicationCoordinator::modeReplSet;
- createOplog(opCtx, LocalOplogInfo::get(opCtx)->getOplogCollectionName().ns(), isReplSet);
+ createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName.ns(), isReplSet);
}
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
@@ -916,7 +851,15 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes)
(OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> {
- return LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, count);
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
+ invariant(oplog);
+ std::vector<OplogSlot> oplogSlots(count);
+ auto oplogSlot = oplogSlots.begin();
+ _getNextOpTimes(opCtx, oplog, count, &(*oplogSlot));
+ return oplogSlots;
}
// -------------------------------------
@@ -2129,13 +2072,9 @@ Status applyCommand_inlock(OperationContext* opCtx,
return Status::OK();
}
-void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
- stdx::lock_guard<stdx::mutex> lk(_newOpMutex);
- LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime));
-}
-
void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
- LocalOplogInfo::get(service)->setNewTimestamp(service, newTime);
+ stdx::lock_guard<stdx::mutex> lk(localOplogInfo(service).newOpMutex);
+ LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime));
}
void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
@@ -2153,31 +2092,30 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS)
void oplogCheckCloseDatabase(OperationContext* opCtx, const Database* db) {
invariant(opCtx->lockState()->isW());
if (db->name() == "local") {
- LocalOplogInfo::get(opCtx)->resetCollection();
+ localOplogInfo(opCtx->getServiceContext()).oplog = nullptr;
}
}
void clearLocalOplogPtr() {
- LocalOplogInfo::get(getGlobalServiceContext())->resetCollection();
+ localOplogInfo(getGlobalServiceContext()).oplog = nullptr;
}
void acquireOplogCollectionForLogging(OperationContext* opCtx) {
- auto oplogInfo = LocalOplogInfo::get(opCtx);
- const auto& nss = oplogInfo->getOplogCollectionName();
- if (!nss.isEmpty()) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- LocalOplogInfo::get(opCtx)->setCollection(autoColl.getCollection());
+ auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
+ if (!oplogInfo.oplogName.isEmpty()) {
+ AutoGetCollection autoColl(opCtx, oplogInfo.oplogName, MODE_IX);
+ oplogInfo.oplog = autoColl.getCollection();
}
}
void establishOplogCollectionForLogging(OperationContext* opCtx, Collection* oplog) {
invariant(opCtx->lockState()->isW());
invariant(oplog);
- LocalOplogInfo::get(opCtx)->setCollection(oplog);
+ localOplogInfo(opCtx->getServiceContext()).oplog = oplog;
}
void signalOplogWaiters() {
- auto oplog = LocalOplogInfo::get(getGlobalServiceContext())->getCollection();
+ auto oplog = localOplogInfo(getGlobalServiceContext()).oplog;
if (oplog) {
oplog->getCappedCallback()->notifyCappedWaitersIfNeeded();
}