summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2016-04-06 12:00:02 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2016-04-13 13:36:30 -0400
commit1fcf28a69c1d365806c56a9225c2acbe804680e0 (patch)
treea7b9a06b1923fbe594eaa3b71ee3cdb92752cbfa /src
parentde8b1060e3af529b2136f289d73666b6be49891b (diff)
downloadmongo-1fcf28a69c1d365806c56a9225c2acbe804680e0.tar.gz
SERVER-23209 maintain config server opTime through ShardingEgressMetadataHook
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/dbcommands.cpp63
-rw-r--r--src/mongo/executor/network_interface_mock.cpp15
-rw-r--r--src/mongo/executor/network_interface_mock.h5
-rw-r--r--src/mongo/s/client/shard_registry.cpp20
-rw-r--r--src/mongo/s/client/shard_registry.h1
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.cpp66
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.h4
-rw-r--r--src/mongo/s/sharding_test_fixture.cpp4
8 files changed, 98 insertions, 80 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index b705399bc54..ea1b9e4fb27 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1224,6 +1224,36 @@ const std::array<StringData, 4> neededFieldNames{LiteParsedQuery::cmdOptionMaxTi
LiteParsedQuery::queryOptionMaxTimeMS};
} // namespace
+void appendOpTimeMetadata(OperationContext* txn,
+ const rpc::RequestInterface& request,
+ BSONObjBuilder* metadataBob) {
+ const bool isShardingAware = ShardingState::get(txn)->enabled();
+ const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
+ repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
+ const bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+
+ if (isReplSet) {
+ // Attach our own last opTime.
+ repl::OpTime lastOpTimeFromClient =
+ repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, metadataBob);
+
+ // For commands from mongos, append some info to help getLastError(w) work.
+ // TODO: refactor out of here as part of SERVER-18236
+ if (isShardingAware || isConfig) {
+ rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
+ .writeToMetadata(metadataBob, request.getProtocol());
+ }
+ }
+
+ // If we're a shard other than the config shard, attach the last configOpTime we know about.
+ if (isShardingAware && !isConfig) {
+ auto opTime = grid.configOpTime();
+ rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob);
+ }
+}
+
/**
* this handles
- auth
@@ -1374,15 +1404,10 @@ void Command::execCommand(OperationContext* txn,
->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
}
- BSONObj metadata = rpc::makeEmptyMetadata();
- if (ShardingState::get(txn)->enabled()) {
- auto opTime = grid.configOpTime();
- BSONObjBuilder metadataBob;
- rpc::ConfigServerMetadata(opTime).writeToMetadata(&metadataBob);
- metadata = metadataBob.obj();
- }
+ BSONObjBuilder metadataBob;
+ appendOpTimeMetadata(txn, request, &metadataBob);
- Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata);
+ Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadataBob.done());
}
}
@@ -1553,27 +1578,7 @@ bool Command::run(OperationContext* txn,
inPlaceReplyBob.doneFast();
BSONObjBuilder metadataBob;
-
- const bool isShardingAware = ShardingState::get(txn)->enabled();
- bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- if (isReplSet) {
- repl::OpTime lastOpTimeFromClient =
- repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, &metadataBob);
-
- // For commands from mongos, append some info to help getLastError(w) work.
- // TODO: refactor out of here as part of SERVER-18326
- if (isShardingAware || serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
- .writeToMetadata(&metadataBob, request.getProtocol());
- }
- }
-
- if (isShardingAware) {
- auto opTime = grid.configOpTime();
- rpc::ConfigServerMetadata(opTime).writeToMetadata(&metadataBob);
- }
-
+ appendOpTimeMetadata(txn, request, &metadataBob);
replyBuilder->setMetadata(metadataBob.done());
return result;
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 0e5f09c877e..341b2b05afb 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -271,6 +271,13 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
++insertBefore;
}
+
+ // If no RemoteCommandResponse was returned (for example, on a simulated network error), then
+ // do not attempt to run the metadata hook, since there is no returned metadata.
+ if (_metadataHook && response.isOK()) {
+ _metadataHook->readReplyMetadata(noi->getRequest().target, response.getValue().metadata);
+ }
+
noi->setResponse(when, response);
_scheduled.splice(insertBefore, _processing, noi);
}
@@ -432,6 +439,14 @@ void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<NetworkConnectionHo
_hook = std::move(hook);
}
+void NetworkInterfaceMock::setEgressMetadataHook(
+ std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_hasStarted);
+ invariant(!_metadataHook);
+ _metadataHook = std::move(metadataHook);
+}
+
void NetworkInterfaceMock::signalWorkAvailable() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_waitingToRunMask |= kExecutorThread;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 4a9173f1954..787d6415817 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -36,6 +36,7 @@
#include <vector>
#include "mongo/executor/network_interface.h"
+#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
@@ -90,6 +91,7 @@ public:
virtual void waitForWork();
virtual void waitForWorkUntil(Date_t when);
virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
+ virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
virtual void signalWorkAvailable();
virtual Date_t now();
virtual std::string getHostName();
@@ -320,6 +322,9 @@ private:
// The connection hook.
std::unique_ptr<NetworkConnectionHook> _hook; // (R)
+ // The metadata hook.
+ std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; // (R)
+
// The set of hosts we have seen so far. If we see a new host, we will execute the
// ConnectionHook's validation and post-connection logic.
//
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index e83b687473a..2735e9b59c4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -546,7 +546,10 @@ StatusWith<ShardRegistry::QueryResponse> ShardRegistry::_exhaustiveFindOnConfig(
}
response.opTime = replParseStatus.getValue().getLastOpVisible();
- grid.advanceConfigOpTime(response.opTime);
+
+ // We return the config opTime that was returned for this particular request, but as a
+ // safeguard we ensure our global configOpTime is at least as large as it.
+ invariant(grid.configOpTime() >= response.opTime);
}
for (const BSONObj& doc : data.documents) {
@@ -822,21 +825,6 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
cmdResponse.response = response.data.getOwned();
cmdResponse.metadata = response.metadata.getOwned();
- if (response.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
- auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(response.metadata);
-
- if (!replParseStatus.isOK()) {
- return replParseStatus.getStatus();
- }
-
- const auto& replMetadata = replParseStatus.getValue();
- cmdResponse.visibleOpTime = replMetadata.getLastOpVisible();
-
- if (shard->isConfig()) {
- grid.advanceConfigOpTime(cmdResponse.visibleOpTime);
- }
- }
-
if (errorsToCheck.count(commandSpecificStatus.code())) {
return commandSpecificStatus;
}
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index be09dc636ff..c057d7c7910 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -327,7 +327,6 @@ private:
struct CommandResponse {
BSONObj response;
BSONObj metadata;
- repl::OpTime visibleOpTime;
};
/**
diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp
index 7cdbdd837ff..2e1b6793346 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook.cpp
@@ -57,8 +57,8 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
if (!shardedConnection) {
return Status::OK();
}
- auto shard = grid.shardRegistry()->getShardNoReload(target.toString());
- return _writeRequestMetadataForShard(shard, metadataBob);
+ rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
+ return Status::OK();
} catch (...) {
return exceptionToStatus();
}
@@ -68,32 +68,18 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& targe
BSONObjBuilder* metadataBob) {
try {
audit::writeImpersonatedUsersToMetadata(metadataBob);
- auto shard = grid.shardRegistry()->getShardForHostNoReload(target);
- return _writeRequestMetadataForShard(shard, metadataBob);
+ rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
+ return Status::OK();
} catch (...) {
return exceptionToStatus();
}
}
-Status ShardingEgressMetadataHook::_writeRequestMetadataForShard(shared_ptr<Shard> shard,
- BSONObjBuilder* metadataBob) {
- if (!shard) {
- return Status(ErrorCodes::ShardNotFound,
- str::stream() << "Shard not found for server: " << shard->toString());
- }
- if (shard->isConfig()) {
- return Status::OK();
- }
- rpc::ConfigServerMetadata(grid.configOpTime()).writeToMetadata(metadataBob);
- return Status::OK();
-}
-
Status ShardingEgressMetadataHook::readReplyMetadata(const StringData replySource,
const BSONObj& metadataObj) {
try {
_saveGLEStats(metadataObj, replySource);
- auto shard = grid.shardRegistry()->getShardNoReload(replySource.toString());
- return _readReplyMetadataForShard(shard, metadataObj);
+ return _advanceConfigOptimeFromShard(replySource.toString(), metadataObj);
} catch (...) {
return exceptionToStatus();
}
@@ -103,29 +89,45 @@ Status ShardingEgressMetadataHook::readReplyMetadata(const HostAndPort& replySou
const BSONObj& metadataObj) {
try {
_saveGLEStats(metadataObj, replySource.toString());
- auto shard = grid.shardRegistry()->getShardForHostNoReload(replySource);
- return _readReplyMetadataForShard(shard, metadataObj);
+ return _advanceConfigOptimeFromShard(replySource.toString(), metadataObj);
} catch (...) {
return exceptionToStatus();
}
}
-Status ShardingEgressMetadataHook::_readReplyMetadataForShard(shared_ptr<Shard> shard,
- const BSONObj& metadataObj) {
+Status ShardingEgressMetadataHook::_advanceConfigOptimeFromShard(ShardId shardId,
+ const BSONObj& metadataObj) {
try {
+ auto shard = grid.shardRegistry()->getShardNoReload(shardId);
if (!shard) {
return Status::OK();
}
- // If this host is a known shard of ours, look for a config server optime in the
- // response metadata to use to update our notion of the current config server optime.
- auto responseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj);
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
- }
- auto opTime = responseStatus.getValue().getOpTime();
- if (opTime.is_initialized()) {
- grid.advanceConfigOpTime(opTime.get());
+ // Update our notion of the config server opTime from the configOpTime in the response.
+ if (shard->isConfig()) {
+ // Config servers return the config opTime as part of their own metadata.
+ if (metadataObj.hasField(rpc::kReplSetMetadataFieldName)) {
+ auto parseStatus = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
+ if (!parseStatus.isOK()) {
+ return parseStatus.getStatus();
+ }
+
+ const auto& replMetadata = parseStatus.getValue();
+ auto opTime = replMetadata.getLastOpVisible();
+ grid.advanceConfigOpTime(opTime);
+ }
+ } else {
+ // Regular shards return the config opTime as part of ConfigServerMetadata.
+ auto parseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj);
+ if (!parseStatus.isOK()) {
+ return parseStatus.getStatus();
+ }
+
+ const auto& configMetadata = parseStatus.getValue();
+ auto opTime = configMetadata.getOpTime();
+ if (opTime.is_initialized()) {
+ grid.advanceConfigOpTime(opTime.get());
+ }
}
return Status::OK();
} catch (...) {
diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h
index ce3a0887620..06fe41b1ebe 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.h
+++ b/src/mongo/s/sharding_egress_metadata_hook.h
@@ -32,6 +32,7 @@
#include "mongo/base/string_data.h"
#include "mongo/rpc/metadata/metadata_hook.h"
+#include "mongo/s/client/shard.h"
namespace mongo {
@@ -57,8 +58,7 @@ public:
private:
virtual void _saveGLEStats(const BSONObj& metadata, StringData hostString);
- Status _readReplyMetadataForShard(std::shared_ptr<Shard> shard, const BSONObj& metadataObj);
- Status _writeRequestMetadataForShard(std::shared_ptr<Shard> shard, BSONObjBuilder* metadataBob);
+ Status _advanceConfigOptimeFromShard(ShardId shardId, const BSONObj& metadataObj);
};
} // namespace rpc
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index f2b168621e6..f8e86017e7c 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -56,6 +56,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/set_shard_version_request.h"
+#include "mongo/s/sharding_egress_metadata_hook.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
@@ -67,6 +68,7 @@ using executor::NetworkInterfaceMock;
using executor::NetworkTestEnv;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using rpc::ShardingEgressMetadataHook;
using unittest::assertGet;
using std::string;
@@ -91,12 +93,14 @@ void ShardingTestFixture::setUp() {
// Set up executor pool used for most operations.
auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>();
+ fixedNet->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>());
_mockNetwork = fixedNet.get();
auto fixedExec = makeThreadPoolTestExecutor(std::move(fixedNet));
_networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork);
_executor = fixedExec.get();
auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>();
+ netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHook>());
auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool));
std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool;
executorsForPool.emplace_back(std::move(execForPool));