diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-06-24 10:33:42 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-06-26 05:59:45 -0400 |
commit | 0f59479a8dfdb310c2ccdc99ce6438ce3cbe41a6 (patch) | |
tree | 5e37826c333331cdd442c2f9098cc5c07d9f977e /src | |
parent | 20273cf8122556935a2fc6042ad5ccc498782955 (diff) | |
download | mongo-0f59479a8dfdb310c2ccdc99ce6438ce3cbe41a6.tar.gz |
SERVER-18153 request and return replication metadata
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 9 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 5 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.cpp | 6 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.h | 3 | ||||
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/rpc/metadata.cpp | 2 | ||||
-rw-r--r-- | src/mongo/rpc/metadata.h | 2 |
16 files changed, 50 insertions, 28 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index f1ecc54aba8..f45d0c99be5 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -149,11 +149,13 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, const HostAndPort& source, const std::string& dbname, const BSONObj& findCmdObj, - const CallbackFn& work) + const CallbackFn& work, + const BSONObj& metadata) : _executor(executor), _source(source), _dbname(dbname), _cmdObj(findCmdObj.getOwned()), + _metadata(metadata.getOwned()), _work(work), _active(false), _remoteCommandCallbackHandle() { @@ -175,6 +177,7 @@ std::string Fetcher::getDiagnosticString() const { output << " source: " << _source.toString(); output << " database: " << _dbname; output << " query: " << _cmdObj; + output << " query metadata: " << _metadata; output << " active: " << _active; return output; } @@ -216,7 +219,7 @@ void Fetcher::wait() { Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj), + RemoteCommandRequest(_source, _dbname, cmdObj, _metadata), stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); if (!scheduleResult.isOK()) { @@ -258,6 +261,8 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch return; } + batchData.otherFields.metadata = std::move(rcbd.response.getValue().metadata); + NextAction nextAction = NextAction::kNoAction; if (!batchData.cursorId) { diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 99748941ca2..d258bf50eaa 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -63,7 +63,6 @@ public: CursorId cursorId = 0; NamespaceString nss; Documents documents; - // TODO: fill in with replication metadata. struct OtherFields { BSONObj metadata; } otherFields; @@ -118,7 +117,8 @@ public: const HostAndPort& source, const std::string& dbname, const BSONObj& cmdObj, - const CallbackFn& work); + const CallbackFn& work, + const BSONObj& metadata = rpc::makeEmptyMetadata()); virtual ~Fetcher(); @@ -180,6 +180,7 @@ private: HostAndPort _source; std::string _dbname; BSONObj _cmdObj; + BSONObj _metadata; CallbackFn _work; // Protects member data of this Fetcher. diff --git a/src/mongo/client/query_fetcher.cpp b/src/mongo/client/query_fetcher.cpp index ec588238a6c..e1dfe3eaa33 100644 --- a/src/mongo/client/query_fetcher.cpp +++ b/src/mongo/client/query_fetcher.cpp @@ -36,7 +36,8 @@ QueryFetcher::QueryFetcher(executor::TaskExecutor* exec, const HostAndPort& src, const NamespaceString& nss, const BSONObj& cmdBSON, - const CallbackFn& work) + const CallbackFn& work, + const BSONObj& metadata) : _exec(exec), _fetcher(exec, src, @@ -46,7 +47,8 @@ QueryFetcher::QueryFetcher(executor::TaskExecutor* exec, this, stdx::placeholders::_1, stdx::placeholders::_2, - stdx::placeholders::_3)), + stdx::placeholders::_3), + metadata), _responses(0), _work(work) {} diff --git a/src/mongo/client/query_fetcher.h b/src/mongo/client/query_fetcher.h index bd79f6e133d..fc5d09c489a 100644 --- a/src/mongo/client/query_fetcher.h +++ b/src/mongo/client/query_fetcher.h @@ -58,7 +58,8 @@ public: const HostAndPort& source, const NamespaceString& nss, const BSONObj& cmdBSON, - const QueryFetcher::CallbackFn& onBatchAvailable); + const QueryFetcher::CallbackFn& onBatchAvailable, + const BSONObj& metadata = rpc::makeEmptyMetadata()); virtual ~QueryFetcher() = default; bool isActive() const { diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index b951d2832e8..88e6bd11a7e 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1299,13 +1299,20 @@ bool Command::run(OperationContext* txn, // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18326 - if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - shardingState.enabled()) { + bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (isReplSet && shardingState.enabled()) { rpc::ShardingMetadata( repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(), replCoord->getElectionId()).writeToMetadata(&metadataBob); } + const auto& metadata = request.getMetadata(); + if (isReplSet && metadata.hasField(rpc::kReplicationMetadataFieldName)) { + BSONObjBuilder replInfoBob(metadataBob.subobjStart(rpc::kReplicationMetadataFieldName)); + replCoord->prepareReplResponseMetadata(&replInfoBob); + } + auto cmdResponse = replyBuilderBob.done(); replyBuilder->setMetadata(metadataBob.done()); diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index b8cb4265aeb..234d8af14af 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -134,7 +134,8 @@ OplogFetcher::OplogFetcher(ReplicationExecutor* exec, oplogNSS, BSON("find" << oplogNSS.coll() << "filter" << BSON("ts" << BSON("$gte" << startTS))), - work), + work, + BSON(rpc::kReplicationMetadataFieldName << 1)), _startTS(startTS) {} std::string OplogFetcher::toString() const { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 84b2d6a56c4..84edfddd86c 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -590,7 +590,7 @@ public: /** * Prepares a BSONObj describing the current term, primary, and lastOp information. */ - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder) = 0; + virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder) = 0; /** * Returns true if the V1 election protocol is being used and false otherwise. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f932279f245..42ec10d610f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2529,24 +2529,25 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); } -void ReplicationCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) { +void ReplicationCoordinatorImpl::prepareReplResponseMetadata(BSONObjBuilder* objBuilder) { if (getReplicationMode() == modeReplSet && isV1ElectionProtocol()) { CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_prepareCursorResponseInfo_finish, + stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish, this, stdx::placeholders::_1, objBuilder)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } + fassert(28709, cbh.getStatus()); _replExecutor.wait(cbh.getValue()); } } -void ReplicationCoordinatorImpl::_prepareCursorResponseInfo_finish( +void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish( const ReplicationExecutor::CallbackArgs& cbData, BSONObjBuilder* objBuilder) { - BSONObjBuilder replObj(objBuilder->subobjStart("repl")); - _topCoord->prepareCursorResponseInfo(&replObj, getLastCommittedOpTime()); + BSONObjBuilder replObj(objBuilder->subobjStart(rpc::kReplicationMetadataFieldName)); + _topCoord->prepareReplResponseMetadata(&replObj, getLastCommittedOpTime()); replObj.done(); } bool ReplicationCoordinatorImpl::isV1ElectionProtocol() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 977d87b2a10..7df4345eba0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -250,7 +250,7 @@ public: virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) override; - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); + virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder); virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) override; @@ -476,10 +476,10 @@ private: Status* result); /** - * Bottom half of prepareCursorResponseInfo. + * Bottom half of prepareReplResponseMetadata. */ - void _prepareCursorResponseInfo_finish(const ReplicationExecutor::CallbackArgs& cbData, - BSONObjBuilder* objBuilder); + void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData, + BSONObjBuilder* objBuilder); /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might * need to change as a result of time passing - for instance becoming PRIMARY when a single diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 19ee9730cae..d3841b5904e 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -327,7 +327,7 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner( return Status::OK(); } -void ReplicationCoordinatorMock::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) {} +void ReplicationCoordinatorMock::prepareReplResponseMetadata(BSONObjBuilder* objBuilder) {} Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 2856878bd6f..89d3db5c81a 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -187,7 +187,7 @@ public: virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm); - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder); + virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder); virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 0dbc81baac5..b5a2ec3b892 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -390,8 +390,8 @@ public: /** * Prepares a BSONObj describing the current term, primary, and lastOp information. */ - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder, - const OpTime& lastCommittedOpTime) const = 0; + virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder, + const OpTime& lastCommittedOpTime) const = 0; /** * Writes into 'output' all the information needed to generate a summary of the current diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 08ec6149f38..0d3babaa27c 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2113,8 +2113,8 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } -void TopologyCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder, - const OpTime& lastCommittedOpTime) const { +void TopologyCoordinatorImpl::prepareReplResponseMetadata(BSONObjBuilder* objBuilder, + const OpTime& lastCommittedOpTime) const { objBuilder->append("term", _term); objBuilder->append("lastOpCommittedTimestamp", lastCommittedOpTime.getTimestamp()); objBuilder->append("lastOpCommittedTerm", lastCommittedOpTime.getTerm()); diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index c6dd3e26533..abd7189673f 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -202,8 +202,8 @@ public: virtual bool stepDown(Date_t until, bool force, const OpTime& lastOpApplied); virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; - virtual void prepareCursorResponseInfo(BSONObjBuilder* objBuilder, - const OpTime& lastCommitttedOpTime) const; + virtual void prepareReplResponseMetadata(BSONObjBuilder* objBuilder, + const OpTime& lastCommitttedOpTime) const; Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm); virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, diff --git a/src/mongo/rpc/metadata.cpp b/src/mongo/rpc/metadata.cpp index 87db6ce5389..e80daabaa15 100644 --- a/src/mongo/rpc/metadata.cpp +++ b/src/mongo/rpc/metadata.cpp @@ -39,6 +39,8 @@ namespace mongo { namespace rpc { +const char kReplicationMetadataFieldName[] = "$replData"; + BSONObj makeEmptyMetadata() { return BSONObj(); } diff --git a/src/mongo/rpc/metadata.h b/src/mongo/rpc/metadata.h index 393c3c57234..ed2267bbca6 100644 --- a/src/mongo/rpc/metadata.h +++ b/src/mongo/rpc/metadata.h @@ -57,6 +57,8 @@ class OperationContext; */ namespace rpc { +extern const char kReplicationMetadataFieldName[]; + /** * Returns an empty metadata object. */ |