summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2015-08-28 15:25:06 -0400
committerScott Hernandez <scotthernandez@gmail.com>2015-08-31 08:05:52 -0400
commitccbac2bd346c7bf422d1e13742842919932aafc6 (patch)
tree9dece88a55db37e3f8b8e396e48742ea3000f7de /src/mongo/db
parent18b5dbd7ebb80cfa0c07a0f480554293c1a0b3f7 (diff)
downloadmongo-ccbac2bd346c7bf422d1e13742842919932aafc6.tar.gz
SERVER-19956: arbiter uses last committed time during electionswq
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp46
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp32
3 files changed, 66 insertions, 21 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f71b24c7b5d..557c3b0f919 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -365,7 +365,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
_setCurrentRSConfig_inlock(localConfig, myIndex.getValue());
- _setMyLastOptime_inlock(&lk, lastOpTime, false);
+ _setMyLastOptimeAndReport_inlock(&lk, lastOpTime, false);
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
_updateTerm_incallback(term);
LOG(1) << "Current term is now " << term;
@@ -760,39 +760,30 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
void ReplicationCoordinatorImpl::setMyLastOptimeForward(const OpTime& opTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (opTime > _getMyLastOptime_inlock()) {
- _setMyLastOptime_inlock(&lock, opTime, false);
+ _setMyLastOptimeAndReport_inlock(&lock, opTime, false);
}
}
void ReplicationCoordinatorImpl::setMyLastOptime(const OpTime& opTime) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- _setMyLastOptime_inlock(&lock, opTime, false);
+ _setMyLastOptimeAndReport_inlock(&lock, opTime, false);
}
void ReplicationCoordinatorImpl::resetMyLastOptime() {
stdx::unique_lock<stdx::mutex> lock(_mutex);
// Reset to uninitialized OpTime
- _setMyLastOptime_inlock(&lock, OpTime(), true);
+ _setMyLastOptimeAndReport_inlock(&lock, OpTime(), true);
}
-void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock,
- const OpTime& opTime,
- bool isRollbackAllowed) {
+void ReplicationCoordinatorImpl::_setMyLastOptimeAndReport_inlock(
+ stdx::unique_lock<stdx::mutex>* lock, const OpTime& opTime, bool isRollbackAllowed) {
invariant(lock->owns_lock());
- SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()];
- invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime);
- _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime);
+ _setMyLastOptime_inlock(opTime, isRollbackAllowed);
if (getReplicationMode() != modeReplSet) {
return;
}
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- if (*(opTimeWaiter->opTime) <= opTime) {
- opTimeWaiter->condVar->notify_all();
- }
- }
-
if (_getMemberState_inlock().primary()) {
return;
}
@@ -802,6 +793,19 @@ void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(stdx::unique_lock<stdx:
_externalState->forwardSlaveProgress(); // Must do this outside _mutex
}
+void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(const OpTime& opTime,
+ bool isRollbackAllowed) {
+ SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()];
+ invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime);
+ _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime);
+
+ for (auto& opTimeWaiter : _opTimeWaiterList) {
+ if (*(opTimeWaiter->opTime) <= opTime) {
+ opTimeWaiter->condVar->notify_all();
+ }
+ }
+}
+
OpTime ReplicationCoordinatorImpl::getMyLastOptime() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _getMyLastOptime_inlock();
@@ -2604,7 +2608,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn)
lastOpTime = lastOpTimeStatus.getValue();
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _setMyLastOptime_inlock(&lk, lastOpTime, true);
+ _setMyLastOptimeAndReport_inlock(&lk, lastOpTime, true);
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
}
@@ -2678,6 +2682,10 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& co
return;
}
+ if (_getMemberState_inlock().arbiter()) {
+ _setMyLastOptime_inlock(committedOpTime, false);
+ }
+
_lastCommittedOpTime = committedOpTime;
auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()};
@@ -2719,7 +2727,9 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return {ErrorCodes::BadValue, "not using election protocol v1"};
}
- updateTerm(args.getTerm());
+ auto termStatus = updateTerm(args.getTerm());
+ if (!termStatus.isOK())
+ return termStatus;
Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"};
CBHStatus cbh = _replExecutor.scheduleWork(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 263cc513aa3..6fd652cb951 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -659,10 +659,13 @@ private:
*
* This function has the same rules for "opTime" as setMyLastOptime(), unless
* "isRollbackAllowed" is true.
+ *
+ * This function will also report our position externally (like upstream) if necessary.
*/
- void _setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock,
- const OpTime& opTime,
- bool isRollbackAllowed);
+ void _setMyLastOptimeAndReport_inlock(stdx::unique_lock<stdx::mutex>* lock,
+ const OpTime& opTime,
+ bool isRollbackAllowed);
+ void _setMyLastOptime_inlock(const OpTime& opTime, bool isRollbackAllowed);
/**
* Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 6ab99d225f1..1b118e1c53b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/bson/json.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
@@ -40,6 +41,7 @@
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -256,6 +258,36 @@ TEST_F(ReplCoordHBV1Test, OnlyUnauthorizedUpCausesRecovering) {
assertMemberState(MemberState::RS_RECOVERING, "0");
}
+TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeat) {
+ // Tests that an arbiter will update its committed optime from the heartbeat metadata
+ assertStartSuccess(fromjson(
+ "{_id:'mySet', version:1, protocolVersion:1, members:["
+ "{_id:1, host:'node1:12345', arbiterOnly:true}, "
+ "{_id:2, host:'node2:12345'}]}"),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_ARBITER));
+
+ // calls processReplSetMetadata with the "committed" optime and verifies that the arbiter sets
+ // its current optime to 'expected'
+ auto test = [this](OpTime committedOpTime, OpTime expected) {
+ // process heartbeat metadata directly
+ StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "term"
+ << committedOpTime.getTerm()) << "lastOpVisible"
+ << BSON("ts" << committedOpTime.getTimestamp() << "term"
+ << committedOpTime.getTerm()) << "configVersion" << 1
+ << "primaryIndex" << 1 << "term" << committedOpTime.getTerm())));
+ getReplCoord()->processReplSetMetadata(metadata.getValue());
+
+ ASSERT_EQ(getReplCoord()->getMyLastOptime().getTimestamp(), expected.getTimestamp());
+ };
+
+ OpTime committedOpTime{Timestamp{10, 10}, 10};
+ test(committedOpTime, committedOpTime);
+ OpTime olderOpTime{Timestamp{2, 2}, 9};
+ test(olderOpTime, committedOpTime);
+}
} // namespace
} // namespace repl
} // namespace mongo