summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-11-09 04:47:48 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-11-11 02:20:22 -0500
commitefea76aea1b32b56503a1ecb954f8e14b8415eae (patch)
treefba41ea476b4c423ea09a40d6026d40adf259362 /src
parent77fbada9b93dfa474d41216baa19f556e75bc8ca (diff)
downloadmongo-efea76aea1b32b56503a1ecb954f8e14b8415eae.tar.gz
SERVER-31873 Make mongos retry findAndModify with txnNumber
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands.cpp5
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp2
-rw-r--r--src/mongo/db/commands/find_cmd.cpp17
-rw-r--r--src/mongo/db/query/find.cpp15
-rw-r--r--src/mongo/db/s/collection_range_deleter.cpp13
-rw-r--r--src/mongo/db/s/collection_range_deleter.h19
-rw-r--r--src/mongo/db/s/metadata_manager.cpp4
-rw-r--r--src/mongo/s/client/shard.cpp4
-rw-r--r--src/mongo/s/client/shard.h4
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp190
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp9
11 files changed, 126 insertions, 156 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index a32f3060ec2..6c747881e2c 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -438,8 +438,9 @@ BSONObj Command::filterCommandRequestForPassthrough(const BSONObj& cmdObj) {
name == "$queryOptions" || //
name == "maxTimeMS" || //
name == "readConcern" || //
- name == "writeConcern" ||
- name == "lsid" || name == "txnNumber") {
+ name == "writeConcern" || //
+ name == "lsid" || //
+ name == "txnNumber") {
// This is the whitelist of generic arguments that commands can be trusted to blindly
// forward to the shards.
bob.append(elem);
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 59c1c387708..dbbd28447bf 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -311,7 +311,7 @@ public:
auto css = CollectionShardingState::get(opCtx, nsString);
css->checkShardVersionOrThrow(opCtx);
- Collection* collection = autoColl.getCollection();
+ Collection* const collection = autoColl.getCollection();
auto statusWithPlanExecutor =
getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate);
if (!statusWithPlanExecutor.isOK()) {
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index e4473be4b48..9d73ecc39fa 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -30,12 +30,8 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
-#include "mongo/base/disallow_copying.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
@@ -58,24 +54,18 @@
#include "mongo/util/log.h"
namespace mongo {
-
namespace {
-const char kTermField[] = "term";
-
-} // namespace
+const auto kTermField = "term"_sd;
/**
* A command for running .find() queries.
*/
class FindCmd : public BasicCommand {
- MONGO_DISALLOW_COPYING(FindCmd);
-
public:
FindCmd() : BasicCommand("find") {}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -107,7 +97,7 @@ public:
return LogicalOp::opQuery;
}
- ReadWriteType getReadWriteType() const {
+ ReadWriteType getReadWriteType() const override {
return ReadWriteType::kRead;
}
@@ -420,4 +410,5 @@ public:
} findCmd;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index e3419187390..50db4cff04b 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -36,7 +36,6 @@
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
@@ -52,7 +51,7 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner_params.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
@@ -297,9 +296,8 @@ Message getMore(OperationContext* opCtx,
// This checks to make sure the operation is allowed on a replicated node. Since we are not
// passing in a query object (necessary to check SlaveOK query option), we allow reads
// whether we are PRIMARY or SECONDARY.
- Status status =
- repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(opCtx, nss, true);
- uassertStatusOK(status);
+ uassertStatusOK(
+ repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, true));
}
LOG(5) << "Running getMore, cursorid: " << cursorid;
@@ -564,10 +562,9 @@ std::string runQuery(OperationContext* opCtx,
// uassert if we are not on a primary, and not a secondary with SlaveOk query parameter set.
// TODO(SERVER-31293): Don't set slaveOk for reads with a read pref of "primary".
- bool slaveOK = qr.isSlaveOk() || qr.hasReadPref();
- Status serveReadsStatus =
- repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(opCtx, nss, slaveOK);
- uassertStatusOK(serveReadsStatus);
+ const bool slaveOK = qr.isSlaveOk() || qr.hasReadPref();
+ uassertStatusOK(
+ repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK));
}
// We have a parsed query. Time to get the execution plan for it.
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index 66ed5783a95..7bf41703968 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -431,19 +431,16 @@ void CollectionRangeDeleter::_pop(Status result) {
// DeleteNotification
CollectionRangeDeleter::DeleteNotification::DeleteNotification()
- : notification(std::make_shared<Notification<Status>>()) {}
+ : _notification(std::make_shared<Notification<Status>>()) {}
CollectionRangeDeleter::DeleteNotification::DeleteNotification(Status status)
- : notification(std::make_shared<Notification<Status>>()) {
- notify(status);
-}
+ : _notification(std::make_shared<Notification<Status>>(std::move(status))) {}
Status CollectionRangeDeleter::DeleteNotification::waitStatus(OperationContext* opCtx) {
try {
- return notification->get(opCtx);
- } catch (...) {
- notification = std::make_shared<Notification<Status>>();
- notify({ErrorCodes::Interrupted, "Wait for range delete request completion interrupted"});
+ return _notification->get(opCtx);
+ } catch (const DBException& ex) {
+ _notification = std::make_shared<Notification<Status>>(ex.toStatus());
throw;
}
}
diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h
index 32bd61e2133..f3b020be971 100644
--- a/src/mongo/db/s/collection_range_deleter.h
+++ b/src/mongo/db/s/collection_range_deleter.h
@@ -55,7 +55,8 @@ public:
* It is an error to destroy a returned CleanupNotification object n unless either n.ready()
* is true or n.abandon() has been called. After n.abandon(), n is in a moved-from state.
*/
- struct DeleteNotification {
+ class DeleteNotification {
+ public:
DeleteNotification();
DeleteNotification(Status status);
@@ -68,12 +69,12 @@ public:
DeleteNotification& operator=(DeleteNotification const& notifn) = default;
~DeleteNotification() {
- // can be null only if moved from
- dassert(!notification || *notification || notification.use_count() == 1);
+ // Can be null only if moved from
+ dassert(!_notification || *_notification || _notification.use_count() == 1);
}
void notify(Status status) const {
- notification->set(status);
+ _notification->set(status);
}
/**
@@ -83,20 +84,20 @@ public:
Status waitStatus(OperationContext* opCtx);
bool ready() const {
- return bool(*notification);
+ return bool(*_notification);
}
void abandon() {
- notification = nullptr;
+ _notification = nullptr;
}
bool operator==(DeleteNotification const& other) const {
- return notification == other.notification;
+ return _notification == other._notification;
}
bool operator!=(DeleteNotification const& other) const {
- return notification != other.notification;
+ return _notification != other._notification;
}
private:
- std::shared_ptr<Notification<Status>> notification;
+ std::shared_ptr<Notification<Status>> _notification;
};
struct Deletion {
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index 3ad381e04c4..18c53b50fd1 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -510,8 +510,8 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete)
// Put it on the oldest metadata permissible; the current one might live a long time.
auto& orphans = overlapMetadata->_tracker.orphans;
- orphans.emplace_back(
- Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), whenToDelete});
+ orphans.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
+ whenToDelete);
return orphans.back().notification;
}
diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp
index f365eb3529d..6b22c20c131 100644
--- a/src/mongo/s/client/shard.cpp
+++ b/src/mongo/s/client/shard.cpp
@@ -103,10 +103,6 @@ bool Shard::shouldErrorBePropagated(ErrorCodes::Error code) {
Shard::Shard(const ShardId& id) : _id(id) {}
-const ShardId Shard::getId() const {
- return _id;
-}
-
bool Shard::isConfig() const {
return _id == "config";
}
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index 9cbdebce88c..ca1c0a39ffd 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -91,7 +91,9 @@ public:
virtual ~Shard() = default;
- const ShardId getId() const;
+ const ShardId& getId() const {
+ return _id;
+ }
/**
* Returns true if this shard object represents the config server.
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index dc2ab778ce9..86d69329e80 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -34,9 +34,11 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/find_and_modify_common.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/commands/cluster_explain.h"
@@ -49,6 +51,29 @@
namespace mongo {
namespace {
+const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly);
+
+BSONObj getCollation(const BSONObj& cmdObj) {
+ BSONElement collationElement;
+ auto status = bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement);
+ if (status.isOK()) {
+ return collationElement.Obj();
+ } else if (status != ErrorCodes::NoSuchKey) {
+ uassertStatusOK(status);
+ }
+
+ return BSONObj();
+}
+
+BSONObj getShardKey(OperationContext* opCtx, const ChunkManager& chunkMgr, const BSONObj& query) {
+ BSONObj shardKey =
+ uassertStatusOK(chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query));
+ uassert(ErrorCodes::ShardKeyNotFound,
+ "Query for sharded findAndModify must contain the shard key",
+ !shardKey.isEmpty());
+ return shardKey;
+}
+
class FindAndModifyCmd : public BasicCommand {
public:
FindAndModifyCmd() : BasicCommand("findAndModify", "findandmodify") {}
@@ -90,48 +115,26 @@ public:
chunkMgr = routingInfo.cm();
const BSONObj query = cmdObj.getObjectField("query");
+ const BSONObj collation = getCollation(cmdObj);
+ const BSONObj shardKey = getShardKey(opCtx, *chunkMgr, query);
+ const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation);
- BSONObj collation;
- BSONElement collationElement;
- auto collationElementStatus =
- bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement);
- if (collationElementStatus.isOK()) {
- collation = collationElement.Obj();
- } else if (collationElementStatus != ErrorCodes::NoSuchKey) {
- return collationElementStatus;
- }
-
- StatusWith<BSONObj> status = _getShardKey(opCtx, *chunkMgr, query);
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- BSONObj shardKey = status.getValue();
- auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation);
-
- auto shardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk->getShardId());
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
-
- shard = shardStatus.getValue();
+ shard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk->getShardId()));
}
const auto explainCmd = ClusterExplain::wrapAsExplain(cmdObj, verbosity);
// Time how long it takes to run the explain command on the shard.
Timer timer;
-
BSONObjBuilder result;
- bool ok = _runCommand(opCtx, chunkMgr, shard->getId(), nss, explainCmd, result);
- long long millisElapsed = timer.millis();
-
- if (!ok) {
- BSONObj res = result.obj();
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "Explain for findAndModify failed: " << res);
- }
+ _runCommand(opCtx,
+ shard->getId(),
+ (chunkMgr ? chunkMgr->getVersion(shard->getId()) : ChunkVersion::UNSHARDED()),
+ nss,
+ explainCmd,
+ &result);
+ const auto millisElapsed = timer.millis();
Strategy::CommandResult cmdResult;
cmdResult.shardTargetId = shard->getId();
@@ -149,96 +152,81 @@ public:
const std::string& dbName,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
- const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj);
+ const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
// findAndModify should only be creating database if upsert is true, but this would require
// that the parsing be pulled into this function.
uassertStatusOK(createShardDatabase(opCtx, nss.db()));
- auto routingInfo =
+ const auto routingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
if (!routingInfo.cm()) {
- return _runCommand(opCtx, nullptr, routingInfo.primaryId(), nss, cmdObj, result);
+ _runCommand(
+ opCtx, routingInfo.primaryId(), ChunkVersion::UNSHARDED(), nss, cmdObj, &result);
+ return true;
}
const auto chunkMgr = routingInfo.cm();
const BSONObj query = cmdObj.getObjectField("query");
+ const BSONObj collation = getCollation(cmdObj);
+ const BSONObj shardKey = getShardKey(opCtx, *chunkMgr, query);
+ const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation);
+
+ _runCommand(opCtx,
+ chunk->getShardId(),
+ chunkMgr->getVersion(chunk->getShardId()),
+ nss,
+ cmdObj,
+ &result);
+ updateChunkWriteStatsAndSplitIfNeeded(
+ opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize());
- BSONObj collation;
- BSONElement collationElement;
- auto collationElementStatus =
- bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement);
- if (collationElementStatus.isOK()) {
- collation = collationElement.Obj();
- } else if (collationElementStatus != ErrorCodes::NoSuchKey) {
- return appendCommandStatus(result, collationElementStatus);
- }
-
- BSONObj shardKey = uassertStatusOK(_getShardKey(opCtx, *chunkMgr, query));
-
- auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation);
-
- const bool ok = _runCommand(opCtx, chunkMgr, chunk->getShardId(), nss, cmdObj, result);
- if (ok) {
- updateChunkWriteStatsAndSplitIfNeeded(
- opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize());
- }
-
- return ok;
+ return true;
}
private:
- static StatusWith<BSONObj> _getShardKey(OperationContext* opCtx,
- const ChunkManager& chunkMgr,
- const BSONObj& query) {
- // Verify that the query has an equality predicate using the shard key
- StatusWith<BSONObj> status =
- chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
-
- if (!status.isOK()) {
- return status;
- }
-
- BSONObj shardKey = status.getValue();
-
- if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- "query for sharded findAndModify must have shardkey");
- }
-
- return shardKey;
- }
-
- static bool _runCommand(OperationContext* opCtx,
- std::shared_ptr<ChunkManager> chunkManager,
+ static void _runCommand(OperationContext* opCtx,
const ShardId& shardId,
+ const ChunkVersion& shardVersion,
const NamespaceString& nss,
const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- BSONObj res;
-
- const auto shard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
-
- ShardConnection conn(shard->getConnString(), nss.ns(), chunkManager);
- bool ok =
- conn->runCommand(nss.db().toString(), filterCommandRequestForPassthrough(cmdObj), res);
- conn.done();
-
- if (!ok && res.getIntField("code") == ErrorCodes::StaleConfig) {
+ BSONObjBuilder* result) {
+ const auto response = [&] {
+ std::vector<AsyncRequestsSender::Request> requests;
+ requests.emplace_back(
+ shardId,
+ appendShardVersion(filterCommandRequestForPassthrough(cmdObj), shardVersion));
+
+ AsyncRequestsSender ars(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss.db().toString(),
+ requests,
+ kPrimaryOnlyReadPreference,
+ opCtx->getTxnNumber() ? Shard::RetryPolicy::kIdempotent
+ : Shard::RetryPolicy::kNoRetry);
+
+ auto response = ars.next();
+ invariant(ars.done());
+
+ return uassertStatusOK(std::move(response.swResponse));
+ }();
+
+ uassertStatusOK(response.status);
+
+ const auto responseStatus = getStatusFromCommandResult(response.data);
+ if (ErrorCodes::isStaleShardingError(responseStatus.code())) {
// Command code traps this exception and re-runs
- throw StaleConfigException("FindAndModify", res);
+ throw StaleConfigException("findAndModify", response.data);
}
- // First append the properly constructed writeConcernError. It will then be skipped
- // in appendElementsUnique.
- if (auto wcErrorElem = res["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, result);
+ // First append the properly constructed writeConcernError. It will then be skipped in
+ // appendElementsUnique.
+ if (auto wcErrorElem = response.data["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
}
- result.appendElementsUnique(filterCommandReplyForPassthrough(res));
- return ok;
+ result->appendElementsUnique(filterCommandReplyForPassthrough(response.data));
}
} findAndModifyCmd;
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 7b31f8cd1f2..a58ff30da32 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -49,6 +49,8 @@
namespace mongo {
namespace {
+const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly);
+
//
// Map which allows associating ConnectionString hosts with TargetedWriteBatches
// This is needed since the dispatcher only returns hosts with responses.
@@ -203,16 +205,11 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
pendingBatches.insert(std::make_pair(targetShardId, nextBatch));
}
- //
- // Send the requests.
- //
-
- const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
clientRequest.getTargetingNS().db().toString(),
requests,
- readPref,
+ kPrimaryOnlyReadPreference,
opCtx->getTxnNumber() ? Shard::RetryPolicy::kIdempotent
: Shard::RetryPolicy::kNoRetry);
numSent += pendingBatches.size();