diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-22 15:21:18 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-27 15:19:13 -0400 |
commit | 60559a00b81293184922b3418a8e56610edf8dd9 (patch) | |
tree | 4d74eaf849b70303f26aeb5ee91742e45a1a39b4 /src | |
parent | e7a75ec01e4e3683cc6b83e3bbc0f4c4b05168dc (diff) | |
download | mongo-60559a00b81293184922b3418a8e56610edf8dd9.tar.gz |
SERVER-32198 Add support for an optional `vWanted` to StaleConfigInfo
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 179 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.h | 28 | ||||
-rw-r--r-- | src/mongo/rpc/legacy_reply.cpp | 18 | ||||
-rw-r--r-- | src/mongo/s/client/parallel.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 30 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 5 | ||||
-rw-r--r-- | src/mongo/s/stale_exception.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/stale_exception.h | 6 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.h | 3 | ||||
-rw-r--r-- | src/mongo/s/write_ops/mock_ns_targeter.h | 3 |
13 files changed, 136 insertions, 178 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 1c8bd64828e..fc925d5d153 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -247,13 +247,89 @@ void CollectionShardingState::exitCriticalSection(OperationContext* opCtx) { } void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx) { - std::string errmsg; - ChunkVersion received; - ChunkVersion wanted; - if (!_checkShardVersionOk(opCtx, &errmsg, &received, &wanted)) { - uasserted(StaleConfigInfo(_nss, received, wanted), - str::stream() << "shard version not ok: " << errmsg); + auto& oss = OperationShardingState::get(opCtx); + + const auto receivedShardVersion = [&] { + // If there is a version attached to the OperationContext, use it as the received version, + // otherwise get the received version from the ShardedConnectionInfo + if (oss.hasShardVersion()) { + return oss.getShardVersion(_nss); + } else if (auto const info = ShardedConnectionInfo::get(opCtx->getClient(), false)) { + auto connectionShardVersion = info->getVersion(_nss.ns()); + + // For backwards compatibility with map/reduce, which can access up to 2 sharded + // collections in a single call, the lack of version for a namespace on the collection + // must be treated as UNSHARDED + return connectionShardVersion.value_or(ChunkVersion::UNSHARDED()); + } else { + // There is no shard version information on either 'opCtx' or 'client'. This means that + // the operation represented by 'opCtx' is unversioned, and the shard version is always + // OK for unversioned operations. + return ChunkVersion::IGNORED(); + } + }(); + + if (ChunkVersion::isIgnoredVersion(receivedShardVersion)) { + return; + } + + // An operation with read concern 'available' should never have shardVersion set. + invariant(repl::ReadConcernArgs::get(opCtx).getLevel() != + repl::ReadConcernLevel::kAvailableReadConcern); + + // Set this for error messaging purposes before potentially returning false. + auto metadata = getMetadata(opCtx); + const auto wantedShardVersion = + metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); + + auto criticalSectionSignal = _critSec.getSignal(opCtx->lockState()->isWriteLocked() + ? ShardingMigrationCriticalSection::kWrite + : ShardingMigrationCriticalSection::kRead); + if (criticalSectionSignal) { + // Set migration critical section on operation sharding state: operation will wait for the + // migration to finish before returning failure and retrying. + oss.setMigrationCriticalSectionSignal(criticalSectionSignal); + + uasserted(StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion), + str::stream() << "migration commit in progress for " << _nss.ns()); } + + if (receivedShardVersion.isWriteCompatibleWith(wantedShardVersion)) { + return; + } + + // + // Figure out exactly why not compatible, send appropriate error message + // The versions themselves are returned in the error, so not needed in messages here + // + + StaleConfigInfo sci(_nss, receivedShardVersion, wantedShardVersion); + + uassert(std::move(sci), + str::stream() << "epoch mismatch detected for " << _nss.ns() << ", " + << "the collection may have been dropped and recreated", + wantedShardVersion.epoch() == receivedShardVersion.epoch()); + + if (!wantedShardVersion.isSet() && receivedShardVersion.isSet()) { + uasserted(std::move(sci), + str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", " + << "the collection may have been dropped"); + } + + if (wantedShardVersion.isSet() && !receivedShardVersion.isSet()) { + uasserted(std::move(sci), + str::stream() << "this shard contains chunks for " << _nss.ns() << ", " + << "but the client expects unsharded collection"); + } + + if (wantedShardVersion.majorVersion() != receivedShardVersion.majorVersion()) { + // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is + // the target of a migration + uasserted(std::move(sci), str::stream() << "version mismatch detected for " << _nss.ns()); + } + + // Those are all the reasons the versions can mismatch + MONGO_UNREACHABLE; } // Call with collection unlocked. Note that the CollectionShardingState object involved might not @@ -311,95 +387,4 @@ boost::optional<ChunkRange> CollectionShardingState::getNextOrphanRange(BSONObj return _metadataManager->getNextOrphanRange(from); } -bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, - std::string* errmsg, - ChunkVersion* expectedShardVersion, - ChunkVersion* actualShardVersion) { - auto& oss = OperationShardingState::get(opCtx); - - // If there is a version attached to the OperationContext, use it as the received version. - // Otherwise, get the received version from the ShardedConnectionInfo. - if (oss.hasShardVersion()) { - *expectedShardVersion = oss.getShardVersion(_nss); - } else { - auto const info = ShardedConnectionInfo::get(opCtx->getClient(), false); - if (!info) { - // There is no shard version information on either 'opCtx' or 'client'. This means that - // the operation represented by 'opCtx' is unversioned, and the shard version is always - // OK for unversioned operations. - return true; - } - - auto connectionExpectedShardVersion = info->getVersion(_nss.ns()); - if (!connectionExpectedShardVersion) { - *expectedShardVersion = ChunkVersion::UNSHARDED(); - } else { - *expectedShardVersion = std::move(*connectionExpectedShardVersion); - } - } - - // An operation with read concern 'available' should never have shardVersion set. - invariant(repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kAvailableReadConcern); - - if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) { - return true; - } - - // Set this for error messaging purposes before potentially returning false. - auto metadata = getMetadata(opCtx); - *actualShardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); - - auto criticalSectionSignal = _critSec.getSignal(opCtx->lockState()->isWriteLocked() - ? ShardingMigrationCriticalSection::kWrite - : ShardingMigrationCriticalSection::kRead); - if (criticalSectionSignal) { - *errmsg = str::stream() << "migration commit in progress for " << _nss.ns(); - - // Set migration critical section on operation sharding state: operation will wait for - // the migration to finish before returning failure and retrying. - oss.setMigrationCriticalSectionSignal(criticalSectionSignal); - return false; - } - - if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) { - return true; - } - - // - // Figure out exactly why not compatible, send appropriate error message - // The versions themselves are returned in the error, so not needed in messages here - // - - // Check epoch first, to send more meaningful message, since other parameters probably won't - // match either. - if (actualShardVersion->epoch() != expectedShardVersion->epoch()) { - *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", " - << "the collection may have been dropped and recreated"; - return false; - } - - if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) { - *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", " - << "the collection may have been dropped"; - return false; - } - - if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) { - *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", " - << "but no version set in request"; - return false; - } - - if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) { - // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is - // the target of a migration - *errmsg = str::stream() << "version mismatch detected for " << _nss.ns(); - return false; - } - - // Those are all the reasons the versions can mismatch - MONGO_UNREACHABLE; -} - } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 77ab8b4afa1..3af06506280 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -154,13 +154,9 @@ public: } /** - * Checks whether the shard version in the context is compatible with the shard version of the - * collection locally and if not throws StaleConfigException populated with the expected and - * actual versions. - * - * Because StaleConfigException has special semantics in terms of how a sharded command's - * response is constructed, this function should be the only means of checking for shard version - * match. + * Checks whether the shard version in the operation context is compatible with the shard + * version of the collection and if not throws StaleConfigException populated with the received + * and wanted versions. */ void checkShardVersionOrThrow(OperationContext* opCtx); @@ -189,24 +185,6 @@ public: boost::optional<ChunkRange> getNextOrphanRange(BSONObj const& startingFrom); private: - /** - * Checks whether the shard version of the operation matches that of the collection. - * - * opCtx - Operation context from which to retrieve the operation's expected version. - * errmsg (out) - On false return contains an explanatory error message. - * expectedShardVersion (out) - On false return contains the expected collection version on this - * shard. Obtained from the operation sharding state. - * actualShardVersion (out) - On false return contains the actual collection version on this - * shard. Obtained from the collection sharding state. - * - * Returns true if the expected collection version on the shard matches its actual version on - * the shard and false otherwise. Upon false return, the output parameters will be set. - */ - bool _checkShardVersionOk(OperationContext* opCtx, - std::string* errmsg, - ChunkVersion* expectedShardVersion, - ChunkVersion* actualShardVersion); - // Namespace this state belongs to. const NamespaceString _nss; diff --git a/src/mongo/rpc/legacy_reply.cpp b/src/mongo/rpc/legacy_reply.cpp index 20ddf0ba377..d21b880e60c 100644 --- a/src/mongo/rpc/legacy_reply.cpp +++ b/src/mongo/rpc/legacy_reply.cpp @@ -79,12 +79,22 @@ LegacyReply::LegacyReply(const Message* message) { _commandReply.shareOwnershipWith(message->sharedBuffer()); if (_commandReply.firstElementFieldName() == "$err"_sd) { - // Upconvert legacy errors. + // Upconvert legacy errors + auto codeElement = _commandReply["code"]; + int code = codeElement.numberInt(); + if (!code) { + code = ErrorCodes::UnknownError; + } + + auto errmsg = _commandReply.firstElement().String(); + Status status(ErrorCodes::Error(code), errmsg, _commandReply); + BSONObjBuilder bob; - bob.appendAs(_commandReply.firstElement(), "errmsg"); bob.append("ok", 0.0); - if (auto code = _commandReply["code"]) { - bob.append(code); + bob.append("code", status.code()); + bob.append("errmsg", status.reason()); + if (auto extraInfo = status.extraInfo()) { + extraInfo->serialize(&bob); } _commandReply = bob.obj(); } diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index d4bfddd22df..62d9425ba35 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -612,12 +612,8 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { // Our version isn't compatible with the current version anymore on at least one shard, // need to retry immediately NamespaceString staleNS(e->getNss()); - - // For legacy reasons, this may not be set in the exception :-( - if (staleNS.size() == 0) - staleNS = nss; // ns is the *versioned* namespace, be careful of this - _markStaleNS(staleNS, e); + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry" @@ -768,10 +764,6 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) { std::string staleNS = e->getNss().ns(); - // For legacy reasons, ns may not always be set in exception :-( - if (staleNS.size() == 0) - staleNS = ns; // ns is versioned namespace, be careful of this - // Will retry all at once staleNSExceptions.emplace(staleNS, e); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index d0df81713a6..5b82fe3a055 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -390,25 +390,14 @@ void runCommand(OperationContext* opCtx, } }(); - if (staleNs.isEmpty()) { - // This should be impossible but older versions tried incorrectly to handle - // it here. - log() << "Received a stale config error with an empty namespace while " - "executing " - << redact(request.body) << " : " << redact(ex); - throw; - } - // Send setShardVersion on this thread's versioned connections to shards (to support // commands that use the legacy (ShardConnection) versioning protocol). if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) { ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); } - // Mark collection entry in cache as stale. - if (staleNs.isValid()) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); - } + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); + if (canRetry) { continue; } @@ -796,25 +785,14 @@ void Strategy::explainFind(OperationContext* opCtx, } }(); - if (staleNs.isEmpty()) { - // This should be impossible but older versions tried incorrectly to handle - // it here. - log() << "Received a stale config error with an empty namespace while " - "executing " - << redact(explainCmd) << " : " << redact(ex); - throw; - } - // Send setShardVersion on this thread's versioned connections to shards (to support // commands that use the legacy (ShardConnection) versioning protocol). if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) { ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); } - // Mark collection entry in cache as stale. - if (staleNs.isValid()) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); - } + Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); + if (canRetry) { continue; } diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index f915be42ec4..537bef877a2 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -30,12 +30,12 @@ #include <vector> -#include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" #include "mongo/s/chunk_version.h" #include "mongo/s/shard_id.h" +#include "mongo/s/stale_exception.h" namespace mongo { @@ -142,7 +142,8 @@ public: * * If stale responses are is noted, we must not have noted that we cannot target. */ - virtual void noteStaleResponse(const ShardEndpoint& endpoint, const BSONObj& staleInfo) = 0; + virtual void noteStaleResponse(const ShardEndpoint& endpoint, + const StaleConfigInfo& staleInfo) = 0; /** * Refreshes the targeting metadata for the namespace if needed, based on previously-noted diff --git a/src/mongo/s/stale_exception.cpp b/src/mongo/s/stale_exception.cpp index 8ebe97e75f0..3919bf86f3e 100644 --- a/src/mongo/s/stale_exception.cpp +++ b/src/mongo/s/stale_exception.cpp @@ -39,7 +39,7 @@ namespace { MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleConfigInfo); MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleDbRoutingVersion); -ChunkVersion extractOptionalVersion(const BSONObj& obj, StringData field) { +boost::optional<ChunkVersion> extractOptionalVersion(const BSONObj& obj, StringData field) { auto swChunkVersion = ChunkVersion::parseLegacyWithField(obj, field); if (swChunkVersion == ErrorCodes::NoSuchKey) return ChunkVersion::UNSHARDED(); @@ -51,7 +51,9 @@ ChunkVersion extractOptionalVersion(const BSONObj& obj, StringData field) { void StaleConfigInfo::serialize(BSONObjBuilder* bob) const { bob->append("ns", _nss.ns()); _received.appendLegacyWithField(bob, "vReceived"); - _wanted.appendLegacyWithField(bob, "vWanted"); + if (_wanted) { + _wanted->appendLegacyWithField(bob, "vWanted"); + } } std::shared_ptr<const ErrorExtraInfo> StaleConfigInfo::parse(const BSONObj& obj) { @@ -59,8 +61,8 @@ std::shared_ptr<const ErrorExtraInfo> StaleConfigInfo::parse(const BSONObj& obj) } StaleConfigInfo StaleConfigInfo::parseFromCommandError(const BSONObj& obj) { - return StaleConfigInfo(NamespaceString(obj["ns"].str()), - extractOptionalVersion(obj, "vReceived"), + return StaleConfigInfo(NamespaceString(obj["ns"].String()), + uassertStatusOK(ChunkVersion::parseLegacyWithField(obj, "vReceived")), extractOptionalVersion(obj, "vWanted")); } diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index bfc98c55ec7..7678d9c45b4 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -38,7 +38,9 @@ class StaleConfigInfo final : public ErrorExtraInfo { public: static constexpr auto code = ErrorCodes::StaleConfig; - StaleConfigInfo(NamespaceString nss, ChunkVersion received, ChunkVersion wanted) + StaleConfigInfo(NamespaceString nss, + ChunkVersion received, + boost::optional<ChunkVersion> wanted) : _nss(std::move(nss)), _received(received), _wanted(wanted) {} const auto& getNss() const { @@ -60,7 +62,7 @@ public: private: NamespaceString _nss; ChunkVersion _received; - ChunkVersion _wanted; + boost::optional<ChunkVersion> _wanted; }; using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>; diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index d9dfe2d788b..94cd978b401 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -69,7 +69,9 @@ WriteErrorDetail errorFromStatus(const Status& status) { void noteStaleResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) { for (const auto& error : staleErrors) { targeter->noteStaleResponse( - error.endpoint, error.error.isErrInfoSet() ? error.error.getErrInfo() : BSONObj()); + error.endpoint, + StaleConfigInfo::parseFromCommandError( + error.error.isErrInfoSet() ? error.error.getErrInfo() : BSONObj())); } } diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index d563fa8a11f..6509ad29e6b 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -142,11 +142,19 @@ public: staleResponse.setStatus(Status::OK()); staleResponse.setN(0); + auto epoch = OID::gen(); + // Report a stale version error for each write in the batch. int i = 0; for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) { WriteErrorDetail* error = new WriteErrorDetail; error->setStatus({ErrorCodes::StaleShardVersion, "mock stale error"}); + error->setErrInfo([&] { + StaleConfigInfo sci(nss, ChunkVersion(1, 0, epoch), ChunkVersion(2, 0, epoch)); + BSONObjBuilder builder; + sci.serialize(&builder); + return builder.obj(); + }()); error->setIndex(i); staleResponse.addToErrDetails(error); diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index a856c3db9d5..37962c4feef 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -688,18 +688,16 @@ void ChunkManagerTargeter::noteCouldNotTarget() { } void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint, - const BSONObj& staleInfo) { + const StaleConfigInfo& staleInfo) { dassert(!_needsTargetingRefresh); ChunkVersion remoteShardVersion; - if (staleInfo["vWanted"].eoo()) { - // If we don't have a vWanted sent, assume the version is higher than our current - // version. + if (!staleInfo.getVersionWanted()) { + // If we don't have a vWanted sent, assume the version is higher than our current version. remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); remoteShardVersion.incMajor(); } else { - remoteShardVersion = - uassertStatusOK(ChunkVersion::parseLegacyWithField(staleInfo, "vWanted")); + remoteShardVersion = *staleInfo.getVersionWanted(); } ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName); diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 9575d2460f7..abeb2735038 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -86,7 +86,8 @@ public: void noteCouldNotTarget() override; - void noteStaleResponse(const ShardEndpoint& endpoint, const BSONObj& staleInfo) override; + void noteStaleResponse(const ShardEndpoint& endpoint, + const StaleConfigInfo& staleInfo) override; /** * Replaces the targeting information with the latest information from the cache. If this diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index b546e71a2af..c9ebaa982cd 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -118,7 +118,8 @@ public: // No-op } - void noteStaleResponse(const ShardEndpoint& endpoint, const BSONObj& staleInfo) override { + void noteStaleResponse(const ShardEndpoint& endpoint, + const StaleConfigInfo& staleInfo) override { // No-op } |