diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-11-09 04:47:48 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-11-11 02:20:22 -0500 |
commit | efea76aea1b32b56503a1ecb954f8e14b8415eae (patch) | |
tree | fba41ea476b4c423ea09a40d6026d40adf259362 /src/mongo | |
parent | 77fbada9b93dfa474d41216baa19f556e75bc8ca (diff) | |
download | mongo-efea76aea1b32b56503a1ecb954f8e14b8415eae.tar.gz |
SERVER-31873 Make mongos retry findAndModify with txnNumber
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/collection_range_deleter.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/s/collection_range_deleter.h | 19 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/client/shard.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/client/shard.h | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 190 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 9 |
11 files changed, 126 insertions, 156 deletions
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index a32f3060ec2..6c747881e2c 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -438,8 +438,9 @@ BSONObj Command::filterCommandRequestForPassthrough(const BSONObj& cmdObj) { name == "$queryOptions" || // name == "maxTimeMS" || // name == "readConcern" || // - name == "writeConcern" || - name == "lsid" || name == "txnNumber") { + name == "writeConcern" || // + name == "lsid" || // + name == "txnNumber") { // This is the whitelist of generic arguments that commands can be trusted to blindly // forward to the shards. bob.append(elem); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 59c1c387708..dbbd28447bf 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -311,7 +311,7 @@ public: auto css = CollectionShardingState::get(opCtx, nsString); css->checkShardVersionOrThrow(opCtx); - Collection* collection = autoColl.getCollection(); + Collection* const collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorUpdate(opCtx, opDebug, collection, &parsedUpdate); if (!statusWithPlanExecutor.isOK()) { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index e4473be4b48..9d73ecc39fa 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -30,12 +30,8 @@ #include "mongo/platform/basic.h" -#include <memory> - -#include "mongo/base/disallow_copying.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" @@ -58,24 +54,18 @@ #include "mongo/util/log.h" namespace mongo { - namespace { -const char kTermField[] = "term"; - -} // namespace +const auto kTermField = "term"_sd; /** * A command for running .find() queries. */ class FindCmd : public BasicCommand { - MONGO_DISALLOW_COPYING(FindCmd); - public: FindCmd() : BasicCommand("find") {} - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -107,7 +97,7 @@ public: return LogicalOp::opQuery; } - ReadWriteType getReadWriteType() const { + ReadWriteType getReadWriteType() const override { return ReadWriteType::kRead; } @@ -420,4 +410,5 @@ public: } findCmd; +} // namespace } // namespace mongo diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index e3419187390..50db4cff04b 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -36,7 +36,6 @@ #include "mongo/client/dbclientinterface.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" @@ -52,7 +51,7 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner_params.h" -#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -297,9 +296,8 @@ Message getMore(OperationContext* opCtx, // This checks to make sure the operation is allowed on a replicated node. Since we are not // passing in a query object (necessary to check SlaveOK query option), we allow reads // whether we are PRIMARY or SECONDARY. - Status status = - repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(opCtx, nss, true); - uassertStatusOK(status); + uassertStatusOK( + repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, true)); } LOG(5) << "Running getMore, cursorid: " << cursorid; @@ -564,10 +562,9 @@ std::string runQuery(OperationContext* opCtx, // uassert if we are not on a primary, and not a secondary with SlaveOk query parameter set. // TODO(SERVER-31293): Don't set slaveOk for reads with a read pref of "primary". - bool slaveOK = qr.isSlaveOk() || qr.hasReadPref(); - Status serveReadsStatus = - repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(opCtx, nss, slaveOK); - uassertStatusOK(serveReadsStatus); + const bool slaveOK = qr.isSlaveOk() || qr.hasReadPref(); + uassertStatusOK( + repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK)); } // We have a parsed query. Time to get the execution plan for it. diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 66ed5783a95..7bf41703968 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -431,19 +431,16 @@ void CollectionRangeDeleter::_pop(Status result) { // DeleteNotification CollectionRangeDeleter::DeleteNotification::DeleteNotification() - : notification(std::make_shared<Notification<Status>>()) {} + : _notification(std::make_shared<Notification<Status>>()) {} CollectionRangeDeleter::DeleteNotification::DeleteNotification(Status status) - : notification(std::make_shared<Notification<Status>>()) { - notify(status); -} + : _notification(std::make_shared<Notification<Status>>(std::move(status))) {} Status CollectionRangeDeleter::DeleteNotification::waitStatus(OperationContext* opCtx) { try { - return notification->get(opCtx); - } catch (...) { - notification = std::make_shared<Notification<Status>>(); - notify({ErrorCodes::Interrupted, "Wait for range delete request completion interrupted"}); + return _notification->get(opCtx); + } catch (const DBException& ex) { + _notification = std::make_shared<Notification<Status>>(ex.toStatus()); throw; } } diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h index 32bd61e2133..f3b020be971 100644 --- a/src/mongo/db/s/collection_range_deleter.h +++ b/src/mongo/db/s/collection_range_deleter.h @@ -55,7 +55,8 @@ public: * It is an error to destroy a returned CleanupNotification object n unless either n.ready() * is true or n.abandon() has been called. After n.abandon(), n is in a moved-from state. */ - struct DeleteNotification { + class DeleteNotification { + public: DeleteNotification(); DeleteNotification(Status status); @@ -68,12 +69,12 @@ public: DeleteNotification& operator=(DeleteNotification const& notifn) = default; ~DeleteNotification() { - // can be null only if moved from - dassert(!notification || *notification || notification.use_count() == 1); + // Can be null only if moved from + dassert(!_notification || *_notification || _notification.use_count() == 1); } void notify(Status status) const { - notification->set(status); + _notification->set(status); } /** @@ -83,20 +84,20 @@ public: Status waitStatus(OperationContext* opCtx); bool ready() const { - return bool(*notification); + return bool(*_notification); } void abandon() { - notification = nullptr; + _notification = nullptr; } bool operator==(DeleteNotification const& other) const { - return notification == other.notification; + return _notification == other._notification; } bool operator!=(DeleteNotification const& other) const { - return notification != other.notification; + return _notification != other._notification; } private: - std::shared_ptr<Notification<Status>> notification; + std::shared_ptr<Notification<Status>> _notification; }; struct Deletion { diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 3ad381e04c4..18c53b50fd1 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -510,8 +510,8 @@ auto MetadataManager::cleanUpRange(ChunkRange const& range, Date_t whenToDelete) // Put it on the oldest metadata permissible; the current one might live a long time. auto& orphans = overlapMetadata->_tracker.orphans; - orphans.emplace_back( - Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), whenToDelete}); + orphans.emplace_back(ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()), + whenToDelete); return orphans.back().notification; } diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp index f365eb3529d..6b22c20c131 100644 --- a/src/mongo/s/client/shard.cpp +++ b/src/mongo/s/client/shard.cpp @@ -103,10 +103,6 @@ bool Shard::shouldErrorBePropagated(ErrorCodes::Error code) { Shard::Shard(const ShardId& id) : _id(id) {} -const ShardId Shard::getId() const { - return _id; -} - bool Shard::isConfig() const { return _id == "config"; } diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index 9cbdebce88c..ca1c0a39ffd 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -91,7 +91,9 @@ public: virtual ~Shard() = default; - const ShardId getId() const; + const ShardId& getId() const { + return _id; + } /** * Returns true if this shard object represents the config server. diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index dc2ab778ce9..86d69329e80 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -34,9 +34,11 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/find_and_modify_common.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/commands/cluster_explain.h" @@ -49,6 +51,29 @@ namespace mongo { namespace { +const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly); + +BSONObj getCollation(const BSONObj& cmdObj) { + BSONElement collationElement; + auto status = bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement); + if (status.isOK()) { + return collationElement.Obj(); + } else if (status != ErrorCodes::NoSuchKey) { + uassertStatusOK(status); + } + + return BSONObj(); +} + +BSONObj getShardKey(OperationContext* opCtx, const ChunkManager& chunkMgr, const BSONObj& query) { + BSONObj shardKey = + uassertStatusOK(chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query)); + uassert(ErrorCodes::ShardKeyNotFound, + "Query for sharded findAndModify must contain the shard key", + !shardKey.isEmpty()); + return shardKey; +} + class FindAndModifyCmd : public BasicCommand { public: FindAndModifyCmd() : BasicCommand("findAndModify", "findandmodify") {} @@ -90,48 +115,26 @@ public: chunkMgr = routingInfo.cm(); const BSONObj query = cmdObj.getObjectField("query"); + const BSONObj collation = getCollation(cmdObj); + const BSONObj shardKey = getShardKey(opCtx, *chunkMgr, query); + const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); - BSONObj collation; - BSONElement collationElement; - auto collationElementStatus = - bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement); - if (collationElementStatus.isOK()) { - collation = collationElement.Obj(); - } else if (collationElementStatus != ErrorCodes::NoSuchKey) { - return collationElementStatus; - } - - StatusWith<BSONObj> status = _getShardKey(opCtx, *chunkMgr, query); - if (!status.isOK()) { - return status.getStatus(); - } - - BSONObj shardKey = status.getValue(); - auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); - - auto shardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk->getShardId()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - - shard = shardStatus.getValue(); + shard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk->getShardId())); } const auto explainCmd = ClusterExplain::wrapAsExplain(cmdObj, verbosity); // Time how long it takes to run the explain command on the shard. Timer timer; - BSONObjBuilder result; - bool ok = _runCommand(opCtx, chunkMgr, shard->getId(), nss, explainCmd, result); - long long millisElapsed = timer.millis(); - - if (!ok) { - BSONObj res = result.obj(); - return Status(ErrorCodes::OperationFailed, - str::stream() << "Explain for findAndModify failed: " << res); - } + _runCommand(opCtx, + shard->getId(), + (chunkMgr ? chunkMgr->getVersion(shard->getId()) : ChunkVersion::UNSHARDED()), + nss, + explainCmd, + &result); + const auto millisElapsed = timer.millis(); Strategy::CommandResult cmdResult; cmdResult.shardTargetId = shard->getId(); @@ -149,96 +152,81 @@ public: const std::string& dbName, const BSONObj& cmdObj, BSONObjBuilder& result) override { - const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj); + const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); // findAndModify should only be creating database if upsert is true, but this would require // that the parsing be pulled into this function. uassertStatusOK(createShardDatabase(opCtx, nss.db())); - auto routingInfo = + const auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); if (!routingInfo.cm()) { - return _runCommand(opCtx, nullptr, routingInfo.primaryId(), nss, cmdObj, result); + _runCommand( + opCtx, routingInfo.primaryId(), ChunkVersion::UNSHARDED(), nss, cmdObj, &result); + return true; } const auto chunkMgr = routingInfo.cm(); const BSONObj query = cmdObj.getObjectField("query"); + const BSONObj collation = getCollation(cmdObj); + const BSONObj shardKey = getShardKey(opCtx, *chunkMgr, query); + const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); + + _runCommand(opCtx, + chunk->getShardId(), + chunkMgr->getVersion(chunk->getShardId()), + nss, + cmdObj, + &result); + updateChunkWriteStatsAndSplitIfNeeded( + opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize()); - BSONObj collation; - BSONElement collationElement; - auto collationElementStatus = - bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement); - if (collationElementStatus.isOK()) { - collation = collationElement.Obj(); - } else if (collationElementStatus != ErrorCodes::NoSuchKey) { - return appendCommandStatus(result, collationElementStatus); - } - - BSONObj shardKey = uassertStatusOK(_getShardKey(opCtx, *chunkMgr, query)); - - auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); - - const bool ok = _runCommand(opCtx, chunkMgr, chunk->getShardId(), nss, cmdObj, result); - if (ok) { - updateChunkWriteStatsAndSplitIfNeeded( - opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize()); - } - - return ok; + return true; } private: - static StatusWith<BSONObj> _getShardKey(OperationContext* opCtx, - const ChunkManager& chunkMgr, - const BSONObj& query) { - // Verify that the query has an equality predicate using the shard key - StatusWith<BSONObj> status = - chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query); - - if (!status.isOK()) { - return status; - } - - BSONObj shardKey = status.getValue(); - - if (shardKey.isEmpty()) { - return Status(ErrorCodes::ShardKeyNotFound, - "query for sharded findAndModify must have shardkey"); - } - - return shardKey; - } - - static bool _runCommand(OperationContext* opCtx, - std::shared_ptr<ChunkManager> chunkManager, + static void _runCommand(OperationContext* opCtx, const ShardId& shardId, + const ChunkVersion& shardVersion, const NamespaceString& nss, const BSONObj& cmdObj, - BSONObjBuilder& result) { - BSONObj res; - - const auto shard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - - ShardConnection conn(shard->getConnString(), nss.ns(), chunkManager); - bool ok = - conn->runCommand(nss.db().toString(), filterCommandRequestForPassthrough(cmdObj), res); - conn.done(); - - if (!ok && res.getIntField("code") == ErrorCodes::StaleConfig) { + BSONObjBuilder* result) { + const auto response = [&] { + std::vector<AsyncRequestsSender::Request> requests; + requests.emplace_back( + shardId, + appendShardVersion(filterCommandRequestForPassthrough(cmdObj), shardVersion)); + + AsyncRequestsSender ars(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss.db().toString(), + requests, + kPrimaryOnlyReadPreference, + opCtx->getTxnNumber() ? Shard::RetryPolicy::kIdempotent + : Shard::RetryPolicy::kNoRetry); + + auto response = ars.next(); + invariant(ars.done()); + + return uassertStatusOK(std::move(response.swResponse)); + }(); + + uassertStatusOK(response.status); + + const auto responseStatus = getStatusFromCommandResult(response.data); + if (ErrorCodes::isStaleShardingError(responseStatus.code())) { // Command code traps this exception and re-runs - throw StaleConfigException("FindAndModify", res); + throw StaleConfigException("findAndModify", response.data); } - // First append the properly constructed writeConcernError. It will then be skipped - // in appendElementsUnique. - if (auto wcErrorElem = res["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, result); + // First append the properly constructed writeConcernError. It will then be skipped in + // appendElementsUnique. + if (auto wcErrorElem = response.data["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); } - result.appendElementsUnique(filterCommandReplyForPassthrough(res)); - return ok; + result->appendElementsUnique(filterCommandReplyForPassthrough(response.data)); } } findAndModifyCmd; diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 7b31f8cd1f2..a58ff30da32 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -49,6 +49,8 @@ namespace mongo { namespace { +const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly); + // // Map which allows associating ConnectionString hosts with TargetedWriteBatches // This is needed since the dispatcher only returns hosts with responses. @@ -203,16 +205,11 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, pendingBatches.insert(std::make_pair(targetShardId, nextBatch)); } - // - // Send the requests. - // - - const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); AsyncRequestsSender ars(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), clientRequest.getTargetingNS().db().toString(), requests, - readPref, + kPrimaryOnlyReadPreference, opCtx->getTxnNumber() ? Shard::RetryPolicy::kIdempotent : Shard::RetryPolicy::kNoRetry); numSent += pendingBatches.size(); |