summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2015-06-24 10:33:42 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2015-06-26 05:59:45 -0400
commit0f59479a8dfdb310c2ccdc99ce6438ce3cbe41a6 (patch)
tree5e37826c333331cdd442c2f9098cc5c07d9f977e /src
parent20273cf8122556935a2fc6042ad5ccc498782955 (diff)
downloadmongo-0f59479a8dfdb310c2ccdc99ce6438ce3cbe41a6.tar.gz
SERVER-18153 request and return replication metadata
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/fetcher.cpp9
-rw-r--r--src/mongo/client/fetcher.h5
-rw-r--r--src/mongo/client/query_fetcher.cpp6
-rw-r--r--src/mongo/client/query_fetcher.h3
-rw-r--r--src/mongo/db/dbcommands.cpp11
-rw-r--r--src/mongo/db/repl/data_replicator.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator.h4
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h4
-rw-r--r--src/mongo/rpc/metadata.cpp2
-rw-r--r--src/mongo/rpc/metadata.h2
16 files changed, 50 insertions, 28 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index f1ecc54aba8..f45d0c99be5 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -149,11 +149,13 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& findCmdObj,
- const CallbackFn& work)
+ const CallbackFn& work,
+ const BSONObj& metadata)
: _executor(executor),
_source(source),
_dbname(dbname),
_cmdObj(findCmdObj.getOwned()),
+ _metadata(metadata.getOwned()),
_work(work),
_active(false),
_remoteCommandCallbackHandle() {
@@ -175,6 +177,7 @@ std::string Fetcher::getDiagnosticString() const {
output << " source: " << _source.toString();
output << " database: " << _dbname;
output << " query: " << _cmdObj;
+ output << " query metadata: " << _metadata;
output << " active: " << _active;
return output;
}
@@ -216,7 +219,7 @@ void Fetcher::wait() {
Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) {
StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
_executor->scheduleRemoteCommand(
- RemoteCommandRequest(_source, _dbname, cmdObj),
+ RemoteCommandRequest(_source, _dbname, cmdObj, _metadata),
stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName));
if (!scheduleResult.isOK()) {
@@ -258,6 +261,8 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch
return;
}
+ batchData.otherFields.metadata = std::move(rcbd.response.getValue().metadata);
+
NextAction nextAction = NextAction::kNoAction;
if (!batchData.cursorId) {
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 99748941ca2..d258bf50eaa 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -63,7 +63,6 @@ public:
CursorId cursorId = 0;
NamespaceString nss;
Documents documents;
- // TODO: fill in with replication metadata.
struct OtherFields {
BSONObj metadata;
} otherFields;
@@ -118,7 +117,8 @@ public:
const HostAndPort& source,
const std::string& dbname,
const BSONObj& cmdObj,
- const CallbackFn& work);
+ const CallbackFn& work,
+ const BSONObj& metadata = rpc::makeEmptyMetadata());
virtual ~Fetcher();
@@ -180,6 +180,7 @@ private:
HostAndPort _source;
std::string _dbname;
BSONObj _cmdObj;
+ BSONObj _metadata;
CallbackFn _work;
// Protects member data of this Fetcher.
diff --git a/src/mongo/client/query_fetcher.cpp b/src/mongo/client/query_fetcher.cpp
index ec588238a6c..e1dfe3eaa33 100644
--- a/src/mongo/client/query_fetcher.cpp
+++ b/src/mongo/client/query_fetcher.cpp
@@ -36,7 +36,8 @@ QueryFetcher::QueryFetcher(executor::TaskExecutor* exec,
const HostAndPort& src,
const NamespaceString& nss,
const BSONObj& cmdBSON,
- const CallbackFn& work)
+ const CallbackFn& work,
+ const BSONObj& metadata)
: _exec(exec),
_fetcher(exec,
src,
@@ -46,7 +47,8 @@ QueryFetcher::QueryFetcher(executor::TaskExecutor* exec,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
- stdx::placeholders::_3)),
+ stdx::placeholders::_3),
+ metadata),
_responses(0),
_work(work) {}
diff --git a/src/mongo/client/query_fetcher.h b/src/mongo/client/query_fetcher.h
index bd79f6e133d..fc5d09c489a 100644
--- a/src/mongo/client/query_fetcher.h
+++ b/src/mongo/client/query_fetcher.h
@@ -58,7 +58,8 @@ public:
const HostAndPort& source,
const NamespaceString& nss,
const BSONObj& cmdBSON,
- const QueryFetcher::CallbackFn& onBatchAvailable);
+ const QueryFetcher::CallbackFn& onBatchAvailable,
+ const BSONObj& metadata = rpc::makeEmptyMetadata());
virtual ~QueryFetcher() = default;
bool isActive() const {
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index b951d2832e8..88e6bd11a7e 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1299,13 +1299,20 @@ bool Command::run(OperationContext* txn,
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18326
- if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- shardingState.enabled()) {
+ bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+
+ if (isReplSet && shardingState.enabled()) {
rpc::ShardingMetadata(
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(),
replCoord->getElectionId()).writeToMetadata(&metadataBob);
}
+ const auto& metadata = request.getMetadata();
+ if (isReplSet && metadata.hasField(rpc::kReplicationMetadataFieldName)) {
+ BSONObjBuilder replInfoBob(metadataBob.subobjStart(rpc::kReplicationMetadataFieldName));
+ replCoord->prepareReplResponseMetadata(&replInfoBob);
+ }
+
auto cmdResponse = replyBuilderBob.done();
replyBuilder->setMetadata(metadataBob.done());
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index b8cb4265aeb..234d8af14af 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -134,7 +134,8 @@ OplogFetcher::OplogFetcher(ReplicationExecutor* exec,
oplogNSS,
BSON("find" << oplogNSS.coll() << "filter"
<< BSON("ts" << BSON("$gte" << startTS))),
- work),
+ work,
+ BSON(rpc::kReplicationMetadataFieldName << 1)),
_startTS(startTS) {}
std::string OplogFetcher::toString() const {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 84b2d6a56c4..84edfddd86c 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -590,7 +590,7 @@ public:
/**
* Prepares a BSONObj describing the current term, primary, and lastOp information.
*/
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder) = 0;
+ virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder) = 0;
/**
* Returns true if the V1 election protocol is being used and false otherwise.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f932279f245..42ec10d610f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2529,24 +2529,25 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
*result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
}
-void ReplicationCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) {
+void ReplicationCoordinatorImpl::prepareReplResponseMetadata(BSONObjBuilder* objBuilder) {
if (getReplicationMode() == modeReplSet && isV1ElectionProtocol()) {
CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_prepareCursorResponseInfo_finish,
+ stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
this,
stdx::placeholders::_1,
objBuilder));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
+ fassert(28709, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
}
}
-void ReplicationCoordinatorImpl::_prepareCursorResponseInfo_finish(
+void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
const ReplicationExecutor::CallbackArgs& cbData, BSONObjBuilder* objBuilder) {
- BSONObjBuilder replObj(objBuilder->subobjStart("repl"));
- _topCoord->prepareCursorResponseInfo(&replObj, getLastCommittedOpTime());
+ BSONObjBuilder replObj(objBuilder->subobjStart(rpc::kReplicationMetadataFieldName));
+ _topCoord->prepareReplResponseMetadata(&replObj, getLastCommittedOpTime());
replObj.done();
}
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 977d87b2a10..7df4345eba0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -250,7 +250,7 @@ public:
virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm) override;
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
+ virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder);
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) override;
@@ -476,10 +476,10 @@ private:
Status* result);
/**
- * Bottom half of prepareCursorResponseInfo.
+ * Bottom half of prepareReplResponseMetadata.
*/
- void _prepareCursorResponseInfo_finish(const ReplicationExecutor::CallbackArgs& cbData,
- BSONObjBuilder* objBuilder);
+ void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
+ BSONObjBuilder* objBuilder);
/**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 19ee9730cae..d3841b5904e 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -327,7 +327,7 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner(
return Status::OK();
}
-void ReplicationCoordinatorMock::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) {}
+void ReplicationCoordinatorMock::prepareReplResponseMetadata(BSONObjBuilder* objBuilder) {}
Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 2856878bd6f..89d3db5c81a 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -187,7 +187,7 @@ public:
virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm);
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder);
+ virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder);
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response);
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 0dbc81baac5..b5a2ec3b892 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -390,8 +390,8 @@ public:
/**
* Prepares a BSONObj describing the current term, primary, and lastOp information.
*/
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder,
- const OpTime& lastCommittedOpTime) const = 0;
+ virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder,
+ const OpTime& lastCommittedOpTime) const = 0;
/**
* Writes into 'output' all the information needed to generate a summary of the current
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 08ec6149f38..0d3babaa27c 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2113,8 +2113,8 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
-void TopologyCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder,
- const OpTime& lastCommittedOpTime) const {
+void TopologyCoordinatorImpl::prepareReplResponseMetadata(BSONObjBuilder* objBuilder,
+ const OpTime& lastCommittedOpTime) const {
objBuilder->append("term", _term);
objBuilder->append("lastOpCommittedTimestamp", lastCommittedOpTime.getTimestamp());
objBuilder->append("lastOpCommittedTerm", lastCommittedOpTime.getTerm());
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index c6dd3e26533..abd7189673f 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -202,8 +202,8 @@ public:
virtual bool stepDown(Date_t until, bool force, const OpTime& lastOpApplied);
virtual bool stepDownIfPending();
virtual Date_t getStepDownTime() const;
- virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder,
- const OpTime& lastCommitttedOpTime) const;
+ virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder,
+ const OpTime& lastCommitttedOpTime) const;
Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm);
virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
diff --git a/src/mongo/rpc/metadata.cpp b/src/mongo/rpc/metadata.cpp
index 87db6ce5389..e80daabaa15 100644
--- a/src/mongo/rpc/metadata.cpp
+++ b/src/mongo/rpc/metadata.cpp
@@ -39,6 +39,8 @@
namespace mongo {
namespace rpc {
+const char kReplicationMetadataFieldName[] = "$replData";
+
BSONObj makeEmptyMetadata() {
return BSONObj();
}
diff --git a/src/mongo/rpc/metadata.h b/src/mongo/rpc/metadata.h
index 393c3c57234..ed2267bbca6 100644
--- a/src/mongo/rpc/metadata.h
+++ b/src/mongo/rpc/metadata.h
@@ -57,6 +57,8 @@ class OperationContext;
*/
namespace rpc {
+extern const char kReplicationMetadataFieldName[];
+
/**
* Returns an empty metadata object.
*/