author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2021-05-31 16:42:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>     2021-06-02 22:32:50 +0000
commit     9748e291b6a10e12b85c978d459fb6f6bf2d9ea3 (patch)
tree       33405c1ac82fcf0d127748a6774122fe50164932 /src
parent     9f6598c1e321b88d7467547a2d0a80a92ae5a944 (diff)
download   mongo-9748e291b6a10e12b85c978d459fb6f6bf2d9ea3.tar.gz
SERVER-57018 Make shards release their dist locks on step-up
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/dist_lock_catalog.h                 |  44
-rw-r--r--  src/mongo/db/s/dist_lock_catalog_mock.cpp          |  48
-rw-r--r--  src/mongo/db/s/dist_lock_catalog_mock.h            |  21
-rw-r--r--  src/mongo/db/s/dist_lock_catalog_replset.cpp       |  41
-rw-r--r--  src/mongo/db/s/dist_lock_catalog_replset.h         |   8
-rw-r--r--  src/mongo/db/s/dist_lock_catalog_replset_test.cpp  | 168
-rw-r--r--  src/mongo/db/s/dist_lock_manager.h                 |   4
-rw-r--r--  src/mongo/db/s/dist_lock_manager_replset.cpp       | 149
-rw-r--r--  src/mongo/db/s/dist_lock_manager_replset.h         |  22
-rw-r--r--  src/mongo/db/s/dist_lock_manager_replset_test.cpp  |  51
-rw-r--r--  src/mongo/db/s/type_locks.cpp                      |   1
-rw-r--r--  src/mongo/db/s/type_locks.h                        |   1
-rw-r--r--  src/mongo/s/client/shard.cpp                       |  11
13 files changed, 324 insertions, 245 deletions
diff --git a/src/mongo/db/s/dist_lock_catalog.h b/src/mongo/db/s/dist_lock_catalog.h
index 8c6810ba957..639c2178cd2 100644
--- a/src/mongo/db/s/dist_lock_catalog.h
+++ b/src/mongo/db/s/dist_lock_catalog.h
@@ -83,8 +83,10 @@ public:
virtual Status ping(OperationContext* opCtx, StringData processID, Date_t ping) = 0;
/**
- * Attempts to update the owner of a lock identified by lockID to lockSessionID.
- * Will only be successful if lock is not held.
+ * If 'lockID' is currently free, acquires it with lockSessionID as the owner.
+ *
+ * The term corresponds to the current replication term of the locking processID (which is the
+ * id of the shard taking the lock).
*
* The other parameters are for diagnostic purposes:
* - who: unique string for the caller trying to grab the lock.
@@ -98,19 +100,22 @@ public:
*
* Common status errors include socket and duplicate key errors.
*/
- virtual StatusWith<LocksType> grabLock(
- OperationContext* opCtx,
- StringData lockID,
- const OID& lockSessionID,
- StringData who,
- StringData processId,
- Date_t time,
- StringData why,
- const WriteConcernOptions& writeConcern = kMajorityWriteConcern) = 0;
+ virtual StatusWith<LocksType> grabLock(OperationContext* opCtx,
+ StringData lockID,
+ const OID& lockSessionID,
+ long long term,
+ StringData who,
+ StringData processId,
+ Date_t time,
+ StringData why,
+ const WriteConcernOptions& writeConcern) = 0;
/**
- * Attempts to forcefully transfer the ownership of a lock from currentHolderTS
- * to lockSessionID.
+ * If 'lockID' is currently free or the current owner is currentHolderTS, acquires it with
+ * lockSessionID as the new owner.
+ *
+ * The term corresponds to the current replication term of the locking processID (which is the
+ * id of the shard taking the lock).
*
* The other parameters are for diagnostic purposes:
* - who: unique string for the caller trying to grab the lock.
@@ -126,6 +131,7 @@ public:
virtual StatusWith<LocksType> overtakeLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
const OID& currentHolderTS,
StringData who,
StringData processId,
@@ -144,7 +150,9 @@ public:
* indication as to how many locks were actually unlocked. So long as the update command runs
* successfully, returns OK, otherwise returns an error status.
*/
- virtual Status unlockAll(OperationContext* opCtx, const std::string& processID) = 0;
+ virtual Status unlockAll(OperationContext* opCtx,
+ const std::string& processID,
+ boost::optional<long long> term) = 0;
/**
* Get some information from the config server primary.
@@ -154,14 +162,6 @@ public:
/**
* Returns the lock document.
- * Returns LockNotFound if lock document doesn't exist.
- * Common status errors include socket errors.
- */
- virtual StatusWith<LocksType> getLockByTS(OperationContext* opCtx,
- const OID& lockSessionID) = 0;
-
- /**
- * Returns the lock document.
* Common status errors include socket errors.
*/
virtual StatusWith<LocksType> getLockByName(OperationContext* opCtx, StringData name) = 0;
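
Put differently, the new contract is "acquire only if currently free, recording the caller's replication term in the lock document", plus a conditional overtake. Below is a self-contained sketch of that semantics in plain C++ (simple structs instead of a findAndModify against config.locks; it models the contract described above, not the server's actual implementation):

#include <cassert>
#include <string>

// Simplified stand-in for a config.locks document (the real catalog updates it via findAndModify).
struct LockDoc {
    enum State { UNLOCKED, LOCKED };
    State state = UNLOCKED;
    std::string owner;    // plays the role of the 'ts' / lockSessionID field
    long long term = -1;  // new field: replication term of the acquiring shard
};

// grabLock: succeeds only if the lock is not currently held.
bool grabLock(LockDoc& doc, const std::string& sessionId, long long term) {
    if (doc.state == LockDoc::LOCKED)
        return false;
    doc.state = LockDoc::LOCKED;
    doc.owner = sessionId;
    doc.term = term;
    return true;
}

// overtakeLock: succeeds if the lock is free or still owned by the expected previous holder.
bool overtakeLock(LockDoc& doc,
                  const std::string& sessionId,
                  long long term,
                  const std::string& currentHolder) {
    if (doc.state == LockDoc::LOCKED && doc.owner != currentHolder)
        return false;
    doc.state = LockDoc::LOCKED;
    doc.owner = sessionId;
    doc.term = term;
    return true;
}

int main() {
    LockDoc doc;
    assert(grabLock(doc, "session-A", 5));
    assert(!grabLock(doc, "session-B", 5));                  // already held
    assert(overtakeLock(doc, "session-B", 6, "session-A"));  // expected holder matches
}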
diff --git a/src/mongo/db/s/dist_lock_catalog_mock.cpp b/src/mongo/db/s/dist_lock_catalog_mock.cpp
index 1e536a4b0f2..edde6046354 100644
--- a/src/mongo/db/s/dist_lock_catalog_mock.cpp
+++ b/src/mongo/db/s/dist_lock_catalog_mock.cpp
@@ -118,7 +118,12 @@ DistLockCatalogMock::DistLockCatalogMock()
_getPingChecker(noGetPingSet),
_getPingReturnValue(kLockpingsTypeBadRetValue),
_getServerInfoChecker(noGetServerInfoSet),
- _getServerInfoReturnValue(kServerInfoBadRetValue) {}
+ _getServerInfoReturnValue(kServerInfoBadRetValue),
+ _unlockAllChecker([](StringData processID, boost::optional<long long> term) {
+ uasserted(ErrorCodes::IllegalOperation,
+ str::stream() << "unlockAll not expected to be called; processID: "
+ << processID);
+ }) {}
DistLockCatalogMock::~DistLockCatalogMock() {}
@@ -154,6 +159,7 @@ Status DistLockCatalogMock::ping(OperationContext* opCtx, StringData processID,
StatusWith<LocksType> DistLockCatalogMock::grabLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
StringData who,
StringData processId,
Date_t time,
@@ -175,6 +181,7 @@ StatusWith<LocksType> DistLockCatalogMock::grabLock(OperationContext* opCtx,
StatusWith<LocksType> DistLockCatalogMock::overtakeLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
const OID& currentHolderTS,
StringData who,
StringData processId,
@@ -225,21 +232,6 @@ StatusWith<DistLockCatalog::ServerInfo> DistLockCatalogMock::getServerInfo(
return ret;
}
-StatusWith<LocksType> DistLockCatalogMock::getLockByTS(OperationContext* opCtx,
- const OID& lockSessionID) {
- auto ret = kLocksTypeBadRetValue;
- GetLockByTSFunc checkerFunc = noGetLockByTSSet;
-
- {
- stdx::lock_guard<Latch> lk(_mutex);
- ret = _getLockByTSReturnValue;
- checkerFunc = _getLockByTSChecker;
- }
-
- checkerFunc(lockSessionID);
- return ret;
-}
-
StatusWith<LocksType> DistLockCatalogMock::getLockByName(OperationContext* opCtx, StringData name) {
auto ret = kLocksTypeBadRetValue;
GetLockByNameFunc checkerFunc = noGetLockByNameSet;
@@ -300,13 +292,6 @@ void DistLockCatalogMock::expectStopPing(StopPingFunc checkerFunc, Status return
_stopPingReturnValue = returnThis;
}
-void DistLockCatalogMock::expectGetLockByTS(GetLockByTSFunc checkerFunc,
- StatusWith<LocksType> returnThis) {
- stdx::lock_guard<Latch> lk(_mutex);
- _getLockByTSChecker = checkerFunc;
- _getLockByTSReturnValue = returnThis;
-}
-
void DistLockCatalogMock::expectGetLockByName(GetLockByNameFunc checkerFunc,
StatusWith<LocksType> returnThis) {
stdx::lock_guard<Latch> lk(_mutex);
@@ -335,9 +320,20 @@ void DistLockCatalogMock::expectGetServerInfo(GetServerInfoFunc checkerFunc,
_getServerInfoReturnValue = returnThis;
}
-Status DistLockCatalogMock::unlockAll(OperationContext* opCtx, const std::string& processID) {
- return Status(ErrorCodes::IllegalOperation,
- str::stream() << "unlockAll not expected to be called; processID: " << processID);
+void DistLockCatalogMock::expectUnlockAll(UnlockAllFunc checkerFunc) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _unlockAllChecker = checkerFunc;
+}
+
+Status DistLockCatalogMock::unlockAll(OperationContext* opCtx,
+ const std::string& processID,
+ boost::optional<long long> term) {
+ try {
+ _unlockAllChecker(processID, term);
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/s/dist_lock_catalog_mock.h b/src/mongo/db/s/dist_lock_catalog_mock.h
index 2d71657a62a..0166a62ec1c 100644
--- a/src/mongo/db/s/dist_lock_catalog_mock.h
+++ b/src/mongo/db/s/dist_lock_catalog_mock.h
@@ -89,6 +89,8 @@ public:
using GetLockByTSFunc = std::function<void(const OID& ts)>;
using GetLockByNameFunc = std::function<void(StringData name)>;
using GetServerInfoFunc = std::function<void()>;
+ using UnlockAllFunc =
+ std::function<void(StringData processID, boost::optional<long long> term)>;
virtual StatusWith<LockpingsType> getPing(OperationContext* opCtx,
StringData processID) override;
@@ -98,6 +100,7 @@ public:
virtual StatusWith<LocksType> grabLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
StringData who,
StringData processId,
Date_t time,
@@ -107,6 +110,7 @@ public:
virtual StatusWith<LocksType> overtakeLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
const OID& currentHolderTS,
StringData who,
StringData processId,
@@ -117,13 +121,12 @@ public:
const OID& lockSessionID,
StringData name) override;
- virtual Status unlockAll(OperationContext* opCtx, const std::string& processID) override;
+ virtual Status unlockAll(OperationContext* opCtx,
+ const std::string& processID,
+ boost::optional<long long> term) override;
virtual StatusWith<ServerInfo> getServerInfo(OperationContext* opCtx) override;
- virtual StatusWith<LocksType> getLockByTS(OperationContext* opCtx,
- const OID& lockSessionID) override;
-
virtual StatusWith<LocksType> getLockByName(OperationContext* opCtx, StringData name) override;
virtual Status stopPing(OperationContext* opCtx, StringData processId) override;
@@ -157,12 +160,6 @@ public:
/**
* Sets the checker method to use and its return value the every time
- * getLockByTS is called.
- */
- void expectGetLockByTS(GetLockByTSFunc checkerFunc, StatusWith<LocksType> returnThis);
-
- /**
- * Sets the checker method to use and its return value the every time
* getLockByName is called.
*/
void expectGetLockByName(GetLockByNameFunc checkerFunc, StatusWith<LocksType> returnThis);
@@ -186,6 +183,8 @@ public:
void expectGetServerInfo(GetServerInfoFunc checkerFunc,
StatusWith<DistLockCatalog::ServerInfo> returnThis);
+ void expectUnlockAll(UnlockAllFunc checkerFunc);
+
private:
// Protects all the member variables.
Mutex _mutex = MONGO_MAKE_LATCH("DistLockCatalogMock::_mutex");
@@ -216,6 +215,8 @@ private:
GetServerInfoFunc _getServerInfoChecker;
StatusWith<DistLockCatalog::ServerInfo> _getServerInfoReturnValue;
+
+ UnlockAllFunc _unlockAllChecker;
};
} // namespace mongo
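
With the new expectUnlockAll hook, the mock's default checker rejects any unexpected unlockAll call, and a test that does expect one registers its own checker to validate the arguments. A hypothetical test fragment, assuming the getMockCatalog() and getProcessID() fixture helpers used by the dist lock manager tests (the real usage is the RecoverySuccess test later in this commit):

// Hypothetical fragment inside a DistLockManagerReplSetTest body; the asserted term value is made up.
getMockCatalog()->expectUnlockAll([&](StringData processID, boost::optional<long long> term) {
    ASSERT_EQUALS(getProcessID(), processID);  // recovery releases this shard's own locks
    ASSERT(term);                              // step-up recovery always passes a term
    ASSERT_EQUALS(1LL, *term);
});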
diff --git a/src/mongo/db/s/dist_lock_catalog_replset.cpp b/src/mongo/db/s/dist_lock_catalog_replset.cpp
index fa1b4aa0a4c..7e62b1a9369 100644
--- a/src/mongo/db/s/dist_lock_catalog_replset.cpp
+++ b/src/mongo/db/s/dist_lock_catalog_replset.cpp
@@ -229,6 +229,7 @@ Status DistLockCatalogImpl::ping(OperationContext* opCtx, StringData processID,
StatusWith<LocksType> DistLockCatalogImpl::grabLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
StringData who,
StringData processId,
Date_t time,
@@ -237,7 +238,7 @@ StatusWith<LocksType> DistLockCatalogImpl::grabLock(OperationContext* opCtx,
BSONObj newLockDetails(BSON(LocksType::lockID(lockSessionID)
<< LocksType::state(LocksType::LOCKED) << LocksType::who() << who
<< LocksType::process() << processId << LocksType::when(time)
- << LocksType::why() << why));
+ << LocksType::term(term) << LocksType::why() << why));
auto request = makeFindAndModifyRequest(
_locksNS,
@@ -281,6 +282,7 @@ StatusWith<LocksType> DistLockCatalogImpl::grabLock(OperationContext* opCtx,
StatusWith<LocksType> DistLockCatalogImpl::overtakeLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
const OID& currentHolderTS,
StringData who,
StringData processId,
@@ -294,7 +296,7 @@ StatusWith<LocksType> DistLockCatalogImpl::overtakeLock(OperationContext* opCtx,
BSONObj newLockDetails(BSON(LocksType::lockID(lockSessionID)
<< LocksType::state(LocksType::LOCKED) << LocksType::who() << who
<< LocksType::process() << processId << LocksType::when(time)
- << LocksType::why() << why));
+ << LocksType::term(term) << LocksType::why() << why));
auto request = makeFindAndModifyRequest(
_locksNS,
@@ -358,12 +360,17 @@ Status DistLockCatalogImpl::unlock(OperationContext* opCtx,
return findAndModifyStatus.getStatus();
}
-Status DistLockCatalogImpl::unlockAll(OperationContext* opCtx, const std::string& processID) {
+Status DistLockCatalogImpl::unlockAll(OperationContext* opCtx,
+ const std::string& processID,
+ boost::optional<long long> term) {
BatchedCommandRequest request([&] {
write_ops::UpdateCommandRequest updateOp(_locksNS);
updateOp.setUpdates({[&] {
write_ops::UpdateOpEntry entry;
- entry.setQ(BSON(LocksType::process(processID)));
+ auto query = BSON(LocksType::process(processID));
+ if (term)
+ query.addFields(BSON(LocksType::term() << BSON("$lte" << *term)));
+ entry.setQ(query);
entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(
BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))));
entry.setUpsert(false);
@@ -444,32 +451,6 @@ StatusWith<DistLockCatalog::ServerInfo> DistLockCatalogImpl::getServerInfo(
return DistLockCatalog::ServerInfo(localTimeElem.date(), electionIdStatus.getValue());
}
-StatusWith<LocksType> DistLockCatalogImpl::getLockByTS(OperationContext* opCtx,
- const OID& lockSessionID) {
- auto findResult =
- findOnConfig(opCtx, kReadPref, _locksNS, BSON(LocksType::lockID(lockSessionID)), {}, 1);
- if (!findResult.isOK()) {
- return findResult.getStatus();
- }
-
- const auto& findResultSet = findResult.getValue();
-
- if (findResultSet.empty()) {
- return {ErrorCodes::LockNotFound,
- str::stream() << "lock with ts " << lockSessionID << " not found"};
- }
-
- BSONObj doc = findResultSet.front();
- auto locksTypeResult = LocksType::fromBSON(doc);
- if (!locksTypeResult.isOK()) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "failed to parse: " << doc << " : "
- << locksTypeResult.getStatus().toString()};
- }
-
- return locksTypeResult.getValue();
-}
-
StatusWith<LocksType> DistLockCatalogImpl::getLockByName(OperationContext* opCtx, StringData name) {
auto findResult =
findOnConfig(opCtx, kReadPref, _locksNS, BSON(LocksType::name() << name), {}, 1);
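
The interesting change in this file is the term-aware unlockAll: when a term is provided, the update only matches this process's lock documents whose recorded term is less than or equal to it, while boost::none keeps the old release-everything behaviour. A minimal standalone model of that matching logic (plain containers and std::optional in place of the batched update against config.locks):

#include <cassert>
#include <optional>
#include <string>
#include <vector>

struct LockDoc {
    std::string process;
    long long term;
    bool locked;
};

// Mirrors the update's query: { process: processID }, plus { term: { $lte: *term } } when a term is given.
void unlockAll(std::vector<LockDoc>& locks,
               const std::string& processID,
               std::optional<long long> term) {
    for (auto& lock : locks) {
        if (lock.process != processID)
            continue;
        if (term && lock.term > *term)
            continue;             // taken in a newer term; leave it alone
        lock.locked = false;      // $set: { state: UNLOCKED }
    }
}

int main() {
    std::vector<LockDoc> locks = {{"shard0", 3, true}, {"shard0", 7, true}, {"shard1", 3, true}};
    unlockAll(locks, "shard0", 5);             // releases only shard0's lock from term 3
    assert(!locks[0].locked && locks[1].locked && locks[2].locked);
    unlockAll(locks, "shard0", std::nullopt);  // no term: releases all of shard0's locks
    assert(!locks[1].locked);
}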
diff --git a/src/mongo/db/s/dist_lock_catalog_replset.h b/src/mongo/db/s/dist_lock_catalog_replset.h
index 0f0e3b184ea..a965a95f454 100644
--- a/src/mongo/db/s/dist_lock_catalog_replset.h
+++ b/src/mongo/db/s/dist_lock_catalog_replset.h
@@ -53,6 +53,7 @@ public:
StatusWith<LocksType> grabLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
StringData who,
StringData processId,
Date_t time,
@@ -62,6 +63,7 @@ public:
StatusWith<LocksType> overtakeLock(OperationContext* opCtx,
StringData lockID,
const OID& lockSessionID,
+ long long term,
const OID& currentHolderTS,
StringData who,
StringData processId,
@@ -70,12 +72,12 @@ public:
Status unlock(OperationContext* opCtx, const OID& lockSessionID, StringData name) override;
- Status unlockAll(OperationContext* opCtx, const std::string& processID) override;
+ Status unlockAll(OperationContext* opCtx,
+ const std::string& processID,
+ boost::optional<long long> term) override;
StatusWith<ServerInfo> getServerInfo(OperationContext* opCtx) override;
- StatusWith<LocksType> getLockByTS(OperationContext* opCtx, const OID& lockSessionID) override;
-
StatusWith<LocksType> getLockByName(OperationContext* opCtx, StringData name) override;
Status stopPing(OperationContext* opCtx, StringData processId) override;
diff --git a/src/mongo/db/s/dist_lock_catalog_replset_test.cpp b/src/mongo/db/s/dist_lock_catalog_replset_test.cpp
index b6ef5ed706d..0d8cbae56b0 100644
--- a/src/mongo/db/s/dist_lock_catalog_replset_test.cpp
+++ b/src/mongo/db/s/dist_lock_catalog_replset_test.cpp
@@ -259,6 +259,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockNoOp) {
.grabLock(opCtx,
"test",
myID,
+ 0LL,
"me",
"mongos",
now,
@@ -283,6 +284,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockNoOp) {
who: "me",
process: "mongos",
when: { $date: "2015-05-22T19:17:18.098Z" },
+ term: 0,
why: "because"
}
},
@@ -313,6 +315,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWithNewDoc) {
auto resultStatus = _distLockCatalog.grabLock(opCtx,
"test",
myID,
+ 0LL,
"me",
"mongos",
now,
@@ -343,6 +346,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWithNewDoc) {
who: "me",
process: "mongos",
when: { $date: "2015-05-22T19:17:18.098Z" },
+ term: 0,
why: "because"
}
},
@@ -379,11 +383,17 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWithNewDoc) {
TEST_F(DistLockCatalogReplSetTest, GrabLockWithBadLockDoc) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
Date_t now(dateFromISOString("2015-05-22T19:17:18.098Z").getValue());
- auto resultStatus =
- _distLockCatalog
- .grabLock(
- opCtx, "test", OID(), "", "", now, "", DistLockCatalog::kMajorityWriteConcern)
- .getStatus();
+ auto resultStatus = _distLockCatalog
+ .grabLock(opCtx,
+ "test",
+ OID(),
+ 0LL,
+ "",
+ "",
+ now,
+ "",
+ DistLockCatalog::kMajorityWriteConcern)
+ .getStatus();
ASSERT_EQUALS(ErrorCodes::FailedToParse, resultStatus.code());
});
@@ -419,6 +429,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockTargetError) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -435,6 +446,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockRunCmdError) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -451,6 +463,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockCommandError) {
.grabLock(opCtx,
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -478,6 +491,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockDupKeyError) {
.grabLock(opCtx,
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -502,6 +516,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWriteError) {
.grabLock(opCtx,
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -529,6 +544,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWriteConcernError) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -559,6 +575,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockWriteConcernErrorBadType) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -587,6 +604,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockResponseMissingValueField) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -610,6 +628,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockUnsupportedWriteConcernResponse) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -639,6 +658,7 @@ TEST_F(DistLockCatalogReplSetTest, GrabLockUnsupportedResponseFormat) {
.grabLock(operationContext(),
"",
OID::gen(),
+ 0LL,
"",
"",
Date_t::now(),
@@ -660,11 +680,17 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockNoOp) {
OID myID("555f80be366c194b13fb0372");
OID currentOwner("555f99712c99a78c5b083358");
Date_t now(dateFromISOString("2015-05-22T19:17:18.098Z").getValue());
- auto resultStatus =
- _distLockCatalog
- .overtakeLock(
- operationContext(), "test", myID, currentOwner, "me", "mongos", now, "because")
- .getStatus();
+ auto resultStatus = _distLockCatalog
+ .overtakeLock(operationContext(),
+ "test",
+ myID,
+ 0LL,
+ currentOwner,
+ "me",
+ "mongos",
+ now,
+ "because")
+ .getStatus();
ASSERT_EQUALS(ErrorCodes::LockStateChangeFailed, resultStatus.code());
});
@@ -688,6 +714,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockNoOp) {
who: "me",
process: "mongos",
when: { $date: "2015-05-22T19:17:18.098Z" },
+ term: 0,
why: "because"
}
},
@@ -716,7 +743,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWithNewDoc) {
OID currentOwner("555f99712c99a78c5b083358");
Date_t now(dateFromISOString("2015-05-22T19:17:18.098Z").getValue());
auto resultStatus = _distLockCatalog.overtakeLock(
- operationContext(), "test", myID, currentOwner, "me", "mongos", now, "because");
+ operationContext(), "test", myID, 0LL, currentOwner, "me", "mongos", now, "because");
ASSERT_OK(resultStatus.getStatus());
const auto& lockDoc = resultStatus.getValue();
@@ -747,6 +774,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWithNewDoc) {
who: "me",
process: "mongos",
when: { $date: "2015-05-22T19:17:18.098Z" },
+ term: 0,
why: "because"
}
},
@@ -783,7 +811,8 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWithBadLockDoc) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
Date_t now(dateFromISOString("2015-05-22T19:17:18.098Z").getValue());
auto resultStatus =
- _distLockCatalog.overtakeLock(operationContext(), "test", OID(), OID(), "", "", now, "")
+ _distLockCatalog
+ .overtakeLock(operationContext(), "test", OID(), 0LL, OID(), "", "", now, "")
.getStatus();
ASSERT_EQUALS(ErrorCodes::FailedToParse, resultStatus.code());
});
@@ -815,18 +844,20 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWithBadLockDoc) {
TEST_F(DistLockCatalogReplSetTest, OvertakeLockTargetError) {
configTargeter()->setFindHostReturnValue({ErrorCodes::InternalError, "can't target"});
- auto status = _distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
- .getStatus();
+ auto status =
+ _distLockCatalog
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
+ .getStatus();
ASSERT_NOT_OK(status);
}
TEST_F(DistLockCatalogReplSetTest, OvertakeLockRunCmdError) {
shutdownExecutorPool();
- auto status = _distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
- .getStatus();
+ auto status =
+ _distLockCatalog
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
+ .getStatus();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
ASSERT_FALSE(status.reason().empty());
}
@@ -835,7 +866,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockCommandError) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
auto status =
_distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
.getStatus();
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_FALSE(status.reason().empty());
@@ -856,7 +887,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWriteError) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
auto status =
_distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
.getStatus();
ASSERT_EQUALS(ErrorCodes::Unauthorized, status.code());
ASSERT_FALSE(status.reason().empty());
@@ -877,7 +908,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockWriteConcernError) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
auto status =
_distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
.getStatus();
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, status.code());
ASSERT_FALSE(status.reason().empty());
@@ -901,7 +932,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockUnsupportedWriteConcernResponse)
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
auto status =
_distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
.getStatus();
ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, status.code());
ASSERT_FALSE(status.reason().empty());
@@ -926,7 +957,7 @@ TEST_F(DistLockCatalogReplSetTest, OvertakeLockUnsupportedResponseFormat) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
ASSERT_NOT_OK(
_distLockCatalog
- .overtakeLock(operationContext(), "", OID(), OID(), "", "", Date_t::now(), "")
+ .overtakeLock(operationContext(), "", OID(), 0LL, OID(), "", "", Date_t::now(), "")
.getStatus());
});
@@ -1191,7 +1222,7 @@ TEST_F(DistLockCatalogReplSetTest, UnlockUnsupportedResponseFormat) {
TEST_F(DistLockCatalogReplSetTest, BasicUnlockAll) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- auto status = _distLockCatalog.unlockAll(operationContext(), "processID");
+ auto status = _distLockCatalog.unlockAll(operationContext(), "processID", boost::none);
ASSERT_OK(status);
});
@@ -1225,7 +1256,7 @@ TEST_F(DistLockCatalogReplSetTest, BasicUnlockAll) {
TEST_F(DistLockCatalogReplSetTest, UnlockAllWriteFailed) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- auto status = _distLockCatalog.unlockAll(operationContext(), "processID");
+ auto status = _distLockCatalog.unlockAll(operationContext(), "processID", boost::none);
ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
});
@@ -1239,7 +1270,7 @@ TEST_F(DistLockCatalogReplSetTest, UnlockAllWriteFailed) {
TEST_F(DistLockCatalogReplSetTest, UnlockAllNetworkError) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- auto status = _distLockCatalog.unlockAll(operationContext(), "processID");
+ auto status = _distLockCatalog.unlockAll(operationContext(), "processID", boost::none);
ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status);
});
@@ -1663,91 +1694,6 @@ TEST_F(DistLockCatalogReplSetTest, GetPingUnsupportedFormat) {
future.default_timed_get();
}
-TEST_F(DistLockCatalogReplSetTest, BasicGetLockByTS) {
- auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- OID ts("555f99712c99a78c5b083358");
- auto resultStatus = _distLockCatalog.getLockByTS(operationContext(), ts);
- ASSERT_OK(resultStatus.getStatus());
-
- const auto& lockDoc = resultStatus.getValue();
- ASSERT_EQUALS("test", lockDoc.getName());
- ASSERT_EQUALS(ts, lockDoc.getLockID());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) -> StatusWith<std::vector<BSONObj>> {
- ASSERT_EQUALS(dummyHost, request.target);
- ASSERT_EQUALS("config", request.dbname);
-
- const auto& findCmd = request.cmdObj;
- ASSERT_EQUALS("locks", findCmd["find"].str());
- ASSERT_BSONOBJ_EQ(BSON("ts" << OID("555f99712c99a78c5b083358")), findCmd["filter"].Obj());
- ASSERT_EQUALS(1, findCmd["limit"].numberLong());
- checkReadConcern(findCmd);
-
- BSONObj lockDoc(fromjson(R"({
- _id: "test",
- state: 2,
- ts: ObjectId("555f99712c99a78c5b083358")
- })"));
-
- std::vector<BSONObj> result;
- result.push_back(lockDoc);
- return result;
- });
-
- future.default_timed_get();
-}
-
-TEST_F(DistLockCatalogReplSetTest, GetLockByTSTargetError) {
- configTargeter()->setFindHostReturnValue({ErrorCodes::InternalError, "can't target"});
- auto status = _distLockCatalog.getLockByTS(operationContext(), OID()).getStatus();
- ASSERT_EQUALS(ErrorCodes::InternalError, status.code());
-}
-
-TEST_F(DistLockCatalogReplSetTest, GetLockByTSRunCmdError) {
- shutdownExecutorPool();
- auto status = _distLockCatalog.getLockByTS(operationContext(), OID()).getStatus();
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
- ASSERT_FALSE(status.reason().empty());
-}
-
-TEST_F(DistLockCatalogReplSetTest, GetLockByTSNotFound) {
- auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- auto status = _distLockCatalog.getLockByTS(operationContext(), OID()).getStatus();
- ASSERT_EQUALS(ErrorCodes::LockNotFound, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) -> StatusWith<std::vector<BSONObj>> {
- return std::vector<BSONObj>();
- });
-
- future.default_timed_get();
-}
-
-TEST_F(DistLockCatalogReplSetTest, GetLockByTSUnsupportedFormat) {
- auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
- auto status = _distLockCatalog.getLockByTS(operationContext(), OID()).getStatus();
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_FALSE(status.reason().empty());
- });
-
- onFindCommand([](const RemoteCommandRequest& request) -> StatusWith<std::vector<BSONObj>> {
- // return invalid non-numeric type for state.
- BSONObj lockDoc(fromjson(R"({
- _id: "test",
- state: "bad"
- })"));
-
- std::vector<BSONObj> result;
- result.push_back(lockDoc);
-
- return result;
- });
-
- future.default_timed_get();
-}
-
TEST_F(DistLockCatalogReplSetTest, BasicGetLockByName) {
auto future = launchOnSeparateThread([this](OperationContext* opCtx) {
OID ts("555f99712c99a78c5b083358");
diff --git a/src/mongo/db/s/dist_lock_manager.h b/src/mongo/db/s/dist_lock_manager.h
index 1398f018c27..dfd0270732e 100644
--- a/src/mongo/db/s/dist_lock_manager.h
+++ b/src/mongo/db/s/dist_lock_manager.h
@@ -173,6 +173,8 @@ public:
* Specialized locking method, which only succeeds if the specified lock name is not held by
* anyone. Uses local write concern and does not attempt to overtake the lock or check whether
* the lock lease has expired.
+ *
+ * This method is only used by the Balancer, which re-acquires dist locks while in drain mode.
*/
virtual Status tryLockDirectWithLocalWriteConcern(OperationContext* opCtx,
StringData name,
@@ -193,8 +195,6 @@ public:
virtual void unlockAll(OperationContext* opCtx) = 0;
protected:
- friend class MigrationManager;
-
DistLockManager(OID lockSessionID);
const OID _lockSessionID;
diff --git a/src/mongo/db/s/dist_lock_manager_replset.cpp b/src/mongo/db/s/dist_lock_manager_replset.cpp
index 78a13c29606..d4c5cf0df8a 100644
--- a/src/mongo/db/s/dist_lock_manager_replset.cpp
+++ b/src/mongo/db/s/dist_lock_manager_replset.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/s/dist_lock_manager_replset.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/s/type_lockpings.h"
#include "mongo/db/s/type_locks.h"
#include "mongo/logv2/log.h"
@@ -58,12 +59,32 @@ const Milliseconds kLockRetryInterval(500);
MONGO_FAIL_POINT_DEFINE(setDistLockTimeout);
MONGO_FAIL_POINT_DEFINE(disableReplSetDistLockManager);
+class DistLockManagerService : public ReplicaSetAwareServiceShardSvr<DistLockManagerService> {
+public:
+ static DistLockManagerService* get(ServiceContext* opCtx);
+ static DistLockManagerService* get(OperationContext* opCtx);
+
+ void onStartup(OperationContext* opCtx) override {}
+ void onShutdown() override {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override {
+ auto distLockManager = DistLockManager::get(opCtx);
+ if (!distLockManager) // Sharding not initialised yet
+ return;
+ checked_cast<ReplSetDistLockManager*>(distLockManager)->onStepUp(term);
+ }
+ void onStepUpComplete(OperationContext* opCtx, long long term) override {}
+ void onStepDown() override {}
+ void onBecomeArbiter() override {}
+};
+
+const auto serviceDecorator = ServiceContext::declareDecoration<DistLockManagerService>();
+
/**
* With this logic, the LockID handle for the config server is always fixed and different from that
* of the shards, but all shards have the same LockID handle. This means that locks taken from
* different shards OR different nodes from the same shard are always compatible.
*
- * This is OK and is only needed as a step for upgrade from 4.4 to 4.9+ in order to ensure that the
+ * This is OK and is only needed as a step for upgrade from 4.4 to 5.0 in order to ensure that the
* new DDL operations, which are driven by the DB primaries, lock out the legacy DDL operations,
* which are driven by a 4.4 config server.
*/
@@ -79,6 +100,17 @@ OID shardNameToOID(StringData name) {
return OID::from(oidData.c_str());
}
+DistLockManagerService* DistLockManagerService::get(ServiceContext* service) {
+ return &serviceDecorator(service);
+}
+
+DistLockManagerService* DistLockManagerService::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+const ReplicaSetAwareServiceRegistry::Registerer<DistLockManagerService>
+ distLockManagerServiceServiceRegisterer("DistLockManagerService");
+
} // namespace
const Seconds ReplSetDistLockManager::kDistLockPingInterval{30};
@@ -94,7 +126,8 @@ ReplSetDistLockManager::ReplSetDistLockManager(ServiceContext* service,
_processID(processID.toString()),
_catalog(std::move(catalog)),
_pingInterval(pingInterval),
- _lockExpiration(lockExpiration) {}
+ _lockExpiration(lockExpiration),
+ _recoveryState(_processID == ShardId::kConfigServerId ? kRecovered : kMustRecover) {}
ReplSetDistLockManager::~ReplSetDistLockManager() = default;
@@ -178,11 +211,17 @@ void ReplSetDistLockManager::doTask() {
elapsedSincelastPing.reset();
}
- // Process the unlock queue
{
auto opCtxHolder = tc->makeOperationContext();
auto* opCtx = opCtxHolder.get();
+ try {
+ _waitForRecovery(opCtx);
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(
+ 570180, "Error recovering dist lock manager", "error"_attr = redact(ex));
+ }
+
std::deque<UnlockRequest> toUnlockBatch;
{
stdx::unique_lock<Latch> lk(_mutex);
@@ -201,15 +240,13 @@ void ReplSetDistLockManager::doTask() {
if (!unlockStatus.isOK()) {
LOGV2_WARNING(22670,
- "Error unlocking distributed lock {lockName} with sessionID "
- "{lockSessionId} caused by {error}",
"Error unlocking distributed lock",
- "lockSessionId"_attr = toUnlock.lockId,
"lockName"_attr = toUnlock.name,
- "error"_attr = unlockStatus);
+ "lockSessionId"_attr = toUnlock.lockId,
+ "error"_attr = redact(unlockStatus));
// Queue another attempt, unless the problem was no longer being primary.
if (unlockStatus != ErrorCodes::NotWritablePrimary) {
- (void)queueUnlock(toUnlock.lockId, toUnlock.name);
+ (void)_queueUnlock(toUnlock.lockId, toUnlock.name);
}
} else {
LOGV2(22650,
@@ -350,6 +387,9 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
StringData name,
StringData whyMessage,
Milliseconds waitFor) {
+ // Exclude recovery from the accounting for the duration of the dist lock acquisition
+ long long term = _waitForRecovery(opCtx);
+
Timer timer(_serviceContext->getTickSource());
Timer msgTimer(_serviceContext->getTickSource());
@@ -375,8 +415,8 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
LOGV2_DEBUG(22654,
1,
"Trying to acquire new distributed lock for {lockName} ( "
- "lockSessionID: {lockSessionId}, "
"process : {processId}, "
+ "lockSessionID: {lockSessionId}, "
"lock timeout : {lockExpirationTimeout}, "
"ping interval : {pingInterval}, "
"reason: {reason} )",
@@ -388,13 +428,16 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
"pingInterval"_attr = _pingInterval,
"reason"_attr = whyMessage);
- auto lockResult = _catalog->grabLock(
- opCtx, name, _lockSessionID, who, _processID, Date_t::now(), whyMessage.toString());
-
- auto status = lockResult.getStatus();
- if (status.isOK()) {
- // Lock is acquired since findAndModify was able to successfully modify
- // the lock document.
+ auto lockResult = _catalog->grabLock(opCtx,
+ name,
+ _lockSessionID,
+ term,
+ who,
+ _processID,
+ Date_t::now(),
+ whyMessage.toString(),
+ DistLockCatalog::kMajorityWriteConcern);
+ if (lockResult.isOK()) {
LOGV2(22655,
"Acquired distributed lock {lockName} with session ID {lockSessionId} for "
"{reason}",
@@ -406,6 +449,7 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
}
// If a network error occurred, unlock the lock synchronously and try again
+ auto status = lockResult.getStatus();
if (configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
networkErrorRetries < kMaxNumLockAcquireRetries) {
LOGV2_DEBUG(22656,
@@ -439,7 +483,7 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
if (status != ErrorCodes::LockStateChangeFailed) {
// An error occurred but the write might have actually been applied on the
// other side. Schedule an unlock to clean it up just in case.
- (void)queueUnlock(_lockSessionID, name.toString());
+ (void)_queueUnlock(_lockSessionID, name.toString());
return status;
}
@@ -465,6 +509,7 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
auto overtakeResult = _catalog->overtakeLock(opCtx,
name,
_lockSessionID,
+ term,
currentLock.getLockID(),
who,
_processID,
@@ -488,7 +533,7 @@ Status ReplSetDistLockManager::lockDirect(OperationContext* opCtx,
if (overtakeStatus != ErrorCodes::LockStateChangeFailed) {
// An error occurred but the write might have actually been applied on the
// other side. Schedule an unlock to clean it up just in case.
- (void)queueUnlock(_lockSessionID, name.toString());
+ (void)_queueUnlock(_lockSessionID, name.toString());
return overtakeStatus;
}
}
@@ -552,6 +597,7 @@ Status ReplSetDistLockManager::tryLockDirectWithLocalWriteConcern(OperationConte
auto lockStatus = _catalog->grabLock(opCtx,
name,
_lockSessionID,
+ 0LL, // Zero term for ConfigServer acquisitions
who,
_processID,
Date_t::now(),
@@ -582,13 +628,13 @@ Status ReplSetDistLockManager::tryLockDirectWithLocalWriteConcern(OperationConte
}
void ReplSetDistLockManager::unlock(Interruptible* intr, StringData name) {
- auto unlockFuture = queueUnlock(_lockSessionID, name.toString());
+ auto unlockFuture = _queueUnlock(_lockSessionID, name.toString());
if (intr)
unlockFuture.getNoThrow(intr).ignore();
}
void ReplSetDistLockManager::unlockAll(OperationContext* opCtx) {
- Status status = _catalog->unlockAll(opCtx, getProcessID());
+ Status status = _catalog->unlockAll(opCtx, getProcessID(), boost::none);
if (!status.isOK()) {
LOGV2_WARNING(
22672,
@@ -599,14 +645,73 @@ void ReplSetDistLockManager::unlockAll(OperationContext* opCtx) {
}
}
-SharedSemiFuture<void> ReplSetDistLockManager::queueUnlock(const OID& lockSessionID,
- const std::string& name) {
+void ReplSetDistLockManager::onStepUp(long long term) {
+ stdx::unique_lock<Latch> lk(_mutex);
+ _recoveryState = kMustRecover;
+ _recoveryTerm = term;
+ _waitForRecoveryCV.notify_all();
+}
+
+void ReplSetDistLockManager::markRecovered_forTest() {
+ stdx::unique_lock<Latch> lk(_mutex);
+ _recoveryState = kRecovered;
+ _recoveryTerm = 0;
+ _waitForRecoveryCV.notify_all();
+}
+
+SharedSemiFuture<void> ReplSetDistLockManager::_queueUnlock(const OID& lockSessionID,
+ const std::string& name) {
stdx::unique_lock<Latch> lk(_mutex);
auto& req = _unlockList.emplace_back(lockSessionID, name);
_shutDownCV.notify_all();
return req.unlockCompleted.getFuture();
}
+long long ReplSetDistLockManager::_waitForRecovery(OperationContext* opCtx) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ if (_recoveryState == kRecovered)
+ return _recoveryTerm;
+
+ const auto term = _recoveryTerm;
+
+ if (_recoveryState == kMustWaitForRecovery) {
+ opCtx->waitForConditionOrInterrupt(_waitForRecoveryCV, lk, [&] {
+ return !(_recoveryTerm == term && _recoveryState == kMustWaitForRecovery);
+ });
+ continue;
+ }
+
+ // This is the thread which will perform the recovery, while all others will block on it
+ invariant(_recoveryState == kMustRecover);
+ _recoveryState = kMustWaitForRecovery;
+ lk.unlock();
+
+ LOGV2(570181, "Recovering dist lock manager", "term"_attr = term);
+
+ auto anotherThreadMustRecoverGuard = makeGuard([&] {
+ lk.lock();
+ if (term == _recoveryTerm) {
+ _recoveryState = kMustRecover;
+ _waitForRecoveryCV.notify_all();
+ }
+ });
+
+ uassertStatusOKWithContext(
+ _catalog->unlockAll(opCtx, getProcessID(), term),
+ str::stream() << "While recovering the dist lock manager for term " << term);
+
+ anotherThreadMustRecoverGuard.dismiss();
+
+ lk.lock();
+ if (term == _recoveryTerm) {
+ _recoveryState = kRecovered;
+ _waitForRecoveryCV.notify_all();
+ }
+ continue;
+ }
+}
+
ReplSetDistLockManager::DistLockPingInfo::DistLockPingInfo() = default;
ReplSetDistLockManager::DistLockPingInfo::DistLockPingInfo(
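
onStepUp and _waitForRecovery together form a small recovery gate: step-up marks the manager kMustRecover for the new term, the first subsequent caller transitions it to kMustWaitForRecovery and runs unlockAll for that term while concurrent callers block, and lock acquisitions proceed only once the state reaches kRecovered. A stripped-down, standalone model of that gate using std:: primitives (Latch, OperationContext interruption, error handling and the reset-to-kMustRecover-on-failure guard are omitted):

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class RecoveryGate {
public:
    explicit RecoveryGate(std::function<void(long long)> recoverOnce)
        : _recoverOnce(std::move(recoverOnce)) {}

    // Invoked by the step-up hook with the new replication term.
    void onStepUp(long long term) {
        std::lock_guard<std::mutex> lk(_mutex);
        _state = kMustRecover;
        _term = term;
        _cv.notify_all();
    }

    // Invoked before any lock acquisition; returns the term recovery ran for.
    long long waitForRecovery() {
        std::unique_lock<std::mutex> lk(_mutex);
        while (true) {
            if (_state == kRecovered)
                return _term;
            if (_state == kMustWaitForRecovery) {
                const long long term = _term;
                _cv.wait(lk, [&] { return !(_term == term && _state == kMustWaitForRecovery); });
                continue;
            }
            // This thread performs the recovery; concurrent callers block above.
            const long long term = _term;
            _state = kMustWaitForRecovery;
            lk.unlock();
            _recoverOnce(term);  // real code: _catalog->unlockAll(opCtx, getProcessID(), term)
            lk.lock();
            if (_term == term) {
                _state = kRecovered;
                _cv.notify_all();
            }
        }
    }

private:
    enum State { kMustRecover, kMustWaitForRecovery, kRecovered };
    std::function<void(long long)> _recoverOnce;
    std::mutex _mutex;
    std::condition_variable _cv;
    State _state{kRecovered};
    long long _term{0};
};

int main() {
    RecoveryGate gate(
        [](long long term) { std::cout << "releasing locks with term <= " << term << "\n"; });
    gate.onStepUp(2);  // as the ReplicaSetAwareService hook would on step-up
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back([&] { gate.waitForRecovery(); });
    for (auto& t : threads)
        t.join();  // exactly one thread performed the recovery; all observed term 2
}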
diff --git a/src/mongo/db/s/dist_lock_manager_replset.h b/src/mongo/db/s/dist_lock_manager_replset.h
index 2eddb76326b..564eac82fae 100644
--- a/src/mongo/db/s/dist_lock_manager_replset.h
+++ b/src/mongo/db/s/dist_lock_manager_replset.h
@@ -74,11 +74,19 @@ public:
void unlockAll(OperationContext* opCtx) override;
+ /**
+ * Indicates to the dist lock manager that a step-up has occurred with the specified term. This
+ * in turn requests that the dist lock manager perform a recovery, freeing all locks it might
+ * have previously held, before it attempts to acquire any new ones.
+ */
+ void onStepUp(long long term);
+ void markRecovered_forTest();
+
private:
/**
* Queue a lock to be unlocked asynchronously with retry until it doesn't error.
*/
- SharedSemiFuture<void> queueUnlock(const OID& lockSessionID, const std::string& name);
+ SharedSemiFuture<void> _queueUnlock(const OID& lockSessionID, const std::string& name);
/**
* Periodically pings and checks if there are locks queued that needs unlocking.
@@ -98,6 +106,8 @@ private:
const LocksType lockDoc,
const Milliseconds& lockExpiration);
+ long long _waitForRecovery(OperationContext* opCtx);
+
/**
* Data structure for storing information about distributed lock pings.
*/
@@ -169,6 +179,16 @@ private:
// Map of lockName to last ping information.
stdx::unordered_map<std::string, DistLockPingInfo> _pingHistory; // (M)
+
+ // Tracks the state of post step-up recovery.
+ enum Recovery {
+ kMustRecover,
+ kMustWaitForRecovery,
+ kRecovered,
+ };
+ Recovery _recoveryState;
+ long long _recoveryTerm{-1};
+ stdx::condition_variable _waitForRecoveryCV;
};
} // namespace mongo
diff --git a/src/mongo/db/s/dist_lock_manager_replset_test.cpp b/src/mongo/db/s/dist_lock_manager_replset_test.cpp
index d3408d3d3f3..bd75caa8745 100644
--- a/src/mongo/db/s/dist_lock_manager_replset_test.cpp
+++ b/src/mongo/db/s/dist_lock_manager_replset_test.cpp
@@ -56,6 +56,8 @@
namespace mongo {
namespace {
+using unittest::assertGet;
+
// Max duration to wait to satisfy test invariant before joining with main test thread.
const Seconds kJoinTimeout(30);
const Milliseconds kPingInterval(2);
@@ -95,11 +97,14 @@ protected:
std::unique_ptr<DistLockManager> makeDistLockManager() override {
auto distLockCatalogMock = std::make_unique<DistLockCatalogMock>();
_distLockCatalogMock = distLockCatalogMock.get();
- return std::make_unique<ReplSetDistLockManager>(getServiceContext(),
- _processID,
- std::move(distLockCatalogMock),
- kPingInterval,
- kLockExpiration);
+ auto distLockManager =
+ std::make_unique<ReplSetDistLockManager>(getServiceContext(),
+ _processID,
+ std::move(distLockCatalogMock),
+ kPingInterval,
+ kLockExpiration);
+ distLockManager->markRecovered_forTest();
+ return distLockManager;
}
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override {
@@ -1583,9 +1588,33 @@ TEST_F(DistLockManagerReplSetTest, LockAcquisitionRetriesOnInterruptionNeverSucc
ASSERT_NOT_OK(status);
}
-class RSDistLockMgrWithMockTickSource : public DistLockManagerReplSetTest {
+TEST_F(DistLockManagerReplSetTest, RecoverySuccess) {
+ getMockCatalog()->expectUnlockAll(
+ [&](StringData processID, boost::optional<long long> term) {});
+
+ getMockCatalog()->expectGrabLock(
+ [&](StringData, const OID&, StringData, StringData, Date_t, StringData) {},
+ [&] {
+ LocksType doc;
+ doc.setName("RecoveryLock");
+ doc.setState(LocksType::LOCKED);
+ doc.setProcess(getProcessID());
+ doc.setWho("me");
+ doc.setWhy("because");
+ doc.setLockID(OID::gen());
+ return doc;
+ }());
+
+ auto replSetDistLockManager =
+ checked_cast<ReplSetDistLockManager*>(DistLockManager::get(operationContext()));
+ replSetDistLockManager->onStepUp(1LL);
+ ASSERT_OK(replSetDistLockManager->lockDirect(
+ operationContext(), "RecoveryLock", "because", DistLockManager::kDefaultLockTimeout));
+}
+
+class DistLockManagerReplSetTestWithMockTickSource : public DistLockManagerReplSetTest {
protected:
- RSDistLockMgrWithMockTickSource() {
+ DistLockManagerReplSetTestWithMockTickSource() {
getServiceContext()->setTickSource(std::make_unique<TickSourceMock<>>());
}
@@ -1604,7 +1633,7 @@ protected:
* 3. Unlock (on destructor of ScopedDistLock).
* 4. Check lock id used in lock and unlock are the same.
*/
-TEST_F(RSDistLockMgrWithMockTickSource, LockSuccessAfterRetry) {
+TEST_F(DistLockManagerReplSetTestWithMockTickSource, LockSuccessAfterRetry) {
std::string lockName("test");
std::string me("me");
boost::optional<OID> lastTS;
@@ -1740,7 +1769,7 @@ TEST_F(RSDistLockMgrWithMockTickSource, LockSuccessAfterRetry) {
* 3. Grab lock errors out on the fourth try.
* 4. Make sure that unlock is called to cleanup the last lock attempted that error out.
*/
-TEST_F(RSDistLockMgrWithMockTickSource, LockFailsAfterRetry) {
+TEST_F(DistLockManagerReplSetTestWithMockTickSource, LockFailsAfterRetry) {
std::string lockName("test");
std::string me("me");
boost::optional<OID> lastTS;
@@ -1874,7 +1903,7 @@ TEST_F(DistLockManagerReplSetTest, LockBusyNoRetry) {
* 4. Checks result is error.
* 5. Implicitly check that unlock is not called (default setting of mock catalog).
*/
-TEST_F(RSDistLockMgrWithMockTickSource, LockRetryTimeout) {
+TEST_F(DistLockManagerReplSetTestWithMockTickSource, LockRetryTimeout) {
std::string lockName("test");
std::string me("me");
boost::optional<OID> lastTS;
@@ -1930,7 +1959,7 @@ TEST_F(RSDistLockMgrWithMockTickSource, LockRetryTimeout) {
* 5. 2nd attempt to grab lock still fails for the same reason.
* 6. But since the ping has not been updated, dist lock manager should overtake lock.
*/
-TEST_F(RSDistLockMgrWithMockTickSource, CanOvertakeIfNoPingDocument) {
+TEST_F(DistLockManagerReplSetTestWithMockTickSource, CanOvertakeIfNoPingDocument) {
getMockCatalog()->expectGrabLock(
[](StringData, const OID&, StringData, StringData, Date_t, StringData) {
// Don't care
diff --git a/src/mongo/db/s/type_locks.cpp b/src/mongo/db/s/type_locks.cpp
index 9333ff42496..59a9b6e44c2 100644
--- a/src/mongo/db/s/type_locks.cpp
+++ b/src/mongo/db/s/type_locks.cpp
@@ -49,6 +49,7 @@ const BSONField<OID> LocksType::lockID("ts");
const BSONField<std::string> LocksType::who("who");
const BSONField<std::string> LocksType::why("why");
const BSONField<Date_t> LocksType::when("when");
+const BSONField<long long> LocksType::term("term");
StatusWith<LocksType> LocksType::fromBSON(const BSONObj& source) {
LocksType lock;
diff --git a/src/mongo/db/s/type_locks.h b/src/mongo/db/s/type_locks.h
index 8b567960012..c404aabd43b 100644
--- a/src/mongo/db/s/type_locks.h
+++ b/src/mongo/db/s/type_locks.h
@@ -63,6 +63,7 @@ public:
static const BSONField<std::string> who;
static const BSONField<std::string> why;
static const BSONField<Date_t> when;
+ static const BSONField<long long> term;
/**
* Constructs a new LocksType object from BSON.
diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp
index 69aefb113eb..5a94298bf82 100644
--- a/src/mongo/s/client/shard.cpp
+++ b/src/mongo/s/client/shard.cpp
@@ -166,13 +166,10 @@ StatusWith<Shard::CommandResponse> Shard::runCommandWithFixedRetryAttempts(
auto swResponse = _runCommand(opCtx, readPref, dbName, maxTimeMSOverride, cmdObj);
auto status = CommandResponse::getEffectiveStatus(swResponse);
if (retry < kOnErrorNumRetries && isRetriableError(status.code(), retryPolicy)) {
- LOGV2_DEBUG(22720,
- 2,
- "Command {command} failed with retryable error and will be retried. Caused "
- "by {error}",
- "Command failed with retryable error and will be retried",
- "command"_attr = redact(cmdObj),
- "error"_attr = redact(status));
+ LOGV2(22720,
+ "Command failed with a retryable error and will be retried",
+ "command"_attr = redact(cmdObj),
+ "error"_attr = redact(status));
continue;
}