summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2014-07-16 10:59:01 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2014-07-17 07:19:01 -0400
commitb7a18a30b27b900cbfc6c71b033c85023296a2fd (patch)
treef88b568911ae6035aafa50831c3786a25523d8d4 /src/mongo
parent4da16b54347c0503d96d95eab3d0952e59062825 (diff)
downloadmongo-b7a18a30b27b900cbfc6c71b033c85023296a2fd.tar.gz
SERVER-14458 swap LegacyReplicationCoordinator for the HybridReplicationCoordinator
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/db.cpp18
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp286
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h148
4 files changed, 438 insertions, 15 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index e7f7c57f3f4..7a935b1a8f4 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -575,6 +575,7 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/repl/rs.cpp",
"db/repl/consensus.cpp",
"db/repl/rs_initiate.cpp",
+ "db/repl/repl_coordinator_hybrid.cpp",
"db/repl/repl_coordinator_legacy.cpp",
"db/repl/repl_set_seed_list.cpp",
"db/repl/repl_set_health_poll_task.cpp",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 7d067fea53f..0b18379ea20 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -70,8 +70,8 @@
#include "mongo/db/repair_database.h"
#include "mongo/db/repl/network_interface_impl.h"
#include "mongo/db/repl/repl_coordinator_global.h"
+#include "mongo/db/repl/repl_coordinator_hybrid.h"
#include "mongo/db/repl/repl_coordinator_impl.h"
-#include "mongo/db/repl/repl_coordinator_legacy.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
@@ -828,12 +828,6 @@ MONGO_INITIALIZER(SetGlobalConfigExperiment)(InitializerContext* context) {
}
namespace {
- // TODO(spencer): Remove this startup parameter once the new ReplicationCoordinator is fully
- // working
- MONGO_EXPORT_STARTUP_SERVER_PARAMETER(useNewReplCoordinator, bool, false);
-} // namespace
-
-namespace {
repl::ReplSettings replSettings;
} // namespace
@@ -843,14 +837,8 @@ namespace mongo {
}
} // namespace mongo
-MONGO_INITIALIZER(CreateReplicationCoordinator)(InitializerContext* context) {
- if (useNewReplCoordinator) {
- repl::setGlobalReplicationCoordinator(
- new repl::ReplicationCoordinatorImpl(replSettings));
- } else {
- repl::setGlobalReplicationCoordinator(
- new repl::LegacyReplicationCoordinator(replSettings));
- }
+MONGO_INITIALIZER(CreateReplicationManager)(InitializerContext* context) {
+ repl::setGlobalReplicationCoordinator(new repl::HybridReplicationCoordinator(replSettings));
return Status::OK();
}
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
new file mode 100644
index 00000000000..7ca34597f20
--- /dev/null
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -0,0 +1,286 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/repl_coordinator_hybrid.h"
+
+namespace mongo {
+
+ MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kReplication);
+
+namespace repl {
+
+ HybridReplicationCoordinator::HybridReplicationCoordinator(const ReplSettings& settings) :
+ _legacy(settings), _impl(settings) {}
+ HybridReplicationCoordinator::~HybridReplicationCoordinator() {}
+
+ void HybridReplicationCoordinator::startReplication(
+ TopologyCoordinator* topCoord,
+ ReplicationExecutor::NetworkInterface* network) {
+ _legacy.startReplication(topCoord, network);
+ _impl.startReplication(topCoord, network);
+ }
+
+ void HybridReplicationCoordinator::shutdown() {
+ _legacy.shutdown();
+ _impl.shutdown();
+ }
+
+ bool HybridReplicationCoordinator::isShutdownOkay() const {
+ bool legacyResponse = _legacy.isShutdownOkay();
+ return legacyResponse;
+ }
+
+ ReplSettings& HybridReplicationCoordinator::getSettings() {
+ ReplSettings& legacySettings = _legacy.getSettings();
+ return legacySettings;
+ }
+
+ ReplicationCoordinator::Mode HybridReplicationCoordinator::getReplicationMode() const {
+ Mode legacyMode = _legacy.getReplicationMode();
+ return legacyMode;
+ }
+
+ MemberState HybridReplicationCoordinator::getCurrentMemberState() const {
+ MemberState legacyState = _legacy.getCurrentMemberState();
+ return legacyState;
+ }
+
+ ReplicationCoordinator::StatusAndDuration HybridReplicationCoordinator::awaitReplication(
+ const OperationContext* txn,
+ const OpTime& ts,
+ const WriteConcernOptions& writeConcern) {
+ StatusAndDuration legacyStatus = _legacy.awaitReplication(txn, ts, writeConcern);
+ return legacyStatus;
+ }
+
+ ReplicationCoordinator::StatusAndDuration
+ HybridReplicationCoordinator::awaitReplicationOfLastOp(
+ const OperationContext* txn,
+ const WriteConcernOptions& writeConcern) {
+ StatusAndDuration legacyStatus = _legacy.awaitReplicationOfLastOp(txn, writeConcern);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::stepDown(bool force,
+ const Milliseconds& waitTime,
+ const Milliseconds& stepdownTime) {
+ Status legacyStatus = _legacy.stepDown(force, waitTime, stepdownTime);
+ Status implStatus = _impl.stepDown(force, waitTime, stepdownTime);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::stepDownAndWaitForSecondary(
+ const Milliseconds& initialWaitTime,
+ const Milliseconds& stepdownTime,
+ const Milliseconds& postStepdownWaitTime) {
+ Status legacyStatus = _legacy.stepDownAndWaitForSecondary(initialWaitTime,
+ stepdownTime,
+ postStepdownWaitTime);
+ Status implStatus = _impl.stepDownAndWaitForSecondary(initialWaitTime,
+ stepdownTime,
+ postStepdownWaitTime);
+ return legacyStatus;
+ }
+
+ bool HybridReplicationCoordinator::isMasterForReportingPurposes() {
+ bool legacyResponse = _legacy.isMasterForReportingPurposes();
+ _impl.isMasterForReportingPurposes();
+ return legacyResponse;
+ }
+
+ bool HybridReplicationCoordinator::canAcceptWritesForDatabase(const StringData& dbName) {
+ bool legacyResponse = _legacy.canAcceptWritesForDatabase(dbName);
+ _impl.canAcceptWritesForDatabase(dbName);
+ return legacyResponse;
+ }
+
+ Status HybridReplicationCoordinator::canServeReadsFor(const NamespaceString& ns, bool slaveOk) {
+ Status legacyStatus = _legacy.canServeReadsFor(ns, slaveOk);
+ Status implStatus = _impl.canServeReadsFor(ns, slaveOk);
+ return legacyStatus;
+ }
+
+ bool HybridReplicationCoordinator::shouldIgnoreUniqueIndex(const IndexDescriptor* idx) {
+ bool legacyResponse = _legacy.shouldIgnoreUniqueIndex(idx);
+ _impl.shouldIgnoreUniqueIndex(idx);
+ return legacyResponse;
+ }
+
+ Status HybridReplicationCoordinator::setLastOptime(const OID& rid,
+ const OpTime& ts) {
+ Status legacyStatus = _legacy.setLastOptime(rid, ts);
+ Status implStatus = _impl.setLastOptime(rid, ts);
+ return legacyStatus;
+ }
+
+ OID HybridReplicationCoordinator::getElectionId() {
+ OID legacyOID = _legacy.getElectionId();
+ _impl.getElectionId();
+ return legacyOID;
+ }
+
+ void HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) {
+ _legacy.processReplSetGetStatus(result);
+ BSONObjBuilder implResult;
+ _impl.processReplSetGetStatus(&implResult);
+ }
+
+ bool HybridReplicationCoordinator::setMaintenanceMode(bool activate) {
+ bool legacyResponse = _legacy.setMaintenanceMode(activate);
+ _impl.setMaintenanceMode(activate);
+ return legacyResponse;
+ }
+
+ Status HybridReplicationCoordinator::processHeartbeat(const BSONObj& cmdObj,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processHeartbeat(cmdObj, resultObj);
+ BSONObjBuilder implResult;
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetReconfig(OperationContext* txn,
+ const ReplSetReconfigArgs& args,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetReconfig(txn, args, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetReconfig(txn, args, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetInitiate(OperationContext* txn,
+ const BSONObj& givenConfig,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetInitiate(txn, givenConfig, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetInitiate(txn, givenConfig, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetGetRBID(BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetGetRBID(resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetGetRBID(&implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetFresh(const ReplSetFreshArgs& args,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetFresh(args, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetFresh(args, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetElect(const ReplSetElectArgs& args,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetElect(args, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetElect(args, &implResult);
+ return legacyStatus;
+ }
+
+ void HybridReplicationCoordinator::incrementRollbackID() {
+ _legacy.incrementRollbackID();
+ _impl.incrementRollbackID();
+ }
+
+ Status HybridReplicationCoordinator::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetFreeze(secs, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetFreeze(secs, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetMaintenance(bool activate,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetMaintenance(activate, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetMaintenance(activate, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetSyncFrom(const std::string& target,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetSyncFrom(target, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetSyncFrom(target, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetUpdatePosition(const BSONArray& updates,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetUpdatePosition(updates, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetUpdatePosition(updates, &implResult);
+ return legacyStatus;
+ }
+
+ Status HybridReplicationCoordinator::processReplSetUpdatePositionHandshake(
+ const BSONObj& handshake,
+ BSONObjBuilder* resultObj) {
+ Status legacyStatus = _legacy.processReplSetUpdatePositionHandshake(handshake, resultObj);
+ BSONObjBuilder implResult;
+ Status implStatus = _impl.processReplSetUpdatePositionHandshake(handshake, &implResult);
+ return legacyStatus;
+ }
+
+ bool HybridReplicationCoordinator::processHandshake(const OID& remoteID,
+ const BSONObj& handshake) {
+ bool legacyResponse = _legacy.processHandshake(remoteID, handshake);
+ _impl.processHandshake(remoteID, handshake);
+ return legacyResponse;
+ }
+
+ void HybridReplicationCoordinator::waitUpToOneSecondForOptimeChange(const OpTime& ot) {
+ _legacy.waitUpToOneSecondForOptimeChange(ot);
+ //TODO(spencer) switch to _impl.waitUpToOneSecondForOptimeChange(ot); once implemented
+ }
+
+ bool HybridReplicationCoordinator::buildsIndexes() {
+ bool legacyResponse = _legacy.buildsIndexes();
+ _impl.buildsIndexes();
+ return legacyResponse;
+ }
+
+ vector<BSONObj> HybridReplicationCoordinator::getHostsWrittenTo(const OpTime& op) {
+ vector<BSONObj> legacyResponse = _legacy.getHostsWrittenTo(op);
+ vector<BSONObj> implResponse = _impl.getHostsWrittenTo(op);
+ return legacyResponse;
+ }
+
+ Status HybridReplicationCoordinator::checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const {
+ Status legacyStatus = _legacy.checkIfWriteConcernCanBeSatisfied(writeConcern);
+ Status implStatus = _impl.checkIfWriteConcernCanBeSatisfied(writeConcern);
+ return legacyStatus;
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
new file mode 100644
index 00000000000..77ea87a4488
--- /dev/null
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/repl_coordinator.h"
+#include "mongo/db/repl/repl_coordinator_impl.h"
+#include "mongo/db/repl/repl_coordinator_legacy.h"
+
+namespace mongo {
+namespace repl {
+
+ /**
+ * An implementation of ReplicationCoordinator that will evolve with ReplicationCoordinatorImpl
+ * to aid in the transition from LegacyReplicationCoordinator to ReplicationCoordinatorImpl and
+ * to verify the correctness of ReplicationCoordinatorImpl.
+ */
+ class HybridReplicationCoordinator : public ReplicationCoordinator {
+ MONGO_DISALLOW_COPYING(HybridReplicationCoordinator);
+
+ public:
+
+ HybridReplicationCoordinator(const ReplSettings& settings);
+ virtual ~HybridReplicationCoordinator();
+
+ virtual void startReplication(TopologyCoordinator* topCoord,
+ ReplicationExecutor::NetworkInterface* network);
+
+ virtual void shutdown();
+
+ virtual bool isShutdownOkay() const;
+
+ virtual ReplSettings& getSettings();
+
+ virtual Mode getReplicationMode() const;
+
+ virtual MemberState getCurrentMemberState() const;
+
+ virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
+ const OperationContext* txn,
+ const OpTime& ts,
+ const WriteConcernOptions& writeConcern);
+
+ virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOp(
+ const OperationContext* txn,
+ const WriteConcernOptions& writeConcern);
+
+ virtual Status stepDown(bool force,
+ const Milliseconds& waitTime,
+ const Milliseconds& stepdownTime);
+
+ virtual Status stepDownAndWaitForSecondary(const Milliseconds& initialWaitTime,
+ const Milliseconds& stepdownTime,
+ const Milliseconds& postStepdownWaitTime);
+
+ virtual bool isMasterForReportingPurposes();
+
+ virtual bool canAcceptWritesForDatabase(const StringData& dbName);
+
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const;
+
+ virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk);
+
+ virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
+
+ virtual Status setLastOptime(const OID& rid, const OpTime& ts);
+
+ virtual OID getElectionId();
+
+ virtual void processReplSetGetStatus(BSONObjBuilder* result);
+
+ virtual bool setMaintenanceMode(bool activate);
+
+ virtual Status processReplSetMaintenance(bool activate, BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetSyncFrom(const std::string& target,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj);
+
+ virtual Status processHeartbeat(const BSONObj& cmdObj, BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetReconfig(OperationContext* txn,
+ const ReplSetReconfigArgs& args,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetInitiate(OperationContext* txn,
+ const BSONObj& configObj,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj);
+
+ virtual void incrementRollbackID();
+
+ virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetElect(const ReplSetElectArgs& args,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetUpdatePosition(const BSONArray& updates,
+ BSONObjBuilder* resultObj);
+
+ virtual Status processReplSetUpdatePositionHandshake(const BSONObj& handshake,
+ BSONObjBuilder* resultObj);
+
+ virtual bool processHandshake(const OID& remoteID, const BSONObj& handshake);
+
+ virtual void waitUpToOneSecondForOptimeChange(const OpTime& ot);
+
+ virtual bool buildsIndexes();
+
+ virtual std::vector<BSONObj> getHostsWrittenTo(const OpTime& op);
+
+ private:
+ LegacyReplicationCoordinator _legacy;
+ ReplicationCoordinatorImpl _impl;
+ };
+
+} // namespace repl
+} // namespace mongo