author | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-04-06 12:00:02 -0400
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-04-13 13:36:30 -0400
commit | 1fcf28a69c1d365806c56a9225c2acbe804680e0 (patch)
tree | a7b9a06b1923fbe594eaa3b71ee3cdb92752cbfa /src
parent | de8b1060e3af529b2136f289d73666b6be49891b (diff)
download | mongo-1fcf28a69c1d365806c56a9225c2acbe804680e0.tar.gz
SERVER-23209 maintain config server opTime through ShardingEgressMetadataHook
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 63
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 15
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 5
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 20
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 1
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook.cpp | 66
-rw-r--r-- | src/mongo/s/sharding_egress_metadata_hook.h | 4
-rw-r--r-- | src/mongo/s/sharding_test_fixture.cpp | 4
8 files changed, 98 insertions, 80 deletions
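The idea behind the change: instead of each call site reading and advancing the config server opTime on its own, an egress metadata hook attached to the network interface stamps the locally known config opTime onto outgoing requests and advances it (monotonically) from the metadata of every reply. The sketch below is a minimal, standalone C++ illustration of that pattern, not MongoDB code: `ConfigOpTime`, `Grid`, `EgressMetadataHook`, `MockNetwork`, and the metadata field names are simplified stand-ins for the real classes that appear in the diff that follows.

```cpp
// Standalone illustration of the egress-metadata-hook pattern.
// All names here are simplified stand-ins, not the MongoDB types.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>

// A config server opTime, ordered so that "later" knowledge always wins.
struct ConfigOpTime {
    std::int64_t term;
    std::int64_t timestamp;
};

bool operator<(const ConfigOpTime& a, const ConfigOpTime& b) {
    return std::tie(a.term, a.timestamp) < std::tie(b.term, b.timestamp);
}

// Process-wide sharding state: the config opTime only ever moves forward.
class Grid {
public:
    ConfigOpTime configOpTime() const {
        return _configOpTime;
    }
    void advanceConfigOpTime(ConfigOpTime opTime) {
        _configOpTime = std::max(_configOpTime, opTime);
    }

private:
    ConfigOpTime _configOpTime{0, 0};
};

Grid grid;

// Request/reply "metadata" modeled as a simple key/value map.
using Metadata = std::map<std::string, std::int64_t>;

// The hook: the networking layer calls writeRequestMetadata() before every
// outgoing command and readReplyMetadata() on every reply, so config opTime
// gossip happens in one place instead of at each call site.
class EgressMetadataHook {
public:
    // Attach the latest config opTime we know about to the outgoing request.
    void writeRequestMetadata(Metadata* metadataBob) const {
        const ConfigOpTime opTime = grid.configOpTime();
        (*metadataBob)["configOpTimeTerm"] = opTime.term;
        (*metadataBob)["configOpTimeTs"] = opTime.timestamp;
    }

    // If the reply carries a config opTime, advance our notion of it.
    void readReplyMetadata(const Metadata& metadataObj) const {
        const auto term = metadataObj.find("configOpTimeTerm");
        const auto ts = metadataObj.find("configOpTimeTs");
        if (term != metadataObj.end() && ts != metadataObj.end()) {
            grid.advanceConfigOpTime(ConfigOpTime{term->second, ts->second});
        }
    }
};

// A toy network interface that owns the hook and runs it around each command,
// mirroring where NetworkInterfaceMock invokes the hook in the diff below.
class MockNetwork {
public:
    void setEgressMetadataHook(std::unique_ptr<EgressMetadataHook> hook) {
        _metadataHook = std::move(hook);
    }

    Metadata runCommand(const Metadata& replyMetadata) {
        Metadata requestMetadata;
        if (_metadataHook) {
            _metadataHook->writeRequestMetadata(&requestMetadata);  // outgoing side
            _metadataHook->readReplyMetadata(replyMetadata);        // reply side
        }
        return requestMetadata;
    }

private:
    std::unique_ptr<EgressMetadataHook> _metadataHook;
};

int main() {
    MockNetwork net;
    net.setEgressMetadataHook(std::make_unique<EgressMetadataHook>());

    // A reply from a node that has seen config opTime (1, 42).
    net.runCommand({{"configOpTimeTerm", 1}, {"configOpTimeTs", 42}});
    std::cout << "after reply (1, 42): ts = " << grid.configOpTime().timestamp << "\n";

    // A stale reply cannot move the opTime backwards.
    net.runCommand({{"configOpTimeTerm", 1}, {"configOpTimeTs", 7}});
    std::cout << "after stale reply (1, 7): ts = " << grid.configOpTime().timestamp << "\n";
    return 0;
}
```

In the commit itself, the reply-side half of this lives in `ShardingEgressMetadataHook::_advanceConfigOptimeFromShard()`, which reads `ReplSetMetadata` when the reply comes from a config server and `ConfigServerMetadata` when it comes from another shard, and `ShardingTestFixture` installs the hook on its `NetworkInterfaceMock` instances so unit tests exercise the same path.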
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index b705399bc54..ea1b9e4fb27 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1224,6 +1224,36 @@ const std::array<StringData, 4> neededFieldNames{LiteParsedQuery::cmdOptionMaxTi
                                                  LiteParsedQuery::queryOptionMaxTimeMS};
 }  // namespace
 
+void appendOpTimeMetadata(OperationContext* txn,
+                          const rpc::RequestInterface& request,
+                          BSONObjBuilder* metadataBob) {
+    const bool isShardingAware = ShardingState::get(txn)->enabled();
+    const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
+    repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
+    const bool isReplSet =
+        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+
+    if (isReplSet) {
+        // Attach our own last opTime.
+        repl::OpTime lastOpTimeFromClient =
+            repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+        replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, metadataBob);
+
+        // For commands from mongos, append some info to help getLastError(w) work.
+        // TODO: refactor out of here as part of SERVER-18236
+        if (isShardingAware || isConfig) {
+            rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
+                .writeToMetadata(metadataBob, request.getProtocol());
+        }
+    }
+
+    // If we're a shard other than the config shard, attach the last configOpTime we know about.
+    if (isShardingAware && !isConfig) {
+        auto opTime = grid.configOpTime();
+        rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob);
+    }
+}
+
 /**
  * this handles
  - auth
@@ -1374,15 +1404,10 @@ void Command::execCommand(OperationContext* txn,
                 ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
         }
 
-        BSONObj metadata = rpc::makeEmptyMetadata();
-        if (ShardingState::get(txn)->enabled()) {
-            auto opTime = grid.configOpTime();
-            BSONObjBuilder metadataBob;
-            rpc::ConfigServerMetadata(opTime).writeToMetadata(&metadataBob);
-            metadata = metadataBob.obj();
-        }
+        BSONObjBuilder metadataBob;
+        appendOpTimeMetadata(txn, request, &metadataBob);
 
-        Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata);
+        Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadataBob.done());
     }
 }
 
@@ -1553,27 +1578,7 @@ bool Command::run(OperationContext* txn,
     inPlaceReplyBob.doneFast();
 
     BSONObjBuilder metadataBob;
-
-    const bool isShardingAware = ShardingState::get(txn)->enabled();
-    bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
-    if (isReplSet) {
-        repl::OpTime lastOpTimeFromClient =
-            repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
-        replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, &metadataBob);
-
-        // For commands from mongos, append some info to help getLastError(w) work.
-        // TODO: refactor out of here as part of SERVER-18326
-        if (isShardingAware || serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
-            rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
-                .writeToMetadata(&metadataBob, request.getProtocol());
-        }
-    }
-
-    if (isShardingAware) {
-        auto opTime = grid.configOpTime();
-        rpc::ConfigServerMetadata(opTime).writeToMetadata(&metadataBob);
-    }
-
+    appendOpTimeMetadata(txn, request, &metadataBob);
     replyBuilder->setMetadata(metadataBob.done());
 
     return result;
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 0e5f09c877e..341b2b05afb 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -271,6 +271,13 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
     while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
         ++insertBefore;
     }
+
+    // If no RemoteCommandResponse was returned (for example, on a simulated network error), then
+    // do not attempt to run the metadata hook, since there is no returned metadata.
+    if (_metadataHook && response.isOK()) {
+        _metadataHook->readReplyMetadata(noi->getRequest().target, response.getValue().metadata);
+    }
+
     noi->setResponse(when, response);
     _scheduled.splice(insertBefore, _processing, noi);
 }
@@ -432,6 +439,14 @@ void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHo
     _hook = std::move(hook);
 }
 
+void NetworkInterfaceMock::setEgressMetadataHook(
+    std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    invariant(!_hasStarted);
+    invariant(!_metadataHook);
+    _metadataHook = std::move(metadataHook);
+}
+
 void NetworkInterfaceMock::signalWorkAvailable() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     _waitingToRunMask |= kExecutorThread;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 4a9173f1954..787d6415817 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -36,6 +36,7 @@
 #include <vector>
 
 #include "mongo/executor/network_interface.h"
+#include "mongo/rpc/metadata/metadata_hook.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/list.h"
 #include "mongo/stdx/mutex.h"
@@ -90,6 +91,7 @@ public:
     virtual void waitForWork();
     virtual void waitForWorkUntil(Date_t when);
     virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
+    virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
     virtual void signalWorkAvailable();
     virtual Date_t now();
     virtual std::string getHostName();
@@ -320,6 +322,9 @@ private:
     // The connection hook.
     std::unique_ptr<NetworkConnectionHook> _hook;  // (R)
 
+    // The metadata hook.
+    std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;  // (R)
+
     // The set of hosts we have seen so far. If we see a new host, we will execute the
     // ConnectionHook's validation and post-connection logic.
     //
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index e83b687473a..2735e9b59c4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -546,7 +546,10 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig(
         }
 
         response.opTime = replParseStatus.getValue().getLastOpVisible();
-        grid.advanceConfigOpTime(response.opTime);
+
+        // We return the config opTime that was returned for this particular request, but as a
+        // safeguard we ensure our global configOpTime is at least as large as it.
+        invariant(grid.configOpTime() >= response.opTime);
     }
 
     for (const BSONObj& doc : data.documents) {
@@ -822,21 +825,6 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
     cmdResponse.response = response.data.getOwned();
     cmdResponse.metadata = response.metadata.getOwned();
 
-    if (response.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
-        auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(response.metadata);
-
-        if (!replParseStatus.isOK()) {
-            return replParseStatus.getStatus();
-        }
-
-        const auto& replMetadata = replParseStatus.getValue();
-        cmdResponse.visibleOpTime = replMetadata.getLastOpVisible();
-
-        if (shard->isConfig()) {
-            grid.advanceConfigOpTime(cmdResponse.visibleOpTime);
-        }
-    }
-
     if (errorsToCheck.count(commandSpecificStatus.code())) {
         return commandSpecificStatus;
     }
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index be09dc636ff..c057d7c7910 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -327,7 +327,6 @@ private:
     struct CommandResponse {
         BSONObj response;
         BSONObj metadata;
-        repl::OpTime visibleOpTime;
     };
 
     /**
diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp
index 7cdbdd837ff..2e1b6793346 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook.cpp
@@ -57,8 +57,8 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
         if (!shardedConnection) {
             return Status::OK();
         }
-        auto shard = grid.shardRegistry()->getShardNoReload(target.toString());
-        return _writeRequestMetadataForShard(shard, metadataBob);
+        rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
+        return Status::OK();
     } catch (...) {
         return exceptionToStatus();
     }
@@ -68,32 +68,18 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& targe
                                                         BSONObjBuilder* metadataBob) {
     try {
         audit::writeImpersonatedUsersToMetadata(metadataBob);
-        auto shard = grid.shardRegistry()->getShardForHostNoReload(target);
-        return _writeRequestMetadataForShard(shard, metadataBob);
+        rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
+        return Status::OK();
     } catch (...) {
         return exceptionToStatus();
     }
 }
 
-Status ShardingEgressMetadataHook::_writeRequestMetadataForShard(shared_ptr<Shard> shard,
-                                                                 BSONObjBuilder* metadataBob) {
-    if (!shard) {
-        return Status(ErrorCodes::ShardNotFound,
-                      str::stream() << "Shard not found for server: " << shard->toString());
-    }
-    if (shard->isConfig()) {
-        return Status::OK();
-    }
-    rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
-    return Status::OK();
-}
-
 Status ShardingEgressMetadataHook::readReplyMetadata(const StringData replySource,
                                                      const BSONObj& metadataObj) {
     try {
         _saveGLEStats(metadataObj, replySource);
-        auto shard = grid.shardRegistry()->getShardNoReload(replySource.toString());
-        return _readReplyMetadataForShard(shard, metadataObj);
+        return _advanceConfigOptimeFromShard(replySource.toString(), metadataObj);
     } catch (...) {
         return exceptionToStatus();
     }
@@ -103,29 +89,45 @@ Status ShardingEgressMetadataHook::readReplyMetadata(const HostAndPort& replySou
                                                      const BSONObj& metadataObj) {
     try {
         _saveGLEStats(metadataObj, replySource.toString());
-        auto shard = grid.shardRegistry()->getShardForHostNoReload(replySource);
-        return _readReplyMetadataForShard(shard, metadataObj);
+        return _advanceConfigOptimeFromShard(replySource.toString(), metadataObj);
     } catch (...) {
         return exceptionToStatus();
     }
 }
 
-Status ShardingEgressMetadataHook::_readReplyMetadataForShard(shared_ptr<Shard> shard,
-                                                              const BSONObj& metadataObj) {
+Status ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(ShardId shardId,
+                                                                 const BSONObj& metadataObj) {
     try {
+        auto shard = grid.shardRegistry()->getShardNoReload(shardId);
         if (!shard) {
             return Status::OK();
         }
-        // If this host is a known shard of ours, look for a config server optime in the
-        // response metadata to use to update our notion of the current config server optime.
-        auto responseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj);
-        if (!responseStatus.isOK()) {
-            return responseStatus.getStatus();
-        }
-        auto opTime = responseStatus.getValue().getOpTime();
-        if (opTime.is_initialized()) {
-            grid.advanceConfigOpTime(opTime.get());
+        // Update our notion of the config server opTime from the configOpTime in the response.
+        if (shard->isConfig()) {
+            // Config servers return the config opTime as part of their own metadata.
+            if (metadataObj.hasField(rpc::kReplSetMetadataFieldName)) {
+                auto parseStatus = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
+                if (!parseStatus.isOK()) {
+                    return parseStatus.getStatus();
+                }
+
+                const auto& replMetadata = parseStatus.getValue();
+                auto opTime = replMetadata.getLastOpVisible();
+                grid.advanceConfigOpTime(opTime);
+            }
+        } else {
+            // Regular shards return the config opTime as part of ConfigServerMetadata.
+            auto parseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj);
+            if (!parseStatus.isOK()) {
+                return parseStatus.getStatus();
+            }
+
+            const auto& configMetadata = parseStatus.getValue();
+            auto opTime = configMetadata.getOpTime();
+            if (opTime.is_initialized()) {
+                grid.advanceConfigOpTime(opTime.get());
+            }
         }
         return Status::OK();
     } catch (...) {
diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h
index ce3a0887620..06fe41b1ebe 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.h
+++ b/src/mongo/s/sharding_egress_metadata_hook.h
@@ -32,6 +32,7 @@
 
 #include "mongo/base/string_data.h"
 #include "mongo/rpc/metadata/metadata_hook.h"
+#include "mongo/s/client/shard.h"
 
 namespace mongo {
 
@@ -57,8 +58,7 @@ public:
 private:
     virtual void _saveGLEStats(const BSONObj& metadata, StringData hostString);
 
-    Status _readReplyMetadataForShard(std::shared_ptr<Shard> shard, const BSONObj& metadataObj);
-    Status _writeRequestMetadataForShard(std::shared_ptr<Shard> shard, BSONObjBuilder* metadataBob);
+    Status _advanceConfigOptimeFromShard(ShardId shardId, const BSONObj& metadataObj);
 };
 
 }  // namespace rpc
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index f2b168621e6..f8e86017e7c 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -56,6 +56,7 @@
 #include "mongo/s/grid.h"
 #include "mongo/s/query/cluster_cursor_manager.h"
 #include "mongo/s/set_shard_version_request.h"
+#include "mongo/s/sharding_egress_metadata_hook.h"
 #include "mongo/s/write_ops/batched_command_request.h"
 #include "mongo/s/write_ops/batched_command_response.h"
 #include "mongo/stdx/memory.h"
@@ -67,6 +68,7 @@ using executor::NetworkInterfaceMock;
 using executor::NetworkTestEnv;
 using executor::RemoteCommandRequest;
 using executor::RemoteCommandResponse;
+using rpc::ShardingEgressMetadataHook;
 using unittest::assertGet;
 
 using std::string;
@@ -91,12 +93,14 @@ void ShardingTestFixture::setUp() {
 
     // Set up executor pool used for most operations.
     auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>();
+    fixedNet->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>());
     _mockNetwork = fixedNet.get();
    auto fixedExec = makeThreadPoolTestExecutor(std::move(fixedNet));
     _networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork);
     _executor = fixedExec.get();
 
     auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>();
+    netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>());
     auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool));
     std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool;
     executorsForPool.emplace_back(std::move(execForPool));