/**
* Copyright 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/repl/replica_set_config.h"
#include <algorithm>
#include <limits>
#include "mongo/bson/util/bson_check.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/jsobj.h"
#include "mongo/stdx/functional.h"
namespace mongo {
namespace repl {
// Out-of-class definitions for the static data members of ReplicaSetConfig.
//
// The two size_t constants are defined here without an initializer and only when
// _MSC_VER is not defined (presumably their values come from in-class initializers
// in the header, which MSVC already treats as definitions -- TODO confirm).
#ifndef _MSC_VER
const size_t ReplicaSetConfig::kMaxMembers;
const size_t ReplicaSetConfig::kMaxVotingMembers;
#endif
// Name of the top-level config field that carries the config version.
const std::string ReplicaSetConfig::kVersionFieldName = "version";
// Key under which _addInternalWriteConcernModes() registers the majority write mode.
// toBSON() skips every mode whose name starts with '$'.
const std::string ReplicaSetConfig::kMajorityWriteConcernModeName = "$majority";
// Heartbeat timeout used by _parseSettingsSubdocument() when the settings document
// has no heartbeatTimeoutSecs field.
const Seconds ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
namespace {
// Field names used at the top level of a replica set configuration document.
const std::string kIdFieldName = "_id";
const std::string kMembersFieldName = "members";
const std::string kSettingsFieldName = "settings";
// Key under which _addInternalWriteConcernModes() registers the step-down check mode.
const std::string kStepDownCheckWriteConcernModeName = "$stepDownCheck";
const std::string kProtocolVersionFieldName = "protocolVersion";
// Set of field names that initialize() passes to bsonCheckOnlyHasFields() for the
// top-level document. The array copies the strings defined above (and
// ReplicaSetConfig::kVersionFieldName, defined earlier in this file), so it has to stay
// after those definitions.
// NOTE(review): protocolVersion is accepted here as a top-level field, but this file
// reads and writes it inside the settings subdocument -- confirm which is intended.
const std::string kLegalConfigTopFieldNames[] = {kIdFieldName,
ReplicaSetConfig::kVersionFieldName,
kMembersFieldName,
kSettingsFieldName,
kProtocolVersionFieldName};
// Field names used inside the "settings" subdocument.
const std::string kHeartbeatTimeoutFieldName = "heartbeatTimeoutSecs";
const std::string kChainingAllowedFieldName = "chainingAllowed";
const std::string kGetLastErrorDefaultsFieldName = "getLastErrorDefaults";
const std::string kGetLastErrorModesFieldName = "getLastErrorModes";
} // namespace
// Creates a configuration that is flagged as not initialized, with a zero heartbeat
// timeout and protocol version 0. No other member appears in this initializer list;
// initialize() fills in the rest.
ReplicaSetConfig::ReplicaSetConfig()
: _isInitialized(false), _heartbeatTimeoutPeriod(0), _protocolVersion(0) {}
/**
 * Populates this object from the replica set configuration document "cfg".
 *
 * Steps, in order: reject unknown top-level fields, read the set name (_id), read the
 * version, parse every entry of the members array, parse the optional settings
 * subdocument, then derive the majority counts and the internal write concern modes.
 *
 * Returns the first non-OK Status produced by any step.  The object is flagged as
 * initialized only when every step succeeded.  This function parses only; range and
 * consistency checks live in validate().
 */
Status ReplicaSetConfig::initialize(const BSONObj& cfg) {
    // The object must not report itself as usable until parsing has fully succeeded.
    _isInitialized = false;
    _members.clear();

    Status parseStatus =
        bsonCheckOnlyHasFields("replica set configuration", cfg, kLegalConfigTopFieldNames);
    if (!parseStatus.isOK()) {
        return parseStatus;
    }

    // Replica set name.
    parseStatus = bsonExtractStringField(cfg, kIdFieldName, &_replSetName);
    if (!parseStatus.isOK()) {
        return parseStatus;
    }

    // Configuration version.
    parseStatus = bsonExtractIntegerField(cfg, kVersionFieldName, &_version);
    if (!parseStatus.isOK()) {
        return parseStatus;
    }

    // Members: the field must be an array whose entries are all objects.
    BSONElement membersArray;
    parseStatus = bsonExtractTypedField(cfg, kMembersFieldName, Array, &membersArray);
    if (!parseStatus.isOK()) {
        return parseStatus;
    }

    BSONObj::iterator memberCursor(membersArray.Obj());
    while (memberCursor.more()) {
        BSONElement entry = memberCursor.next();
        if (entry.type() != Object) {
            return Status(ErrorCodes::TypeMismatch,
                          str::stream() << "Expected type of " << kMembersFieldName << "."
                                        << entry.fieldName() << " to be Object, but found "
                                        << typeName(entry.type()));
        }

        // Append a default-constructed member first, then let it parse itself in place.
        _members.emplace_back();
        parseStatus = _members.back().initialize(entry.Obj(), &_tagConfig);
        if (!parseStatus.isOK()) {
            return parseStatus;
        }
    }

    // Settings: optional.  A missing field is treated like an empty subdocument; any other
    // extraction failure is reported to the caller.
    BSONObj settingsDoc;
    BSONElement settingsField;
    parseStatus = bsonExtractTypedField(cfg, kSettingsFieldName, Object, &settingsField);
    if (!parseStatus.isOK()) {
        if (parseStatus != ErrorCodes::NoSuchKey) {
            return parseStatus;
        }
    } else {
        settingsDoc = settingsField.Obj();
    }

    parseStatus = _parseSettingsSubdocument(settingsDoc);
    if (!parseStatus.isOK()) {
        return parseStatus;
    }

    _calculateMajorities();
    _addInternalWriteConcernModes();

    _isInitialized = true;
    return Status::OK();
}
/**
 * Reads the fields of the "settings" subdocument into this object.
 *
 * Handles, in order: heartbeatTimeoutSecs, chainingAllowed, getLastErrorDefaults,
 * getLastErrorModes and protocolVersion.  Each of them is optional; initialize() passes an
 * empty document when the configuration has no settings at all.
 *
 * Returns the first non-OK Status encountered, or Status::OK().
 */
Status ReplicaSetConfig::_parseSettingsSubdocument(const BSONObj& settings) {
    // heartbeatTimeoutSecs: absent -> default period; numeric -> that many seconds
    // (converted with numberInt()); any other type is an error.
    BSONElement hbTimeoutSecs = settings[kHeartbeatTimeoutFieldName];
    if (!hbTimeoutSecs.eoo() && !hbTimeoutSecs.isNumber()) {
        return Status(ErrorCodes::TypeMismatch,
                      str::stream() << "Expected type of " << kSettingsFieldName << "."
                                    << kHeartbeatTimeoutFieldName
                                    << " to be a number, but found a value of type "
                                    << typeName(hbTimeoutSecs.type()));
    }
    if (hbTimeoutSecs.eoo()) {
        _heartbeatTimeoutPeriod = Seconds(kDefaultHeartbeatTimeoutPeriod);
    } else {
        _heartbeatTimeoutPeriod = Seconds(hbTimeoutSecs.numberInt());
    }

    // chainingAllowed: boolean, true when absent.
    Status status = bsonExtractBooleanFieldWithDefault(
        settings, kChainingAllowedFieldName, true, &_chainingAllowed);
    if (!status.isOK()) {
        return status;
    }

    // getLastErrorDefaults: parsed as a write concern; when absent the default is w: 1.
    BSONElement gleDefaults;
    status =
        bsonExtractTypedField(settings, kGetLastErrorDefaultsFieldName, Object, &gleDefaults);
    if (status == ErrorCodes::NoSuchKey) {
        _defaultWriteConcern.reset();
        _defaultWriteConcern.wNumNodes = 1;
    } else if (!status.isOK()) {
        return status;
    } else {
        status = _defaultWriteConcern.parse(gleDefaults.Obj());
        if (!status.isOK()) {
            return status;
        }
    }

    // getLastErrorModes: an object mapping each mode name to an object of
    // { tagKey: minimumCount } constraints.  Absent means no custom modes.
    BSONObj customModes;
    BSONElement customModesField;
    status =
        bsonExtractTypedField(settings, kGetLastErrorModesFieldName, Object, &customModesField);
    if (!status.isOK()) {
        if (status != ErrorCodes::NoSuchKey) {
            return status;
        }
    } else {
        customModes = customModesField.Obj();
    }

    BSONObj::iterator modeCursor(customModes);
    while (modeCursor.more()) {
        const BSONElement mode = modeCursor.next();

        // A mode name may only be registered once.
        if (_customWriteConcernModes.find(mode.fieldNameStringData()) !=
            _customWriteConcernModes.end()) {
            return Status(ErrorCodes::DuplicateKey,
                          str::stream() << kSettingsFieldName << '.' << kGetLastErrorModesFieldName
                                        << " contains multiple fields named "
                                        << mode.fieldName());
        }
        if (mode.type() != Object) {
            return Status(ErrorCodes::TypeMismatch,
                          str::stream() << "Expected " << kSettingsFieldName << '.'
                                        << kGetLastErrorModesFieldName << '.'
                                        << mode.fieldName() << " to be an Object, not "
                                        << typeName(mode.type()));
        }

        // Build the tag pattern for this mode one constraint at a time.
        ReplicaSetTagPattern modePattern = _tagConfig.makePattern();
        BSONObj::iterator constraintCursor(mode.Obj());
        while (constraintCursor.more()) {
            const BSONElement constraint = constraintCursor.next();
            if (!constraint.isNumber()) {
                return Status(ErrorCodes::TypeMismatch,
                              str::stream()
                                  << "Expected " << kSettingsFieldName << '.'
                                  << kGetLastErrorModesFieldName << '.' << mode.fieldName()
                                  << '.' << constraint.fieldName() << " to be a number, not "
                                  << typeName(constraint.type()));
            }

            const int minCount = constraint.numberInt();
            if (minCount <= 0) {
                return Status(ErrorCodes::BadValue,
                              str::stream() << "Value of " << kSettingsFieldName << '.'
                                            << kGetLastErrorModesFieldName << '.'
                                            << mode.fieldName() << '.'
                                            << constraint.fieldName()
                                            << " must be positive, but found " << minCount);
            }

            status = _tagConfig.addTagCountConstraintToPattern(
                &modePattern, constraint.fieldNameStringData(), minCount);
            if (!status.isOK()) {
                return status;
            }
        }
        _customWriteConcernModes[mode.fieldNameStringData()] = modePattern;
    }

    // protocolVersion: optional; when absent _protocolVersion keeps its current value.
    // NOTE(review): the field is read from the settings subdocument here although it is also
    // listed among the legal top-level field names -- confirm the intended location.
    status = bsonExtractIntegerField(settings, kProtocolVersionFieldName, &_protocolVersion);
    if (status.isOK() || status == ErrorCodes::NoSuchKey) {
        return Status::OK();
    }
    return status;
}
/**
 * Checks the parsed configuration for out-of-range values and internal inconsistencies.
 *
 * Verified here: version range, non-empty set name, non-negative heartbeat timeout, member
 * count, each member's own validate(), uniqueness of member ids and host names, the
 * all-or-nothing localhost rule, voter count, presence of at least one electable member,
 * the default write concern, and the protocol version range.
 *
 * Returns Status::OK() when all checks pass, otherwise the Status describing the first
 * failed check (BadValue for checks made here; a member's own Status for member failures).
 */
Status ReplicaSetConfig::validate() const {
    // NOTE(review): the upper bound is taken to be the int range -- confirm against the
    // declared type of _version in the header.
    if (_version <= 0 || _version > std::numeric_limits<int>::max()) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << kVersionFieldName << " field value of " << _version
                                    << " is out of range");
    }
    if (_replSetName.empty()) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "Replica set configuration must have non-empty "
                                    << kIdFieldName << " field");
    }
    if (_heartbeatTimeoutPeriod < Seconds(0)) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << kSettingsFieldName << '.' << kHeartbeatTimeoutFieldName
                                    << " field value must be non-negative, "
                                       "but found " << _heartbeatTimeoutPeriod.count());
    }
    if (_members.size() > kMaxMembers || _members.empty()) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "Replica set configuration contains " << _members.size()
                                    << " members, but must have at least 1 and no more than "
                                    << kMaxMembers);
    }

    size_t localhostCount = 0;
    size_t voterCount = 0;
    size_t arbiterCount = 0;
    size_t electableCount = 0;
    for (size_t i = 0; i < _members.size(); ++i) {
        const MemberConfig& memberI = _members[i];
        Status status = memberI.validate();
        if (!status.isOK())
            return status;
        if (memberI.getHostAndPort().isLocalHost()) {
            ++localhostCount;
        }
        if (memberI.isVoter()) {
            ++voterCount;
        }
        // Nodes may be arbiters or electable, or neither, but never both.
        if (memberI.isArbiter()) {
            ++arbiterCount;
        } else if (memberI.getPriority() > 0) {
            ++electableCount;
        }

        // Compare against later members only.  Any clash with an earlier member would
        // already have been reported while that earlier member was the outer element, so
        // this reports exactly the same (i, j) pair as a scan over all j != i would.
        for (size_t j = i + 1; j < _members.size(); ++j) {
            const MemberConfig& memberJ = _members[j];
            if (memberI.getId() == memberJ.getId()) {
                return Status(ErrorCodes::BadValue,
                              str::stream()
                                  << "Found two member configurations with same "
                                  << MemberConfig::kIdFieldName << " field, " << kMembersFieldName
                                  << "." << i << "." << MemberConfig::kIdFieldName
                                  << " == " << kMembersFieldName << "." << j << "."
                                  << MemberConfig::kIdFieldName << " == " << memberI.getId());
            }
            if (memberI.getHostAndPort() == memberJ.getHostAndPort()) {
                return Status(ErrorCodes::BadValue,
                              str::stream() << "Found two member configurations with same "
                                            << MemberConfig::kHostFieldName << " field, "
                                            << kMembersFieldName << "." << i << "."
                                            << MemberConfig::kHostFieldName
                                            << " == " << kMembersFieldName << "." << j << "."
                                            << MemberConfig::kHostFieldName
                                            << " == " << memberI.getHostAndPort().toString());
            }
        }
    }

    // Localhost references must be used by every member or by none.
    if (localhostCount != 0 && localhostCount != _members.size()) {
        return Status(
            ErrorCodes::BadValue,
            str::stream()
                << "Either all host names in a replica set configuration must be localhost "
                   "references, or none must be; found " << localhostCount << " out of "
                << _members.size());
    }
    if (voterCount > kMaxVotingMembers || voterCount == 0) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "Replica set configuration contains " << voterCount
                                    << " voting members, but must be at least 1 and no more than "
                                    << kMaxVotingMembers);
    }
    if (electableCount == 0) {
        return Status(ErrorCodes::BadValue,
                      "Replica set configuration must contain at least "
                      "one non-arbiter member with priority > 0");
    }

    // TODO(schwerin): Validate satisfiability of write modes? Omitting for backwards
    // compatibility.
    if (_defaultWriteConcern.wMode.empty()) {
        // Numeric default write concern: must wait for at least one node.
        if (_defaultWriteConcern.wNumNodes == 0) {
            return Status(ErrorCodes::BadValue,
                          "Default write concern mode must wait for at least 1 member");
        }
    } else {
        // Named default write concern: must be the majority mode or a mode known to
        // findCustomWriteMode().
        if (WriteConcernOptions::kMajority != _defaultWriteConcern.wMode &&
            !findCustomWriteMode(_defaultWriteConcern.wMode).isOK()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Default write concern requires undefined write mode "
                                        << _defaultWriteConcern.wMode);
        }
    }

    // NOTE(review): same assumed int bound as for the version check above.
    if (_protocolVersion < 0 || _protocolVersion > std::numeric_limits<int>::max()) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << kProtocolVersionFieldName << " field value of "
                                    << _protocolVersion << " is out of range");
    }
    return Status::OK();
}
/**
 * Reports whether "writeConcern" could be satisfied by the members of this configuration.
 *
 * For a named mode other than the majority mode, the mode's tag pattern is looked up and
 * fed the tags of every member; the write concern is satisfiable as soon as the matcher
 * reports success.  For every other write concern (numeric, or the majority mode), the
 * non-arbiter members are counted against writeConcern.wNumNodes.
 *
 * Returns Status::OK() when satisfiable, the lookup error when the named mode is unknown,
 * and CannotSatisfyWriteConcern otherwise.
 */
Status ReplicaSetConfig::checkIfWriteConcernCanBeSatisfied(
    const WriteConcernOptions& writeConcern) const {
    if (!writeConcern.wMode.empty() && writeConcern.wMode != WriteConcernOptions::kMajority) {
        const StatusWith<ReplicaSetTagPattern> tagPatternStatus =
            findCustomWriteMode(writeConcern.wMode);
        if (!tagPatternStatus.isOK()) {
            return tagPatternStatus.getStatus();
        }

        ReplicaSetTagMatch matcher(tagPatternStatus.getValue());
        for (const MemberConfig& memberConfig : _members) {
            for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
                 it != memberConfig.tagsEnd();
                 ++it) {
                if (matcher.update(*it)) {
                    return Status::OK();
                }
            }
        }

        // Even if all the nodes in the set had a given write it still would not satisfy this
        // write concern mode.
        return Status(ErrorCodes::CannotSatisfyWriteConcern,
                      str::stream() << "Not enough nodes match write concern mode \""
                                    << writeConcern.wMode << "\"");
    }

    int nodesRemaining = writeConcern.wNumNodes;
    for (const MemberConfig& memberConfig : _members) {
        if (memberConfig.isArbiter()) {
            continue;  // Only count data-bearing nodes
        }
        --nodesRemaining;
        if (nodesRemaining <= 0) {
            return Status::OK();
        }
    }
    return Status(ErrorCodes::CannotSatisfyWriteConcern, "Not enough data-bearing nodes");
}
// Returns a reference to the member stored at position i of the member list.
// i must be smaller than the number of members; this is enforced with invariant().
const MemberConfig& ReplicaSetConfig::getMemberAt(size_t i) const {
invariant(i < _members.size());
return _members[i];
}
/**
 * Returns a pointer to the first member whose getId() equals "id", or a null pointer when
 * no member has that id.  The pointer refers to an element of this object's member list.
 */
const MemberConfig* ReplicaSetConfig::findMemberByID(int id) const {
    for (const auto& member : _members) {
        if (member.getId() == id) {
            return &member;
        }
    }
    return nullptr;
}
const int ReplicaSetConfig::findMemberIndexByHostAndPort(const HostAndPort& hap) const {
int x = 0;
for (std::vector::const_iterator it = _members.begin(); it != _members.end();
++it) {
if (it->getHostAndPort() == hap) {
return x;
}
++x;
}
return -1;
}
const int ReplicaSetConfig::findMemberIndexByConfigId(long long configId) const {
int x = 0;
for (const auto& member : _members) {
if (member.getId() == configId) {
return x;
}
++x;
}
return -1;
}
/**
 * Returns a pointer to the member whose host and port equal "hap", or NULL when
 * findMemberIndexByHostAndPort() reports no match.
 */
const MemberConfig* ReplicaSetConfig::findMemberByHostAndPort(const HostAndPort& hap) const {
    const int memberIndex = findMemberIndexByHostAndPort(hap);
    if (memberIndex == -1) {
        return NULL;
    }
    return &getMemberAt(memberIndex);
}
// Looks up the tag for the given key/value pair by forwarding to this configuration's
// tag config; the result is whatever _tagConfig.findTag() returns.
ReplicaSetTag ReplicaSetConfig::findTag(StringData key, StringData value) const {
return _tagConfig.findTag(key, value);
}
/**
 * Looks up the tag pattern registered under the write concern mode name "patternName".
 *
 * Returns the pattern on success, or an UnknownReplWriteConcern status naming the
 * (escaped) mode when nothing is registered under that name.
 */
StatusWith<ReplicaSetTagPattern> ReplicaSetConfig::findCustomWriteMode(
    StringData patternName) const {
    const auto iter = _customWriteConcernModes.find(patternName);
    if (iter == _customWriteConcernModes.end()) {
        return StatusWith<ReplicaSetTagPattern>(
            ErrorCodes::UnknownReplWriteConcern,
            str::stream() << "No write concern mode named '" << escape(patternName.toString())
                          << "' found in replica set configuration");
    }
    return StatusWith<ReplicaSetTagPattern>(iter->second);
}
void ReplicaSetConfig::_calculateMajorities() {
const int voters = std::count_if(_members.begin(),
_members.end(),
stdx::bind(&MemberConfig::isVoter, stdx::placeholders::_1));
const int arbiters =
std::count_if(_members.begin(),
_members.end(),
stdx::bind(&MemberConfig::isArbiter, stdx::placeholders::_1));
_totalVotingMembers = voters;
_majorityVoteCount = voters / 2 + 1;
_writeMajority = std::min(_majorityVoteCount, voters - arbiters);
}
void ReplicaSetConfig::_addInternalWriteConcernModes() {
// $majority: the majority of voting nodes or all non-arbiter voting nodes if
// the majority of voting nodes are arbiters.
ReplicaSetTagPattern pattern = _tagConfig.makePattern();
Status status = _tagConfig.addTagCountConstraintToPattern(
&pattern, MemberConfig::kInternalVoterTagName, _writeMajority);
if (status.isOK()) {
_customWriteConcernModes[kMajorityWriteConcernModeName] = pattern;
} else if (status != ErrorCodes::NoSuchKey) {
// NoSuchKey means we have no $voter-tagged nodes in this config;
// other errors are unexpected.
fassert(28693, status);
}
// $stepDownCheck: one electable node plus ourselves
pattern = _tagConfig.makePattern();
status = _tagConfig.addTagCountConstraintToPattern(
&pattern, MemberConfig::kInternalElectableTagName, 2);
if (status.isOK()) {
_customWriteConcernModes[kStepDownCheckWriteConcernModeName] = pattern;
} else if (status != ErrorCodes::NoSuchKey) {
// NoSuchKey means we have no $electable-tagged nodes in this config;
// other errors are unexpected
fassert(28694, status);
}
}
/**
 * Serializes this configuration to a BSON document.
 *
 * Layout produced: _id, version, members (array of each member's toBSON()), and a settings
 * subdocument holding chainingAllowed, heartbeatTimeoutSecs, getLastErrorModes,
 * getLastErrorDefaults and protocolVersion.  Write concern modes whose name starts with
 * '$' are internal and are left out of getLastErrorModes.
 */
BSONObj ReplicaSetConfig::toBSON() const {
    BSONObjBuilder configBuilder;
    configBuilder.append(kIdFieldName, _replSetName);
    configBuilder.appendIntOrLL(kVersionFieldName, _version);

    BSONArrayBuilder members(configBuilder.subarrayStart(kMembersFieldName));
    for (MemberIterator mem = membersBegin(); mem != membersEnd(); ++mem) {
        members.append(mem->toBSON(getTagConfig()));
    }
    members.done();

    BSONObjBuilder settingsBuilder(configBuilder.subobjStart(kSettingsFieldName));
    settingsBuilder.append(kChainingAllowedFieldName, _chainingAllowed);
    settingsBuilder.appendIntOrLL(kHeartbeatTimeoutFieldName, _heartbeatTimeoutPeriod.count());

    BSONObjBuilder gleModes(settingsBuilder.subobjStart(kGetLastErrorModesFieldName));
    for (auto mode = _customWriteConcernModes.begin(); mode != _customWriteConcernModes.end();
         ++mode) {
        if (mode->first[0] == '$') {
            // Filter out internal modes
            continue;
        }
        // One subobject per mode: { tagKey: minCount, ... }.
        BSONObjBuilder modeBuilder(gleModes.subobjStart(mode->first));
        for (ReplicaSetTagPattern::ConstraintIterator itr = mode->second.constraintsBegin();
             itr != mode->second.constraintsEnd();
             ++itr) {
            modeBuilder.append(_tagConfig.getTagKey(ReplicaSetTag(itr->getKeyIndex(), 0)),
                               itr->getMinCount());
        }
        modeBuilder.done();
    }
    gleModes.done();

    settingsBuilder.append(kGetLastErrorDefaultsFieldName, _defaultWriteConcern.toBSON());
    // NOTE(review): protocolVersion is written inside settings, matching where
    // _parseSettingsSubdocument() reads it -- confirm this is the intended location.
    settingsBuilder.append(kProtocolVersionFieldName, _protocolVersion);
    settingsBuilder.done();
    return configBuilder.obj();
}
std::vector ReplicaSetConfig::getWriteConcernNames() const {
std::vector names;
for (StringMap::const_iterator mode = _customWriteConcernModes.begin();
mode != _customWriteConcernModes.end();
++mode) {
names.push_back(mode->first);
}
return names;
}
} // namespace repl
} // namespace mongo