summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-24 18:09:11 -0400
committerBenety Goh <benety@mongodb.com>2019-04-24 18:09:25 -0400
commitdd01e3b6f14f52205ccb9ff738d712531665355b (patch)
tree235691ef948f5611e20bb8881156ab1b5089cb46 /src
parenta5c727a27bd9b3a2b1f43190b9cb8e08812bd2f9 (diff)
downloadmongo-dd01e3b6f14f52205ccb9ff738d712531665355b.tar.gz
SERVER-40809 clean up LocalOplogInfo in oplog.cpp
remove persist argument from _getNextOpTimes() add LocalOplogInfo::get() move _getNextOpTimes() into LocalOplogInfo make LocalOplogInfo::oplog and oplogName private and provide accessors/mutators
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/oplog.cpp152
1 files changed, 107 insertions, 45 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 2a3dcd239d0..89995c53a80 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -121,37 +121,94 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeLogOpAdvancesLastApplied);
/**
* This structure contains per-service-context state related to the oplog.
*/
-struct LocalOplogInfo {
+class LocalOplogInfo {
+public:
+ static LocalOplogInfo* get(ServiceContext& service);
+ static LocalOplogInfo* get(ServiceContext* service);
+ static LocalOplogInfo* get(OperationContext* opCtx);
+
LocalOplogInfo(const LocalOplogInfo&) = delete;
LocalOplogInfo& operator=(const LocalOplogInfo&) = delete;
LocalOplogInfo() = default;
+ /**
+ * Returns namespace of the local oplog collection.
+ */
+ const NamespaceString& getOplogCollectionName() const;
+
+ /**
+ * Detects the current replication mode and sets the "_oplogName" accordingly.
+ */
+ void setOplogCollectionName(ServiceContext* service);
+
+ Collection* getCollection() const;
+ void setCollection(Collection* oplog);
+ void resetCollection();
+
+ /**
+ * Sets the global Timestamp to be 'newTime'.
+ */
+ void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime);
+
+ /**
+ * Allocates optimes for new entries in the oplog. Stores results in 'slotsOut', which
+ * contain the new optimes along with their terms and newly calculated hash fields.
+ */
+ std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count);
+
+private:
// Name of the oplog collection.
- NamespaceString oplogName;
+ NamespaceString _oplogName;
// The "oplog" pointer is always valid (or null) because an operation must take the global
// exclusive lock to set the pointer to null when the Collection instance is destroyed. See
// "oplogCheckCloseDatabase".
- Collection* oplog = nullptr;
+ Collection* _oplog = nullptr;
// Synchronizes the section where a new Timestamp is generated and when it is registered in the
// storage engine.
- stdx::mutex newOpMutex;
+ mutable stdx::mutex _newOpMutex;
};
const auto localOplogInfo = ServiceContext::declareDecoration<LocalOplogInfo>();
+// static
+LocalOplogInfo* LocalOplogInfo::get(ServiceContext& service) {
+ return get(&service);
+}
+
+// static
+LocalOplogInfo* LocalOplogInfo::get(ServiceContext* service) {
+ return &localOplogInfo(service);
+}
+
+// static
+LocalOplogInfo* LocalOplogInfo::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+const NamespaceString& LocalOplogInfo::getOplogCollectionName() const {
+ return _oplogName;
+}
+
+Collection* LocalOplogInfo::getCollection() const {
+ return _oplog;
+}
+
+void LocalOplogInfo::setCollection(Collection* oplog) {
+ _oplog = oplog;
+}
+
+void LocalOplogInfo::resetCollection() {
+ _oplog = nullptr;
+}
+
// so we can fail the same way
void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-void _getNextOpTimes(OperationContext* opCtx,
- Collection* oplog,
- std::size_t count,
- OplogSlot* slotsOut,
- bool persist = true) {
- auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
+std::vector<OplogSlot> LocalOplogInfo::getNextOpTimes(OperationContext* opCtx, std::size_t count) {
auto replCoord = ReplicationCoordinator::get(opCtx);
long long term = OpTime::kUninitializedTerm;
@@ -172,18 +229,22 @@ void _getNextOpTimes(OperationContext* opCtx,
// Allow the storage engine to start the transaction outside the critical section.
opCtx->recoveryUnit()->preallocateSnapshot();
- stdx::lock_guard<stdx::mutex> lk(oplogInfo.newOpMutex);
+ stdx::lock_guard<stdx::mutex> lk(_newOpMutex);
ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
const bool orderedCommit = false;
- if (persist) {
- fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
- }
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ invariant(_oplog);
+ fassert(28560, _oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+ std::vector<OplogSlot> oplogSlots(count);
for (std::size_t i = 0; i < count; i++) {
- slotsOut[i] = {Timestamp(ts.asULL() + i), term};
+ oplogSlots[i] = {Timestamp(ts.asULL() + i), term};
}
+ return oplogSlots;
}
/**
@@ -256,10 +317,10 @@ bool shouldBuildInForeground(OperationContext* opCtx,
} // namespace
-void setOplogCollectionName(ServiceContext* service) {
+void LocalOplogInfo::setOplogCollectionName(ServiceContext* service) {
switch (ReplicationCoordinator::get(service)->getReplicationMode()) {
case ReplicationCoordinator::modeReplSet:
- localOplogInfo(service).oplogName = NamespaceString::kRsOplogNamespace;
+ _oplogName = NamespaceString::kRsOplogNamespace;
break;
case ReplicationCoordinator::modeNone:
// leave empty.
@@ -267,6 +328,10 @@ void setOplogCollectionName(ServiceContext* service) {
}
}
+void setOplogCollectionName(ServiceContext* service) {
+ LocalOplogInfo::get(service)->setOplogCollectionName(service);
+}
+
/**
* Parse the given BSON array of BSON into a vector of BSON.
*/
@@ -593,25 +658,25 @@ OpTime logOp(OperationContext* opCtx,
return {};
}
- const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
boost::optional<Lock::DBLock> dbWriteLock;
boost::optional<Lock::CollectionLock> collWriteLock;
if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX);
- collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX);
+ collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
}
- auto const oplog = oplogInfo.oplog;
OplogSlot slot;
WriteUnitOfWork wuow(opCtx);
if (oplogSlot.isNull()) {
- _getNextOpTimes(opCtx, oplog, 1, &slot);
+ slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
} else {
slot = oplogSlot;
}
+ auto oplog = oplogInfo->getCollection();
auto writer = _logOpWriter(opCtx,
opstr,
nss,
@@ -654,17 +719,16 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- const auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
// Obtain Collection exclusive intent write lock for non-document-locking storage engines.
boost::optional<Lock::DBLock> dbWriteLock;
boost::optional<Lock::CollectionLock> collWriteLock;
if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX);
- collWriteLock.emplace(opCtx, oplogInfo.oplogName, MODE_IX);
+ collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
}
- auto oplog = oplogInfo.oplog;
WriteUnitOfWork wuow(opCtx);
OperationSessionInfo sessionInfo;
@@ -684,7 +748,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
auto insertStatementOplogSlot = begin[i].oplogSlot;
// Fetch optime now, if not already fetched.
if (insertStatementOplogSlot.isNull()) {
- _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
+ insertStatementOplogSlot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
}
// Only 'applyOps' oplog entries can be prepared.
constexpr bool prepare = false;
@@ -723,6 +787,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
invariant(!opTimes.empty());
auto lastOpTime = opTimes.back();
invariant(!lastOpTime.isNull());
+ auto oplog = oplogInfo->getCollection();
_logOpsInner(
opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime, wallClockTime);
wuow.commit();
@@ -837,7 +902,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
void createOplog(OperationContext* opCtx) {
const auto isReplSet = ReplicationCoordinator::get(opCtx)->getReplicationMode() ==
ReplicationCoordinator::modeReplSet;
- createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName.ns(), isReplSet);
+ createOplog(opCtx, LocalOplogInfo::get(opCtx)->getOplogCollectionName().ns(), isReplSet);
}
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
@@ -851,15 +916,7 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes)
(OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> {
- // The local oplog collection pointer must already be established by this point.
- // We can't establish it here because that would require locking the local database, which would
- // be a lock order violation.
- auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
- invariant(oplog);
- std::vector<OplogSlot> oplogSlots(count);
- auto oplogSlot = oplogSlots.begin();
- _getNextOpTimes(opCtx, oplog, count, &(*oplogSlot));
- return oplogSlots;
+ return LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, count);
}
// -------------------------------------
@@ -2072,11 +2129,15 @@ Status applyCommand_inlock(OperationContext* opCtx,
return Status::OK();
}
-void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
- stdx::lock_guard<stdx::mutex> lk(localOplogInfo(service).newOpMutex);
+void LocalOplogInfo::setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
+ stdx::lock_guard<stdx::mutex> lk(_newOpMutex);
LogicalClock::get(service)->setClusterTimeFromTrustedSource(LogicalTime(newTime));
}
+void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
+ LocalOplogInfo::get(service)->setNewTimestamp(service, newTime);
+}
+
void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
DBDirectClient c(opCtx);
static const BSONObj reverseNaturalObj = BSON("$natural" << -1);
@@ -2092,30 +2153,31 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS)
void oplogCheckCloseDatabase(OperationContext* opCtx, const Database* db) {
invariant(opCtx->lockState()->isW());
if (db->name() == "local") {
- localOplogInfo(opCtx->getServiceContext()).oplog = nullptr;
+ LocalOplogInfo::get(opCtx)->resetCollection();
}
}
void clearLocalOplogPtr() {
- localOplogInfo(getGlobalServiceContext()).oplog = nullptr;
+ LocalOplogInfo::get(getGlobalServiceContext())->resetCollection();
}
void acquireOplogCollectionForLogging(OperationContext* opCtx) {
- auto& oplogInfo = localOplogInfo(opCtx->getServiceContext());
- if (!oplogInfo.oplogName.isEmpty()) {
- AutoGetCollection autoColl(opCtx, oplogInfo.oplogName, MODE_IX);
- oplogInfo.oplog = autoColl.getCollection();
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
+ const auto& nss = oplogInfo->getOplogCollectionName();
+ if (!nss.isEmpty()) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ LocalOplogInfo::get(opCtx)->setCollection(autoColl.getCollection());
}
}
void establishOplogCollectionForLogging(OperationContext* opCtx, Collection* oplog) {
invariant(opCtx->lockState()->isW());
invariant(oplog);
- localOplogInfo(opCtx->getServiceContext()).oplog = oplog;
+ LocalOplogInfo::get(opCtx)->setCollection(oplog);
}
void signalOplogWaiters() {
- auto oplog = localOplogInfo(getGlobalServiceContext()).oplog;
+ auto oplog = LocalOplogInfo::get(getGlobalServiceContext())->getCollection();
if (oplog) {
oplog->getCappedCallback()->notifyCappedWaitersIfNeeded();
}