summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp59
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_test.cpp60
2 files changed, 68 insertions, 51 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index e16deb9f26d..374e398ec1f 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -332,14 +332,13 @@ namespace {
{
boost::lock_guard<boost::mutex> lk(_mutex);
- // TODO(spencer): Assert that we've received a handshake for this node once the unit
- // test is doing that correctly
- OpTime& slaveOpTime = _slaveInfoMap[rid].opTime;
- LOG(3) << "Node with RID " << rid << " currently has optime " << slaveOpTime <<
+ SlaveInfo& slaveInfo = _slaveInfoMap[rid];
+ invariant(slaveInfo.memberID >= 0 || _getReplicationMode_inlock() == modeMasterSlave);
+
+ LOG(3) << "Node with RID " << rid << " currently has optime " << slaveInfo.opTime <<
"; updating to " << ts;
- if (slaveOpTime < ts) {
- slaveOpTime = ts;
- // TODO(spencer): update write concern tags if we're a replSet
+ if (slaveInfo.opTime < ts) {
+ slaveInfo.opTime = ts;
// Wake up any threads waiting for replication that now have their replication
// check satisfied
@@ -439,13 +438,7 @@ namespace {
if (slaveTime >= opTime) {
// This node has reached the desired optime, now we need to check if it is a part
// of the tagPattern.
- const MemberConfig* memberConfig = NULL;
- if (it->first == getMyRID(NULL)) {
- memberConfig = &_rsConfig.getMemberAt(_thisMembersConfigIndex);
- }
- else {
- memberConfig = _rsConfig.findMemberByID(it->second.memberID);
- }
+ const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->second.memberID);
invariant(memberConfig);
for (MemberConfig::TagIterator it = memberConfig->tagsBegin();
it != memberConfig->tagsEnd(); ++it) {
@@ -693,16 +686,9 @@ namespace {
// SERVER-14550 Even though the "config" field isn't used on the other end in 2.8,
// we need to keep sending it for 2.6 compatibility.
// TODO(spencer): Remove this after 2.8 is released.
- if (rid == getMyRID(txn)) {
- entry.append("config",
- _rsConfig.getMemberAt(_thisMembersConfigIndex).toBSON(
- _rsConfig.getTagConfig()));
- }
- else {
- const MemberConfig* member = _rsConfig.findMemberByID(info.memberID);
- fassert(18651, member); // We ensured the member existed in processHandshake.
- entry.append("config", member->toBSON(_rsConfig.getTagConfig()));
- }
+ const MemberConfig* member = _rsConfig.findMemberByID(info.memberID);
+ fassert(18651, member); // We ensured the member existed in processHandshake.
+ entry.append("config", member->toBSON(_rsConfig.getTagConfig()));
}
}
}
@@ -711,28 +697,10 @@ namespace {
OperationContext* txn,
std::vector<BSONObj>* handshakes) {
boost::lock_guard<boost::mutex> lock(_mutex);
- // handshake obj for us
- BSONObjBuilder cmd;
- cmd.append("replSetUpdatePosition", 1);
- {
- BSONObjBuilder sub (cmd.subobjStart("handshake"));
- sub.append("handshake", getMyRID(txn));
- sub.append("member", _thisMembersConfigIndex);
- // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8,
- // we need to keep sending it for 2.6 compatibility.
- // TODO(spencer): Remove this after 2.8 is released.
- sub.append("config", _rsConfig.getMemberAt(_thisMembersConfigIndex).toBSON(
- _rsConfig.getTagConfig()));
- }
- handshakes->push_back(cmd.obj());
-
- // handshake objs for all chained members
+ // handshake objs for ourself and all chained members
for (SlaveInfoMap::const_iterator itr = _slaveInfoMap.begin();
itr != _slaveInfoMap.end(); ++itr) {
const OID& oid = itr->first;
- if (oid == getMyRID(txn)) { // Already generated handshake for ourself
- continue;
- }
BSONObjBuilder cmd;
cmd.append("replSetUpdatePosition", 1);
{
@@ -1027,6 +995,11 @@ namespace {
myIndex,
_replExecutor.now(),
_getLastOpApplied_inlock());
+
+ // Ensure that there's an entry in the _slaveInfoMap for ourself
+ _slaveInfoMap[getMyRID(NULL)].memberID = _rsConfig.getMemberAt(myIndex).getId();
+ _slaveInfoMap[getMyRID(NULL)].hostAndPort =
+ _rsConfig.getMemberAt(myIndex).getHostAndPort();
_startHeartbeats();
}
diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
index 672c4acc7fb..f4ee27fd51d 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
@@ -489,7 +489,10 @@ namespace {
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
+ BSON("host" << "node2:12345" << "_id" << 1) <<
+ BSON("host" << "node3:12345" << "_id" << 2) <<
+ BSON("host" << "node4:12345" << "_id" << 3))),
HostAndPort("node1", 12345));
OperationContextNoop txn;
@@ -499,6 +502,16 @@ namespace {
OpTime time1(1, 1);
OpTime time2(1, 2);
+ HandshakeArgs handshake1;
+ ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
+ HandshakeArgs handshake2;
+ ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
+ HandshakeArgs handshake3;
+ ASSERT_OK(handshake3.initialize(BSON("handshake" << client3 << "member" << 3)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3));
+
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern.wNumNodes = 2;
@@ -573,16 +586,16 @@ namespace {
HandshakeArgs handshake1;
ASSERT_OK(handshake1.initialize(BSON("handshake" << clientRID1 << "member" << 1)));
- getReplCoord()->processHandshake(&txn, handshake1);
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
HandshakeArgs handshake2;
ASSERT_OK(handshake2.initialize(BSON("handshake" << clientRID2 << "member" << 2)));
- getReplCoord()->processHandshake(&txn, handshake2);
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
HandshakeArgs handshake3;
ASSERT_OK(handshake3.initialize(BSON("handshake" << clientRID3 << "member" << 3)));
- getReplCoord()->processHandshake(&txn, handshake3);
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3));
HandshakeArgs handshake4;
ASSERT_OK(handshake4.initialize(BSON("handshake" << clientRID4 << "member" << 4)));
- getReplCoord()->processHandshake(&txn, handshake4);
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake4));
// Test invalid write concern
WriteConcernOptions invalidWriteConcern;
@@ -714,7 +727,10 @@ namespace {
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
+ BSON("host" << "node2:12345" << "_id" << 1) <<
+ BSON("host" << "node3:12345" << "_id" << 2) <<
+ BSON("host" << "node4:12345" << "_id" << 3))),
HostAndPort("node1", 12345));
OperationContextNoop txn;
@@ -726,6 +742,16 @@ namespace {
OpTime time1(1, 1);
OpTime time2(1, 2);
+ HandshakeArgs handshake1;
+ ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
+ HandshakeArgs handshake2;
+ ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
+ HandshakeArgs handshake3;
+ ASSERT_OK(handshake3.initialize(BSON("handshake" << client3 << "member" << 3)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3));
+
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
writeConcern.wNumNodes = 2;
@@ -763,7 +789,9 @@ namespace {
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
+ BSON("host" << "node2:12345" << "_id" << 1) <<
+ BSON("host" << "node3:12345" << "_id" << 2))),
HostAndPort("node1", 12345));
OperationContextNoop txn;
ReplicationAwaiter awaiter(getReplCoord(), &txn);
@@ -773,6 +801,13 @@ namespace {
OpTime time1(1, 1);
OpTime time2(1, 2);
+ HandshakeArgs handshake1;
+ ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
+ HandshakeArgs handshake2;
+ ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
+
WriteConcernOptions writeConcern;
writeConcern.wTimeout = 50;
writeConcern.wNumNodes = 2;
@@ -792,7 +827,9 @@ namespace {
assertStartSuccess(
BSON("_id" << "mySet" <<
"version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
+ BSON("host" << "node2:12345" << "_id" << 1) <<
+ BSON("host" << "node3:12345" << "_id" << 2))),
HostAndPort("node1", 12345));
OperationContextNoop txn;
ReplicationAwaiter awaiter(getReplCoord(), &txn);
@@ -802,6 +839,13 @@ namespace {
OpTime time1(1, 1);
OpTime time2(1, 2);
+ HandshakeArgs handshake1;
+ ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
+ HandshakeArgs handshake2;
+ ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
+
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
writeConcern.wNumNodes = 2;