summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp49
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp12
-rw-r--r--src/mongo/db/repl/topology_coordinator.h8
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.cpp4
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.h2
13 files changed, 102 insertions, 9 deletions
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index 44c381834e6..bed008e597e 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -39,6 +39,8 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -178,4 +180,51 @@ ReplicaSetNodeProcessInterface::attachCursorSourceToPipeline(
return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline);
}
+StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
+ OperationContext* opCtx, const NamespaceString& ns, const BSONObj& cmdObj) const {
+ executor::RemoteCommandRequest request(
+ repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort(),
+ ns.db().toString(),
+ cmdObj,
+ opCtx);
+ auto [promise, future] = makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>();
+ auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>(
+ std::move(promise));
+ auto scheduleResult = _executor->scheduleRemoteCommand(
+ std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); });
+ if (!scheduleResult.getStatus().isOK()) {
+ // Since the command failed to be scheduled, the callback above did not and will not run.
+ // Thus, it is safe to fulfill the promise here without worrying about synchronizing access
+ // with the executor's thread.
+ promisePtr->setError(scheduleResult.getStatus());
+ }
+
+ auto response = future.getNoThrow(opCtx);
+ if (!response.isOK()) {
+ return response.getStatus();
+ }
+
+ auto rcr = std::move(response.getValue());
+ if (!rcr.response.status.isOK()) {
+ return rcr.response.status;
+ }
+
+ auto commandStatus = getStatusFromCommandResult(rcr.response.data);
+ if (!commandStatus.isOK()) {
+ return commandStatus;
+ }
+
+ auto writeConcernStatus = getWriteConcernStatusFromCommandResult(rcr.response.data);
+ if (!writeConcernStatus.isOK()) {
+ return writeConcernStatus;
+ }
+
+ auto writeStatus = getFirstWriteErrorStatusFromCommandResult(rcr.response.data);
+ if (!writeStatus.isOK()) {
+ return writeStatus;
+ }
+
+ return rcr.response.data;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 992e28b6839..0729ff75866 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -89,6 +89,15 @@ public:
bool allowTargetingShards) override;
private:
+ /**
+ * Attemps to execute the specified command on the primary. Returns the command response upon
+ * success or a non-OK status upon a failed command response, a writeConcernError, or any
+ * writeErrors.
+ */
+ StatusWith<BSONObj> _executeCommandOnPrimary(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const BSONObj& cmdObj) const;
+
executor::TaskExecutor* _executor;
};
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index cd431951ca9..ef796d6877c 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -977,6 +977,13 @@ public:
*/
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const = 0;
+ /**
+ * Returns the HostAndPort of the current primary, or an empty HostAndPort if there is no
+ * primary. Note that the primary can change at any time and thus the result may be immediately
+ * stale unless run from the primary with the RSTL held.
+ */
+ virtual HostAndPort getCurrentPrimaryHostAndPort() const = 0;
+
protected:
ReplicationCoordinator();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 43a9eab03a2..bbbbaf3250a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2066,6 +2066,12 @@ OpTime ReplicationCoordinatorImpl::getLatestWriteOpTime(OperationContext* opCtx)
return OpTime(latestOplogTimestamp, getTerm());
}
+HostAndPort ReplicationCoordinatorImpl::getCurrentPrimaryHostAndPort() const {
+ stdx::lock_guard<Latch> lock(_mutex);
+ auto primary = _topCoord->getCurrentPrimaryMember();
+ return primary ? primary->getHostAndPort() : HostAndPort();
+}
+
void ReplicationCoordinatorImpl::_killConflictingOpsOnStepUpAndStepDown(
AutoGetRstlForStepUpStepDown* arsc, ErrorCodes::Error reason) {
const OperationContext* rstlOpCtx = arsc->getOpCtx();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 94d0a868300..ac68f2f3d63 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -349,6 +349,8 @@ public:
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
+ virtual HostAndPort getCurrentPrimaryHostAndPort() const override;
+
// ================== Test support API ===================
/**
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 22bd2968623..ef691cbcb53 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -582,5 +582,9 @@ OpTime ReplicationCoordinatorMock::getLatestWriteOpTime(OperationContext* opCtx)
return getMyLastAppliedOpTime();
}
+HostAndPort ReplicationCoordinatorMock::getCurrentPrimaryHostAndPort() const {
+ return HostAndPort();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index a5676725335..db8e460605d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -331,6 +331,8 @@ public:
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
+ virtual HostAndPort getCurrentPrimaryHostAndPort() const override;
+
private:
ServiceContext* const _service;
ReplSettings _settings;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index eac430b7b34..59d96c0bbfd 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -484,5 +484,9 @@ OpTime ReplicationCoordinatorNoOp::getLatestWriteOpTime(OperationContext* opCtx)
return getMyLastAppliedOpTime();
}
+HostAndPort ReplicationCoordinatorNoOp::getCurrentPrimaryHostAndPort() const {
+ MONGO_UNREACHABLE;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index f0958c3a9ce..4b385d86423 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -268,6 +268,8 @@ public:
OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
+ HostAndPort getCurrentPrimaryHostAndPort() const override;
+
private:
ServiceContext* const _service;
};
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index fb7d8c65bd3..2b329b0f0d6 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -287,13 +287,13 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
" not allowed and primary is unknown/down");
_syncSource = HostAndPort();
return _syncSource;
- } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
+ } else if (_memberIsBlacklisted(*getCurrentPrimaryMember(), now)) {
LOGV2_DEBUG(21785,
1,
"Cannot select a sync source because chaining is not allowed and primary "
"member is blacklisted: {currentPrimaryMember_getHostAndPort}",
"currentPrimaryMember_getHostAndPort"_attr =
- _currentPrimaryMember()->getHostAndPort());
+ getCurrentPrimaryMember()->getHostAndPort());
_syncSource = HostAndPort();
return _syncSource;
} else if (_currentPrimaryIndex == _selfIndex) {
@@ -308,13 +308,13 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
LOG(1) << "Cannot select a sync source because chaining is not allowed and the primary "
"is behind me. Last oplog optime of primary {}: {}, my last fetched oplog "
"optime: {}"_format(
- _currentPrimaryMember()->getHostAndPort(),
+ getCurrentPrimaryMember()->getHostAndPort(),
_memberData.at(_currentPrimaryIndex).getLastAppliedOpTime().toBSON(),
lastOpTimeFetched.toBSON());
_syncSource = HostAndPort();
return _syncSource;
} else {
- _syncSource = _currentPrimaryMember()->getHostAndPort();
+ _syncSource = getCurrentPrimaryMember()->getHostAndPort();
LOGV2(21787,
"chaining not allowed, choosing primary as sync source candidate: {syncSource}",
"syncSource"_attr = _syncSource);
@@ -1542,7 +1542,7 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
}
}
-const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
+const MemberConfig* TopologyCoordinator::getCurrentPrimaryMember() const {
if (_currentPrimaryIndex == -1)
return nullptr;
@@ -1886,7 +1886,7 @@ void TopologyCoordinator::fillIsMasterForReplSet(std::shared_ptr<IsMasterRespons
response->setIsMaster(myState.primary() && !isSteppingDown());
response->setIsSecondary(myState.secondary());
- const MemberConfig* curPrimary = _currentPrimaryMember();
+ const MemberConfig* curPrimary = getCurrentPrimaryMember();
if (curPrimary) {
response->setPrimary(curPrimary->getHostAndPort(horizonString));
}
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 7e5b0a2664c..9fa00cbc6f7 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -718,6 +718,11 @@ public:
bool checkIfCommitQuorumCanBeSatisfied(const CommitQuorumOptions& commitQuorum,
const std::vector<MemberConfig>& members) const;
+ /**
+ * Returns nullptr if there is no primary, or the MemberConfig* for the current primary.
+ */
+ const MemberConfig* getCurrentPrimaryMember() const;
+
////////////////////////////////////////////////////////////
//
// Test support methods
@@ -851,9 +856,6 @@ private:
*/
MemberData* _findMemberDataByMemberId(const int memberId);
- // Returns NULL if there is no primary, or the MemberConfig* for the current primary
- const MemberConfig* _currentPrimaryMember() const;
-
/**
* Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an
* election or stepdown should commence.
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index fbf1f3a8b43..6dcdf347463 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -510,5 +510,9 @@ OpTime ReplicationCoordinatorEmbedded::getLatestWriteOpTime(OperationContext* op
return getMyLastAppliedOpTime();
}
+HostAndPort ReplicationCoordinatorEmbedded::getCurrentPrimaryHostAndPort() const {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
} // namespace embedded
} // namespace mongo
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index a81d6ff0d38..8790c2bb2a2 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -275,6 +275,8 @@ public:
repl::OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
+ HostAndPort getCurrentPrimaryHostAndPort() const override;
+
private:
// Back pointer to the ServiceContext that has started the instance.
ServiceContext* const _service;