diff options
Diffstat (limited to 'src/mongo')
13 files changed, 102 insertions, 9 deletions
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index 44c381834e6..bed008e597e 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -39,6 +39,8 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/index_builds_coordinator.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/future.h" namespace mongo { @@ -178,4 +180,51 @@ ReplicaSetNodeProcessInterface::attachCursorSourceToPipeline( return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); } +StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( + OperationContext* opCtx, const NamespaceString& ns, const BSONObj& cmdObj) const { + executor::RemoteCommandRequest request( + repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort(), + ns.db().toString(), + cmdObj, + opCtx); + auto [promise, future] = makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>(); + auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>( + std::move(promise)); + auto scheduleResult = _executor->scheduleRemoteCommand( + std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); }); + if (!scheduleResult.getStatus().isOK()) { + // Since the command failed to be scheduled, the callback above did not and will not run. + // Thus, it is safe to fulfill the promise here without worrying about synchronizing access + // with the executor's thread. + promisePtr->setError(scheduleResult.getStatus()); + } + + auto response = future.getNoThrow(opCtx); + if (!response.isOK()) { + return response.getStatus(); + } + + auto rcr = std::move(response.getValue()); + if (!rcr.response.status.isOK()) { + return rcr.response.status; + } + + auto commandStatus = getStatusFromCommandResult(rcr.response.data); + if (!commandStatus.isOK()) { + return commandStatus; + } + + auto writeConcernStatus = getWriteConcernStatusFromCommandResult(rcr.response.data); + if (!writeConcernStatus.isOK()) { + return writeConcernStatus; + } + + auto writeStatus = getFirstWriteErrorStatusFromCommandResult(rcr.response.data); + if (!writeStatus.isOK()) { + return writeStatus; + } + + return rcr.response.data; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 992e28b6839..0729ff75866 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -89,6 +89,15 @@ public: bool allowTargetingShards) override; private: + /** + * Attemps to execute the specified command on the primary. Returns the command response upon + * success or a non-OK status upon a failed command response, a writeConcernError, or any + * writeErrors. + */ + StatusWith<BSONObj> _executeCommandOnPrimary(OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& cmdObj) const; + executor::TaskExecutor* _executor; }; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index cd431951ca9..ef796d6877c 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -977,6 +977,13 @@ public: */ virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const = 0; + /** + * Returns the HostAndPort of the current primary, or an empty HostAndPort if there is no + * primary. Note that the primary can change at any time and thus the result may be immediately + * stale unless run from the primary with the RSTL held. + */ + virtual HostAndPort getCurrentPrimaryHostAndPort() const = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 43a9eab03a2..bbbbaf3250a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2066,6 +2066,12 @@ OpTime ReplicationCoordinatorImpl::getLatestWriteOpTime(OperationContext* opCtx) return OpTime(latestOplogTimestamp, getTerm()); } +HostAndPort ReplicationCoordinatorImpl::getCurrentPrimaryHostAndPort() const { + stdx::lock_guard<Latch> lock(_mutex); + auto primary = _topCoord->getCurrentPrimaryMember(); + return primary ? primary->getHostAndPort() : HostAndPort(); +} + void ReplicationCoordinatorImpl::_killConflictingOpsOnStepUpAndStepDown( AutoGetRstlForStepUpStepDown* arsc, ErrorCodes::Error reason) { const OperationContext* rstlOpCtx = arsc->getOpCtx(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 94d0a868300..ac68f2f3d63 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -349,6 +349,8 @@ public: virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override; + virtual HostAndPort getCurrentPrimaryHostAndPort() const override; + // ================== Test support API =================== /** diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 22bd2968623..ef691cbcb53 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -582,5 +582,9 @@ OpTime ReplicationCoordinatorMock::getLatestWriteOpTime(OperationContext* opCtx) return getMyLastAppliedOpTime(); } +HostAndPort ReplicationCoordinatorMock::getCurrentPrimaryHostAndPort() const { + return HostAndPort(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index a5676725335..db8e460605d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -331,6 +331,8 @@ public: virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override; + virtual HostAndPort getCurrentPrimaryHostAndPort() const override; + private: ServiceContext* const _service; ReplSettings _settings; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index eac430b7b34..59d96c0bbfd 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -484,5 +484,9 @@ OpTime ReplicationCoordinatorNoOp::getLatestWriteOpTime(OperationContext* opCtx) return getMyLastAppliedOpTime(); } +HostAndPort ReplicationCoordinatorNoOp::getCurrentPrimaryHostAndPort() const { + MONGO_UNREACHABLE; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index f0958c3a9ce..4b385d86423 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -268,6 +268,8 @@ public: OpTime getLatestWriteOpTime(OperationContext* opCtx) const override; + HostAndPort getCurrentPrimaryHostAndPort() const override; + private: ServiceContext* const _service; }; diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index fb7d8c65bd3..2b329b0f0d6 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -287,13 +287,13 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, " not allowed and primary is unknown/down"); _syncSource = HostAndPort(); return _syncSource; - } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) { + } else if (_memberIsBlacklisted(*getCurrentPrimaryMember(), now)) { LOGV2_DEBUG(21785, 1, "Cannot select a sync source because chaining is not allowed and primary " "member is blacklisted: {currentPrimaryMember_getHostAndPort}", "currentPrimaryMember_getHostAndPort"_attr = - _currentPrimaryMember()->getHostAndPort()); + getCurrentPrimaryMember()->getHostAndPort()); _syncSource = HostAndPort(); return _syncSource; } else if (_currentPrimaryIndex == _selfIndex) { @@ -308,13 +308,13 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, LOG(1) << "Cannot select a sync source because chaining is not allowed and the primary " "is behind me. Last oplog optime of primary {}: {}, my last fetched oplog " "optime: {}"_format( - _currentPrimaryMember()->getHostAndPort(), + getCurrentPrimaryMember()->getHostAndPort(), _memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(), lastOpTimeFetched.toBSON()); _syncSource = HostAndPort(); return _syncSource; } else { - _syncSource = _currentPrimaryMember()->getHostAndPort(); + _syncSource = getCurrentPrimaryMember()->getHostAndPort(); LOGV2(21787, "chaining not allowed, choosing primary as sync source candidate: {syncSource}", "syncSource"_attr = _syncSource); @@ -1542,7 +1542,7 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex, } } -const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const { +const MemberConfig* TopologyCoordinator::getCurrentPrimaryMember() const { if (_currentPrimaryIndex == -1) return nullptr; @@ -1886,7 +1886,7 @@ void TopologyCoordinator::fillIsMasterForReplSet(std::shared_ptr<IsMasterRespons response->setIsMaster(myState.primary() && !isSteppingDown()); response->setIsSecondary(myState.secondary()); - const MemberConfig* curPrimary = _currentPrimaryMember(); + const MemberConfig* curPrimary = getCurrentPrimaryMember(); if (curPrimary) { response->setPrimary(curPrimary->getHostAndPort(horizonString)); } diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 7e5b0a2664c..9fa00cbc6f7 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -718,6 +718,11 @@ public: bool checkIfCommitQuorumCanBeSatisfied(const CommitQuorumOptions& commitQuorum, const std::vector<MemberConfig>& members) const; + /** + * Returns nullptr if there is no primary, or the MemberConfig* for the current primary. + */ + const MemberConfig* getCurrentPrimaryMember() const; + //////////////////////////////////////////////////////////// // // Test support methods @@ -851,9 +856,6 @@ private: */ MemberData* _findMemberDataByMemberId(const int memberId); - // Returns NULL if there is no primary, or the MemberConfig* for the current primary - const MemberConfig* _currentPrimaryMember() const; - /** * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an * election or stepdown should commence. diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index fbf1f3a8b43..6dcdf347463 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -510,5 +510,9 @@ OpTime ReplicationCoordinatorEmbedded::getLatestWriteOpTime(OperationContext* op return getMyLastAppliedOpTime(); } +HostAndPort ReplicationCoordinatorEmbedded::getCurrentPrimaryHostAndPort() const { + UASSERT_NOT_IMPLEMENTED; +} + } // namespace embedded } // namespace mongo diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index a81d6ff0d38..8790c2bb2a2 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -275,6 +275,8 @@ public: repl::OpTime getLatestWriteOpTime(OperationContext* opCtx) const override; + HostAndPort getCurrentPrimaryHostAndPort() const override; + private: // Back pointer to the ServiceContext that has started the instance. ServiceContext* const _service; |