Diffstat (limited to 'src/mongo')
23 files changed, 433 insertions, 466 deletions
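A note on the change that repeats across most of the files below: CollectionShardingState::get() now always returns the collection's sharding state decoration and never a null pointer, so call sites drop their "if (css)" guards and invoke checkShardVersionOrThrow() unconditionally. A minimal sketch of the resulting caller-side pattern (an illustration assembled from the call sites below, not an excerpt; it assumes the collection lock is already held, e.g. via AutoGetCollection):

    // Prerequisite: DB and collection locks are held (e.g. through AutoGetCollection).
    auto css = CollectionShardingState::get(txn, nss);  // non-null by contract

    // Throws SendStaleConfigException if the shard version the client sent does not
    // match this shard's metadata for 'nss'. Per collection_sharding_state.cpp below,
    // the throwing path also stashes any active migration critical section on the
    // OperationShardingState, so the error handler can wait for the migration to
    // commit before the client retries.
    css->checkShardVersionOrThrow(txn);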
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index c1666497b78..32b48d4eb3e 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -261,9 +261,7 @@ public: } auto css = CollectionShardingState::get(txn, nsString); - if (css) { - css->checkShardVersionOrThrow(txn); - } + css->checkShardVersionOrThrow(txn); Collection* const collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); @@ -296,9 +294,7 @@ public: } auto css = CollectionShardingState::get(txn, nsString); - if (css) { - css->checkShardVersionOrThrow(txn); - } + css->checkShardVersionOrThrow(txn); Collection* collection = autoColl.getCollection(); auto statusWithPlanExecutor = @@ -380,9 +376,7 @@ public: } auto css = CollectionShardingState::get(txn, nsString); - if (css) { - css->checkShardVersionOrThrow(txn); - } + css->checkShardVersionOrThrow(txn); Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { @@ -442,9 +436,7 @@ public: } auto css = CollectionShardingState::get(txn, nsString); - if (css) { - css->checkShardVersionOrThrow(txn); - } + css->checkShardVersionOrThrow(txn); Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 5ecc6330afb..5610c4799db 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -49,11 +49,9 @@ #include "mongo/db/query/find.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" -#include "mongo/s/stale_exception.h" #include "mongo/util/log.h" namespace mongo { @@ -238,29 +236,6 @@ public: } std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - ShardingState* const shardingState = ShardingState::get(txn); - - if (OperationShardingState::get(txn).hasShardVersion() && shardingState->enabled()) { - ChunkVersion receivedVersion = OperationShardingState::get(txn).getShardVersion(nss); - ChunkVersion latestVersion; - // Wait for migration completion to get the correct chunk version. - const int maxTimeoutSec = 30; - int timeoutSec = cq->getParsed().getMaxTimeMS() / 1000; - if (!timeoutSec || timeoutSec > maxTimeoutSec) { - timeoutSec = maxTimeoutSec; - } - - if (!shardingState->waitTillNotInCriticalSection(timeoutSec)) { - uasserted(ErrorCodes::LockTimeout, "Timeout while waiting for migration commit"); - } - - // If the received version is newer than the version cached in 'shardingState', then we - // have to refresh 'shardingState' from the config servers. We do this before acquiring - // locks so that we don't hold locks while waiting on the network. - uassertStatusOK(shardingState->refreshMetadataIfNeeded( - txn, nss.ns(), receivedVersion, &latestVersion)); - } - // Acquire locks. AutoGetCollectionForRead ctx(txn, nss); Collection* collection = ctx.getCollection(); @@ -268,11 +243,6 @@ public: const int dbProfilingLevel = ctx.getDb() ? ctx.getDb()->getProfilingLevel() : serverGlobalParams.defaultProfile; - // It is possible that the sharding version will change during yield while we are - // retrieving a plan executor. If this happens we will throw an error and mongos will - // retry. 
- const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns()); - // Get the execution plan for the query. auto statusWithPlanExecutor = getExecutorFind(txn, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO); @@ -325,17 +295,10 @@ public: << WorkingSetCommon::toStatusString(obj))); } - // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the - // chunk belonged on this node are gone. Separating chunk lifetime management from - // ClientCursor should allow this check to go away. - if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { - // Version changed while retrieving a PlanExecutor. Terminate the operation, - // signaling that mongos should retry. - throw SendStaleConfigException(nss.ns(), - "version changed during find command", - shardingVersionAtStart, - shardingState->getVersion(nss.ns())); - } + // Before saving the cursor, ensure that whatever plan we established happened with the + // expected collection version + auto css = CollectionShardingState::get(txn, nss); + css->checkShardVersionOrThrow(txn); // Set up the cursor for getMore. CursorId cursorId = 0; diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index d3fb03727e7..1fa931d239b 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -65,6 +65,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" @@ -150,14 +151,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { dassert(response->isValid(NULL)); } -void noteInCriticalSection(WriteErrorDetail* staleError) { - BSONObjBuilder builder; - if (staleError->isErrInfoSet()) - builder.appendElements(staleError->getErrInfo()); - builder.append("inCriticalSection", true); - staleError->setErrInfo(builder.obj()); -} - /** * Translates write item type to wire protocol op code. Helper for * WriteBatchExecutor::applyWriteItem(). @@ -194,38 +187,20 @@ void buildStaleError(const ChunkVersion& shardVersionRecvd, bool checkShardVersion(OperationContext* txn, const BatchedCommandRequest& request, WriteOpResult* result) { - const NamespaceString& nss = request.getTargetingNSS(); - dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - - auto& oss = OperationShardingState::get(txn); + const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS()); - if (!oss.hasShardVersion()) { - return true; - } - - ChunkVersion operationShardVersion = oss.getShardVersion(nss); - if (ChunkVersion::isIgnoredVersion(operationShardVersion)) { + try { + css->checkShardVersionOrThrow(txn); return true; - } - - ShardingState* shardingState = ShardingState::get(txn); - CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); - ChunkVersion shardVersion = metadata ? 
metadata->getShardVersion() : ChunkVersion::UNSHARDED(); - - if (!operationShardVersion.isWriteCompatibleWith(shardVersion)) { - result->setError(new WriteErrorDetail); - buildStaleError(operationShardVersion, shardVersion, result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); return false; } - - return true; } } // namespace -// TODO: Determine queueing behavior we want here -MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true); - WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le) : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {} @@ -326,51 +301,15 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, // Refresh metadata if needed // - bool staleBatch = + const bool staleBatch = !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; if (staleBatch) { - ShardingState* shardingState = ShardingState::get(_txn); - - auto& oss = OperationShardingState::get(_txn); - ChunkVersion requestShardVersion = oss.getShardVersion(request.getTargetingNSS()); - - // - // First, we refresh metadata if we need to based on the requested version. - // - ChunkVersion latestShardVersion; - shardingState->refreshMetadataIfNeeded( - _txn, request.getTargetingNS(), requestShardVersion, &latestShardVersion); - - // Report if we're still changing our metadata - // TODO: Better reporting per-collection - if (shardingState->inCriticalMigrateSection()) { - noteInCriticalSection(writeErrors.back()); - } + const auto& oss = OperationShardingState::get(_txn); - if (queueForMigrationCommit) { - // - // Queue up for migration to end - this allows us to be sure that clients will - // not repeatedly try to refresh metadata that is not yet written to the config - // server. Not necessary for correctness. - // Exposed as optional parameter to allow testing of queuing behavior with - // different network timings. - // - // Only wait if we're an older version (in the current collection epoch) and - // we're not write compatible, implying that the current migration is affecting - // writes. - // - - if (requestShardVersion.isOlderThan(latestShardVersion) && - !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) { - while (shardingState->inCriticalMigrateSection()) { - log() << "write request to old shard version " << requestShardVersion - << " waiting for migration commit"; - - shardingState->waitTillNotInCriticalSection(10 /* secs */); - } - } - } + ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS()); + ShardingState::get(_txn) + ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion); } // @@ -1035,11 +974,9 @@ static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( txn, "insert", state->getCollection() ? 
state->getCollection()->ns().ns() : "index"); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); } catch (const DBException& ex) { Status status(ex.toStatus()); if (ErrorCodes::isInterruption(status.code())) @@ -1238,7 +1175,7 @@ static void multiUpdate(OperationContext* txn, // fire in that case. lastOpSetterGuard.Dismiss(); } - } catch (const WriteConflictException& dle) { + } catch (const WriteConflictException&) { debug->writeConflicts++; if (isMulti) { log() << "Had WriteConflict during multi update, aborting"; @@ -1246,16 +1183,15 @@ static void multiUpdate(OperationContext* txn, } createCollection = false; + // RESTART LOOP fakeLoop = -1; txn->recoveryUnit()->abandonSnapshot(); WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns()); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); } catch (const DBException& ex) { Status status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { @@ -1345,14 +1281,12 @@ static void multiRemove(OperationContext* txn, lastOpSetterGuard.Dismiss(); } break; - } catch (const WriteConflictException& dle) { + } catch (const WriteConflictException&) { CurOp::get(txn)->debug().writeConflicts++; WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns()); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); return; } catch (const DBException& ex) { Status status = ex.toStatus(); diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 162017fc0f4..8075c940b1a 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -102,9 +102,7 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn, // We have both the DB and collection locked, which is the prerequisite to do a stable shard // version check, but we'd like to do the check after we have a satisfactory snapshot. 
auto css = CollectionShardingState::get(txn, nss); - if (css) { - css->checkShardVersionOrThrow(txn); - } + css->checkShardVersionOrThrow(txn); } AutoGetCollectionForRead::~AutoGetCollectionForRead() { @@ -196,9 +194,7 @@ void OldClientContext::_checkNotStale() const { break; default: auto css = CollectionShardingState::get(_txn, _ns); - if (css) { - css->checkShardVersionOrThrow(_txn); - } + css->checkShardVersionOrThrow(_txn); } } diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index a469d2ebfaf..c2648e1f73b 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1209,7 +1209,6 @@ void Command::execCommand(OperationContext* txn, std::string dbname = request.getDatabase().toString(); unique_ptr<MaintenanceModeSetter> mmSetter; - std::array<BSONElement, std::tuple_size<decltype(neededFieldNames)>::value> extractedFields{}; request.getCommandArgs().getFields(neededFieldNames, &extractedFields); @@ -1322,7 +1321,14 @@ void Command::execCommand(OperationContext* txn, if (!retval) { command->_commandsFailed.increment(); } - } catch (const DBException& exception) { + } catch (const DBException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (e.getCode() == ErrorCodes::SendStaleConfig) { + auto& sce = static_cast<const StaleConfigException&>(e); + ShardingState::get(txn) + ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived()); + } + BSONObj metadata = rpc::makeEmptyMetadata(); if (ShardingState::get(txn)->enabled()) { auto opTime = grid.shardRegistry()->getConfigOpTime(); @@ -1331,7 +1337,7 @@ void Command::execCommand(OperationContext* txn, metadata = metadataBob.obj(); } - Command::generateErrorResponse(txn, replyBuilder, exception, request, command, metadata); + Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata); } } diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 10ab281ff04..166a2b5683f 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -79,6 +79,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" @@ -399,9 +400,16 @@ static void receivedQuery(OperationContext* txn, uassertStatusOK(status); dbResponse.exhaustNS = runQuery(txn, q, nss, dbResponse.response); - } catch (const AssertionException& exception) { + } catch (const AssertionException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (e.getCode() == ErrorCodes::SendStaleConfig) { + auto& sce = static_cast<const StaleConfigException&>(e); + ShardingState::get(txn) + ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived()); + } + dbResponse.response.reset(); - generateLegacyQueryErrorResponse(&exception, q, &op, &dbResponse.response); + generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); } op.debug().responseLength = dbResponse.response.header().dataLen(); diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 68f5bfcfc08..25bf9866c7f 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -51,8 +51,8 @@ void OpObserver::onCreateIndex(OperationContext* txn, AuthorizationManager::get(txn->getServiceContext()) ->logOp(txn, "i", ns.c_str(), 
indexDoc, nullptr); - CollectionShardingState* const css = CollectionShardingState::get(txn, ns); - if (css && !fromMigrate) { + auto css = CollectionShardingState::get(txn, ns); + if (!fromMigrate) { css->onInsertOp(txn, indexDoc); } @@ -66,12 +66,12 @@ void OpObserver::onInserts(OperationContext* txn, bool fromMigrate) { repl::logOps(txn, "i", nss, begin, end, fromMigrate); - CollectionShardingState* const css = CollectionShardingState::get(txn, nss.ns()); + auto css = CollectionShardingState::get(txn, nss.ns()); const char* ns = nss.ns().c_str(); for (auto it = begin; it != end; it++) { AuthorizationManager::get(txn->getServiceContext())->logOp(txn, "i", ns, *it, nullptr); - if (css && !fromMigrate) { + if (!fromMigrate) { css->onInsertOp(txn, *it); } } @@ -92,8 +92,8 @@ void OpObserver::onUpdate(OperationContext* txn, const OplogUpdateEntryArgs& arg AuthorizationManager::get(txn->getServiceContext()) ->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria); - CollectionShardingState* const css = CollectionShardingState::get(txn, args.ns); - if (css && !args.fromMigrate) { + auto css = CollectionShardingState::get(txn, args.ns); + if (!args.fromMigrate) { css->onUpdateOp(txn, args.updatedDoc); } @@ -112,10 +112,8 @@ OpObserver::DeleteState OpObserver::aboutToDelete(OperationContext* txn, deleteState.idDoc = idElement.wrap(); } - CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns()); - if (css) { - deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc); - } + auto css = CollectionShardingState::get(txn, ns.ns()); + deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc); return deleteState; } @@ -131,8 +129,8 @@ void OpObserver::onDelete(OperationContext* txn, AuthorizationManager::get(txn->getServiceContext()) ->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr); - CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns()); - if (css && !fromMigrate && deleteState.isMigrating) { + auto css = CollectionShardingState::get(txn, ns.ns()); + if (!fromMigrate && deleteState.isMigrating) { css->onDeleteOp(txn, deleteState.idDoc); } diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 946562baa27..241943f5fe1 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -51,7 +51,7 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage/storage_options.h" @@ -553,11 +553,6 @@ std::string runQuery(OperationContext* txn, return ""; } - ShardingState* const shardingState = ShardingState::get(txn); - - // We freak out later if this changes before we're done with the query. - const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns()); - // Handle query option $maxTimeMS (not used with commands). curop.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000); txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. @@ -583,7 +578,6 @@ std::string runQuery(OperationContext* txn, BSONObj obj; PlanExecutor::ExecState state; - // uint64_t numMisplacedDocs = 0; // Get summary info about which plan the executor is using. 
{ @@ -635,17 +629,10 @@ std::string runQuery(OperationContext* txn, uasserted(17144, "Executor error: " + WorkingSetCommon::toStatusString(obj)); } - // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the - // chunk belonged on this node are gone. Separating chunk lifetime management from - // ClientCursor should allow this check to go away. - if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { - // if the version changed during the query we might be missing some data and its safe to - // send this as mongos can resend at this point - throw SendStaleConfigException(nss.ns(), - "version changed during initial query", - shardingVersionAtStart, - shardingState->getVersion(nss.ns())); - } + // Before saving the cursor, ensure that whatever plan we established happened with the expected + // collection version + auto css = CollectionShardingState::get(txn, nss); + css->checkShardVersionOrThrow(txn); // Fill out curop based on query results. If we have a cursorid, we will fill out curop with // this cursorid later. diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index e0fa35d4da2..acff32f0b43 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -68,6 +68,11 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn, } void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) { + if (newMetadata) { + invariant(!newMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED())); + invariant(!newMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED())); + } + _metadata = std::move(newMetadata); } @@ -76,6 +81,13 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) co ChunkVersion received; ChunkVersion wanted; if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) { + // Set migration critical section in case we failed because of migration + auto migrationCritSec = + ShardingState::get(txn)->migrationSourceManager()->getMigrationCriticalSection(); + if (migrationCritSec) { + OperationShardingState::get(txn).setMigrationCriticalSection(migrationCritSec); + } + throw SendStaleConfigException(_nss.ns(), str::stream() << "[" << _nss.ns() << "] shard version not ok: " << errmsg, diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp index 873fc6c6f5c..d3dd4e30442 100644 --- a/src/mongo/db/s/migration_impl.cpp +++ b/src/mongo/db/s/migration_impl.cpp @@ -69,6 +69,7 @@ BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) { } MONGO_FP_DECLARE(failMigrationCommit); +MONGO_FP_DECLARE(hangBeforeCommitMigration); MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection); } // namespace @@ -133,12 +134,12 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) { _toShardCS = toShard->getConnString(); } - auto& operationVersion = OperationShardingState::get(_txn); - if (!operationVersion.hasShardVersion()) { + auto& oss = OperationShardingState::get(_txn); + if (!oss.hasShardVersion()) { return Status{ErrorCodes::InvalidOptions, "moveChunk command is missing shard version"}; } - _collectionVersion = operationVersion.getShardVersion(_nss); + _collectionVersion = oss.getShardVersion(_nss); return Status::OK(); } @@ -385,6 +386,8 @@ Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessio preCond.append(b.obj()); } + 
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeCommitMigration); + fassertStatusOK(34431, grid.catalogManager(_txn)->applyChunkOpsDeprecated( _txn, updates.arr(), preCond.arr(), _nss.ns(), nextVersion)); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index c9dacc78491..980620de09e 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -48,6 +48,7 @@ #include "mongo/logger/ramlog.h" #include "mongo/s/chunk.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/stdx/memory.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" @@ -209,8 +210,11 @@ void MigrationSourceManager::done(OperationContext* txn) { _sessionId = boost::none; _deleteNotifyExec.reset(NULL); - _inCriticalSection = false; - _inCriticalSectionCV.notify_all(); + + if (_critSec) { + _critSec->exitCriticalSection(); + _critSec = nullptr; + } _deleted.clear(); _reload.clear(); @@ -577,30 +581,23 @@ long long MigrationSourceManager::mbUsed() const { return _memoryUsed / (1024 * 1024); } -bool MigrationSourceManager::getInCriticalSection() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _inCriticalSection; -} - void MigrationSourceManager::setInCriticalSection(bool inCritSec) { stdx::lock_guard<stdx::mutex> lk(_mutex); - _inCriticalSection = inCritSec; - _inCriticalSectionCV.notify_all(); -} -bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) { - const auto deadline = stdx::chrono::system_clock::now() + Seconds(maxSecondsToWait); - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (_inCriticalSection) { - log() << "Waiting for " << maxSecondsToWait - << " seconds for the migration critical section to end"; - - if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) { - return false; - } + if (inCritSec) { + invariant(!_critSec); + _critSec = std::make_shared<CriticalSectionState>(); + } else { + invariant(_critSec); + _critSec->exitCriticalSection(); + _critSec = nullptr; } +} - return true; +std::shared_ptr<MigrationSourceManager::CriticalSectionState> +MigrationSourceManager::getMigrationCriticalSection() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _critSec; } bool MigrationSourceManager::isActive() const { @@ -649,4 +646,26 @@ NamespaceString MigrationSourceManager::_getNS() const { return _nss; } +MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default; + +bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection( + Microseconds waitTimeout) { + const auto waitDeadline = stdx::chrono::system_clock::now() + waitTimeout; + + stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex); + while (_inCriticalSection) { + if (stdx::cv_status::timeout == _criticalSectionCV.wait_until(sl, waitDeadline)) { + return false; + } + } + + return true; +} + +void MigrationSourceManager::CriticalSectionState::exitCriticalSection() { + stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex); + _inCriticalSection = false; + _criticalSectionCV.notify_all(); +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 8b5504fd1c9..5c9cf07751e 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -49,6 +49,8 @@ class MigrationSourceManager { MONGO_DISALLOW_COPYING(MigrationSourceManager); public: + class CriticalSectionState; + MigrationSourceManager(); ~MigrationSourceManager(); @@ 
-127,18 +129,12 @@ public: long long mbUsed() const; - bool getInCriticalSection() const; - void setInCriticalSection(bool inCritSec); - /** - * Blocks until the "in critical section" state changes and returns true if we are NOT in the - * critical section - */ - bool waitTillNotInCriticalSection(int maxSecondsToWait); - bool isActive() const; + std::shared_ptr<CriticalSectionState> getMigrationCriticalSection(); + private: friend class LogOpForShardingHandler; @@ -175,11 +171,6 @@ private: mutable stdx::mutex _mutex; - stdx::condition_variable _inCriticalSectionCV; // (M) - - // Is migration currently in critical section. This can be used to block new writes. - bool _inCriticalSection{false}; // (M) - std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M) // List of _id of documents that were modified that must be re-cloned. @@ -203,6 +194,42 @@ private: // List of record id that needs to be transferred from here to the other side. std::set<RecordId> _cloneLocs; // (C) + + // This value is set when setInCriticalSection is called with a true argument, and is signalled + // and cleared when it is called with a false argument. + std::shared_ptr<CriticalSectionState> _critSec; +}; + +/** + * This object is instantiated once the migration logic enters the critical section. It contains + * all the state which is associated with being in a critical section, such as the bumped metadata + * version (which has not yet been reflected on the config server). + */ +class MigrationSourceManager::CriticalSectionState { + MONGO_DISALLOW_COPYING(CriticalSectionState); + +public: + CriticalSectionState(); + + /** + * Blocks until the critical section completes. Returns true if the wait succeeded and the + * critical section is no longer active, or false if the waitTimeout was exceeded. + */ + bool waitUntilOutOfCriticalSection(Microseconds waitTimeout); + + /** + * To be called when the critical section has completed. Signals any threads blocked in + * waitUntilOutOfCriticalSection. Must only be used once for the lifetime of this object. + */ + void exitCriticalSection(); + +private: + // Only moves from true to false once. Happens under the critical section mutex, and the + // condition variable will be signalled. 
+ bool _inCriticalSection{true}; + + stdx::mutex _criticalSectionMutex; + stdx::condition_variable _criticalSectionCV; }; } // namespace mongo diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 906c491aac3..40617c08138 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -39,7 +39,8 @@ namespace { const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration = OperationContext::declareDecoration<OperationShardingState>(); -const ChunkVersion kUnshardedVersion(ChunkVersion::UNSHARDED()); +// Max time to wait for the migration critical section to complete +const Minutes kMaxWaitForMigrationCriticalSection(5); } // namespace mongo @@ -49,12 +50,12 @@ OperationShardingState& OperationShardingState::get(OperationContext* txn) { return shardingMetadataDecoration(txn); } -void OperationShardingState::initializeShardVersion(NamespaceString ns, +void OperationShardingState::initializeShardVersion(NamespaceString nss, const BSONElement& shardVersionElt) { invariant(!hasShardVersion()); - if (ns.isSystemDotIndexes()) { - setShardVersion(std::move(ns), ChunkVersion::IGNORED()); + if (nss.isSystemDotIndexes()) { + setShardVersion(std::move(nss), ChunkVersion::IGNORED()); return; } @@ -70,31 +71,54 @@ void OperationShardingState::initializeShardVersion(NamespaceString ns, return; } - setShardVersion(std::move(ns), std::move(newVersion)); + setShardVersion(std::move(nss), std::move(newVersion)); } bool OperationShardingState::hasShardVersion() const { return _hasVersion; } -const ChunkVersion& OperationShardingState::getShardVersion(const NamespaceString& ns) const { - if (_ns != ns) { - return kUnshardedVersion; +ChunkVersion OperationShardingState::getShardVersion(const NamespaceString& nss) const { + if (_ns != nss) { + return ChunkVersion::UNSHARDED(); } return _shardVersion; } -void OperationShardingState::setShardVersion(NamespaceString ns, ChunkVersion newVersion) { +void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion newVersion) { // This currently supports only setting the shard version for one namespace. - invariant(!_hasVersion || _ns == ns); - invariant(!ns.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion)); + invariant(!_hasVersion || _ns == nss); + invariant(!nss.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion)); - _ns = std::move(ns); + _ns = std::move(nss); _shardVersion = std::move(newVersion); _hasVersion = true; } +bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) { + // Must not block while holding a lock + invariant(!txn->lockState()->isLocked()); + + if (_migrationCriticalSection) { + const Microseconds operationRemainingTime(Microseconds(txn->getRemainingMaxTimeMicros())); + _migrationCriticalSection->waitUntilOutOfCriticalSection( + durationCount<Microseconds>(operationRemainingTime) + ? 
operationRemainingTime : kMaxWaitForMigrationCriticalSection); _migrationCriticalSection = nullptr; return true; } + + return false; +} + +void OperationShardingState::setMigrationCriticalSection( + std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) { + invariant(critSec); + _migrationCriticalSection = std::move(critSec); +} + void OperationShardingState::_clear() { _hasVersion = false; _shardVersion = ChunkVersion(); @@ -104,21 +128,21 @@ void OperationShardingState::_clear() { OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn, const NamespaceString& ns) : _txn(txn), _ns(ns) { - auto& operationVersion = OperationShardingState::get(txn); - _hadOriginalVersion = operationVersion._hasVersion; + auto& oss = OperationShardingState::get(txn); + _hadOriginalVersion = oss._hasVersion; if (_hadOriginalVersion) { - _originalVersion = operationVersion.getShardVersion(ns); + _originalVersion = oss.getShardVersion(ns); } - operationVersion.setShardVersion(ns, ChunkVersion::IGNORED()); + oss.setShardVersion(ns, ChunkVersion::IGNORED()); } OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() { - auto& operationVersion = OperationShardingState::get(_txn); - invariant(ChunkVersion::isIgnoredVersion(operationVersion.getShardVersion(_ns))); + auto& oss = OperationShardingState::get(_txn); + invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns))); if (_hadOriginalVersion) { - operationVersion.setShardVersion(_ns, _originalVersion); + oss.setShardVersion(_ns, _originalVersion); } else { - operationVersion._clear(); + oss._clear(); } } diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 7aa2fc919d3..f52cbd9f702 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -32,6 +32,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/migration_source_manager.h" #include "mongo/s/chunk_version.h" namespace mongo { @@ -69,7 +70,7 @@ public: * This initialization may only be performed once for the lifetime of the object, which * coincides with the lifetime of the request. */ - void initializeShardVersion(NamespaceString ns, const BSONElement& shardVersionElement); + void initializeShardVersion(NamespaceString nss, const BSONElement& shardVersionElement); /** * Returns whether or not there is a shard version associated with this operation. @@ -84,12 +85,29 @@ public: * Returns ChunkVersion::UNSHARDED() if this operation has no shard version information * for the requested namespace. */ - const ChunkVersion& getShardVersion(const NamespaceString& ns) const; + ChunkVersion getShardVersion(const NamespaceString& nss) const; /** * Stores the given chunk version of a namespace into this object. */ - void setShardVersion(NamespaceString ns, ChunkVersion newVersion); + void setShardVersion(NamespaceString nss, ChunkVersion newVersion); + + /** + * This call is a no-op if there isn't a currently active migration critical section. Otherwise + * it will wait for the critical section to complete, up to the remaining operation time. + * + * Returns true if the call actually waited because of a migration critical section (regardless + * of whether it timed out or not), false if there was no active migration critical section. 
+ */ + bool waitForMigrationCriticalSection(OperationContext* txn); + + /** + * Setting this value indicates that when the version check failed, there was an active + * migration for the namespace and that it would be prudent to wait for the critical section to + * complete before retrying so the router doesn't make wasteful requests. + */ + void setMigrationCriticalSection( + std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec); private: /** @@ -101,6 +119,10 @@ private: bool _hasVersion = false; ChunkVersion _shardVersion; NamespaceString _ns; + + // This value will only be non-null if version check during the operation execution failed due + // to stale version and there was a migration for that namespace, which was in critical section. + std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection; }; /** diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index 04710cd8c95..b4f5f9170cb 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -37,9 +37,12 @@ #include "mongo/db/auth/privilege.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" #include "mongo/db/lasterror.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/wire_version.h" @@ -152,157 +155,191 @@ public: return false; } + const NamespaceString nss(ns); + + // Backwards compatibility for SERVER-23119 + if (!nss.isValid()) { + warning() << "Invalid namespace used for setShardVersion: " << ns; + return true; + } // we can run on a slave up to here - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - nsToDatabase(ns))) { + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nss.db())) { result.append("errmsg", "not master"); result.append("note", "from post init in setShardVersion"); return false; } // step 2 - ChunkVersion version = + const ChunkVersion requestedVersion = uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj)); - // step 3 - const ChunkVersion oldVersion = info->getVersion(ns); - const ChunkVersion globalVersion = shardingState->getVersion(ns); + // step 3 - Actual version checking + const ChunkVersion connectionVersion = info->getVersion(ns); + connectionVersion.addToBSON(result, "oldVersion"); + + { + // Use a stable collection metadata while performing the checks + boost::optional<AutoGetCollection> autoColl; + autoColl.emplace(txn, nss, MODE_IS); + + auto css = CollectionShardingState::get(txn, nss); + const ChunkVersion collectionShardVersion = + (css->getMetadata() ? css->getMetadata()->getShardVersion() + : ChunkVersion::UNSHARDED()); + + if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) { + // mongos and mongod agree! 
+ if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) { + if (connectionVersion < collectionShardVersion && + connectionVersion.epoch() == collectionShardVersion.epoch()) { + info->setVersion(ns, requestedVersion); + } else if (authoritative) { + // this means there was a drop and our version is reset + info->setVersion(ns, requestedVersion); + } else { + result.append("ns", ns); + result.appendBool("need_authoritative", true); + errmsg = "verifying drop on '" + ns + "'"; + return false; + } + } - oldVersion.addToBSON(result, "oldVersion"); + return true; + } - if (version.isWriteCompatibleWith(globalVersion)) { - // mongos and mongod agree! - if (!oldVersion.isWriteCompatibleWith(version)) { - if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) { - info->setVersion(ns, version); - } else if (authoritative) { - // this means there was a drop and our version is reset - info->setVersion(ns, version); - } else { - result.append("ns", ns); + // step 4 + // Cases below all either return OR fall-through to remote metadata reload. + const bool isDropRequested = + !requestedVersion.isSet() && collectionShardVersion.isSet(); + + if (isDropRequested) { + if (!authoritative) { result.appendBool("need_authoritative", true); - errmsg = "verifying drop on '" + ns + "'"; + result.append("ns", ns); + collectionShardVersion.addToBSON(result, "globalVersion"); + errmsg = "dropping needs to be authoritative"; return false; } - } - - return true; - } - // step 4 - // Cases below all either return OR fall-through to remote metadata reload. - const bool isDropRequested = !version.isSet() && globalVersion.isSet(); - - if (isDropRequested) { - if (!authoritative) { - result.appendBool("need_authoritative", true); - result.append("ns", ns); - globalVersion.addToBSON(result, "globalVersion"); - errmsg = "dropping needs to be authoritative"; - return false; - } - - // Fall through to metadata reload below - } else { - // Not Dropping + // Fall through to metadata reload below + } else { + // Not Dropping - // TODO: Refactor all of this - if (version < oldVersion && version.hasEqualEpoch(oldVersion)) { - errmsg = str::stream() << "this connection already had a newer version " - << "of collection '" << ns << "'"; - result.append("ns", ns); - version.addToBSON(result, "newVersion"); - globalVersion.addToBSON(result, "globalVersion"); - return false; - } + // TODO: Refactor all of this + if (requestedVersion < connectionVersion && + requestedVersion.epoch() == connectionVersion.epoch()) { + errmsg = str::stream() << "this connection already had a newer version " + << "of collection '" << ns << "'"; + result.append("ns", ns); + requestedVersion.addToBSON(result, "newVersion"); + collectionShardVersion.addToBSON(result, "globalVersion"); + return false; + } - // TODO: Refactor all of this - if (version < globalVersion && version.hasEqualEpoch(globalVersion)) { - while (shardingState->inCriticalMigrateSection()) { - log() << "waiting till out of critical section"; - shardingState->waitTillNotInCriticalSection(10); + // TODO: Refactor all of this + if (requestedVersion < collectionShardVersion && + requestedVersion.epoch() == collectionShardVersion.epoch()) { + auto critSec = + shardingState->migrationSourceManager()->getMigrationCriticalSection(); + if (critSec) { + autoColl.reset(); + log() << "waiting till out of critical section"; + critSec->waitUntilOutOfCriticalSection(Seconds(10)); + } + + errmsg = str::stream() << "shard global version for collection is higher " + << "than trying 
to set to '" << ns << "'"; + result.append("ns", ns); + requestedVersion.addToBSON(result, "version"); + collectionShardVersion.addToBSON(result, "globalVersion"); + result.appendBool("reloadConfig", true); + return false; } - errmsg = str::stream() << "shard global version for collection is higher " - << "than trying to set to '" << ns << "'"; - result.append("ns", ns); - version.addToBSON(result, "version"); - globalVersion.addToBSON(result, "globalVersion"); - result.appendBool("reloadConfig", true); - return false; - } - if (!globalVersion.isSet() && !authoritative) { - // Needed b/c when the last chunk is moved off a shard, - // the version gets reset to zero, which should require a reload. - while (shardingState->inCriticalMigrateSection()) { - log() << "waiting till out of critical section"; - shardingState->waitTillNotInCriticalSection(10); + if (!collectionShardVersion.isSet() && !authoritative) { + // Needed b/c when the last chunk is moved off a shard, the version gets reset + // to zero, which should require a reload. + auto critSec = + shardingState->migrationSourceManager()->getMigrationCriticalSection(); + if (critSec) { + autoColl.reset(); + log() << "waiting till out of critical section"; + critSec->waitUntilOutOfCriticalSection(Seconds(10)); + } + + // need authoritative for first look + result.append("ns", ns); + result.appendBool("need_authoritative", true); + errmsg = "first time for collection '" + ns + "'"; + return false; } - // need authoritative for first look - result.append("ns", ns); - result.appendBool("need_authoritative", true); - errmsg = "first time for collection '" + ns + "'"; - return false; + // Fall through to metadata reload below } - - // Fall through to metadata reload below } - ChunkVersion currVersion; - Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion); + Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion); + + { + AutoGetCollection autoColl(txn, nss, MODE_IS); - if (!status.isOK()) { - // The reload itself was interrupted or confused here + ChunkVersion currVersion = ChunkVersion::UNSHARDED(); + auto collMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + if (collMetadata) { + currVersion = collMetadata->getShardVersion(); + } - errmsg = str::stream() << "could not refresh metadata for " << ns - << " with requested shard version " << version.toString() - << ", stored shard version is " << currVersion.toString() - << causedBy(status.reason()); + if (!status.isOK()) { + // The reload itself was interrupted or confused here - warning() << errmsg; + errmsg = str::stream() + << "could not refresh metadata for " << ns << " with requested shard version " + << requestedVersion.toString() << ", stored shard version is " + << currVersion.toString() << causedBy(status.reason()); - result.append("ns", ns); - version.addToBSON(result, "version"); - currVersion.addToBSON(result, "globalVersion"); - result.appendBool("reloadConfig", true); + warning() << errmsg; - return false; - } else if (!version.isWriteCompatibleWith(currVersion)) { - // We reloaded a version that doesn't match the version mongos was trying to - // set. 
+ result.append("ns", ns); + requestedVersion.addToBSON(result, "version"); + currVersion.addToBSON(result, "globalVersion"); + result.appendBool("reloadConfig", true); - errmsg = str::stream() << "requested shard version differs from" - << " config shard version for " << ns - << ", requested version is " << version.toString() - << " but found version " << currVersion.toString(); + return false; + } else if (!requestedVersion.isWriteCompatibleWith(currVersion)) { + // We reloaded a version that doesn't match the version mongos was trying to + // set. + errmsg = str::stream() << "requested shard version differs from" + << " config shard version for " << ns + << ", requested version is " << requestedVersion.toString() + << " but found version " << currVersion.toString(); - OCCASIONALLY warning() << errmsg; + OCCASIONALLY warning() << errmsg; - // WARNING: the exact fields below are important for compatibility with mongos - // version reload. + // WARNING: the exact fields below are important for compatibility with mongos + // version reload. - result.append("ns", ns); - currVersion.addToBSON(result, "globalVersion"); + result.append("ns", ns); + currVersion.addToBSON(result, "globalVersion"); + + // If this was a reset of a collection or the last chunk moved out, inform mongos to + // do a full reload. + if (currVersion.epoch() != requestedVersion.epoch() || !currVersion.isSet()) { + result.appendBool("reloadConfig", true); + // Zero-version also needed to trigger full mongos reload, sadly + // TODO: Make this saner, and less impactful (full reload on last chunk is bad) + ChunkVersion(0, 0, OID()).addToBSON(result, "version"); + // For debugging + requestedVersion.addToBSON(result, "origVersion"); + } else { + requestedVersion.addToBSON(result, "version"); + } - // If this was a reset of a collection or the last chunk moved out, inform mongos to - // do a full reload. 
- if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) { - result.appendBool("reloadConfig", true); - // Zero-version also needed to trigger full mongos reload, sadly - // TODO: Make this saner, and less impactful (full reload on last chunk is bad) - ChunkVersion(0, 0, OID()).addToBSON(result, "version"); - // For debugging - version.addToBSON(result, "origVersion"); - } else { - version.addToBSON(result, "version"); + return false; } - - return false; } - info->setVersion(ns, version); + info->setVersion(ns, requestedVersion); return true; } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 775104d4e6b..2baf2f513cb 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -425,14 +425,6 @@ void ShardingState::mergeChunks(OperationContext* txn, it->second->setMetadata(std::move(cloned)); } -bool ShardingState::inCriticalMigrateSection() { - return _migrationSourceManager.getInCriticalSection(); -} - -bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) { - return _migrationSourceManager.waitTillNotInCriticalSection(maxSecondsToWait); -} - void ShardingState::resetMetadata(const string& ns) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -441,47 +433,45 @@ void ShardingState::resetMetadata(const string& ns) { _collections.erase(ns); } -Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn, - const string& ns, - const ChunkVersion& reqShardVersion, - ChunkVersion* latestShardVersion) { - // The _configServerTickets serializes this process such that only a small number of threads - // can try to refresh at the same time. - - LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion; +Status ShardingState::onStaleShardVersion(OperationContext* txn, + const NamespaceString& nss, + const ChunkVersion& expectedVersion) { + invariant(!txn->lockState()->isLocked()); + invariant(enabled()); - // - // Queuing of refresh requests starts here when remote reload is needed. This may take time. - // TODO: Explicitly expose the queuing discipline. - // + LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version " + << expectedVersion; - _configServerTickets.waitForTicket(); - TicketHolderReleaser needTicketFrom(&_configServerTickets); + // Ensure any ongoing migrations have completed + auto& oss = OperationShardingState::get(txn); + oss.waitForMigrationCriticalSection(txn); - // - // Fast path - check if the requested version is at a higher version than the current - // metadata version or a different epoch before verifying against config server. - // + ChunkVersion collectionShardVersion; - shared_ptr<CollectionMetadata> storedMetadata; + // Fast path - check if the requested version is at a higher version than the current metadata + // version or a different epoch before verifying against config server. 
{ - stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) - storedMetadata = it->second->getMetadata(); - } + AutoGetCollection autoColl(txn, nss, MODE_IS); - ChunkVersion storedShardVersion; - if (storedMetadata) - storedShardVersion = storedMetadata->getShardVersion(); - *latestShardVersion = storedShardVersion; + shared_ptr<CollectionMetadata> storedMetadata = + CollectionShardingState::get(txn, nss)->getMetadata(); + if (storedMetadata) { + collectionShardVersion = storedMetadata->getShardVersion(); + } - if (storedShardVersion >= reqShardVersion && - storedShardVersion.epoch() == reqShardVersion.epoch()) { - // Don't need to remotely reload if we're in the same epoch with a >= version - return Status::OK(); + if (collectionShardVersion >= expectedVersion && + collectionShardVersion.epoch() == expectedVersion.epoch()) { + // Don't need to remotely reload if we're in the same epoch and the requested version is + // smaller than the one we know about. This means that the remote side is behind. + return Status::OK(); + } } + // The _configServerTickets serializes this process such that only a small number of threads can + // try to refresh at the same time + _configServerTickets.waitForTicket(); + TicketHolderReleaser needTicketFrom(&_configServerTickets); + // // Slow path - remotely reload // @@ -491,19 +481,20 @@ Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn, // C) Dropping a collection, notified (currently) by mongos. // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure. - if (storedShardVersion.epoch() != reqShardVersion.epoch()) { + if (collectionShardVersion.epoch() != expectedVersion.epoch()) { // Need to remotely reload if our epochs aren't the same, to verify - LOG(1) << "metadata change requested for " << ns << ", from shard version " - << storedShardVersion << " to " << reqShardVersion + LOG(1) << "metadata change requested for " << nss.ns() << ", from shard version " + << collectionShardVersion << " to " << expectedVersion << ", need to verify with config server"; } else { // Need to remotely reload since our epochs aren't the same but our version is greater - LOG(1) << "metadata version update requested for " << ns << ", from shard version " - << storedShardVersion << " to " << reqShardVersion + LOG(1) << "metadata version update requested for " << nss.ns() << ", from shard version " + << collectionShardVersion << " to " << expectedVersion << ", need to verify with config server"; } - return _refreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion); + ChunkVersion unusedLatestShardVersion; + return _refreshMetadata(txn, nss.ns(), expectedVersion, true, &unusedLatestShardVersion); } Status ShardingState::refreshMetadataNow(OperationContext* txn, diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 19dc1be2978..defecd2aafa 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -135,24 +135,12 @@ public: ChunkVersion getVersion(const std::string& ns); /** - * If the metadata for 'ns' at this shard is at or above the requested version, - * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard - * version. The latter is always greater or equal than 'reqShardVersion' if in the same epoch. - * - * Otherwise, falls back to refreshMetadataNow. 
- * - * This call blocks if there are more than _configServerTickets threads currently refreshing - * metadata (currently set to 3). - * - * Locking Note: - * + Must NOT be called with the write lock because this call may go into the network, - * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees - * that 'latestShardVersion' is indeed the current one on return. + * Refreshes the local metadata based on whether the expected version is higher than what we + * have cached. */ - Status refreshMetadataIfNeeded(OperationContext* txn, - const std::string& ns, - const ChunkVersion& reqShardVersion, - ChunkVersion* latestShardVersion); + Status onStaleShardVersion(OperationContext* txn, + const NamespaceString& nss, + const ChunkVersion& expectedVersion); /** * Refreshes collection metadata by asking the config server for the latest information. @@ -303,13 +291,6 @@ public: const BSONObj& maxKey, ChunkVersion mergedVersion); - bool inCriticalMigrateSection(); - - /** - * @return true if we are NOT in the critical section - */ - bool waitTillNotInCriticalSection(int maxSecondsToWait); - /** * TESTING ONLY * Uninstalls the metadata for a given collection. diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 177fd934e8c..0f64014f5e8 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -41,7 +41,6 @@ #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" -#include "mongo/s/stale_exception.h" #include "mongo/s/version_manager.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/exit.h" diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index d8a690d211e..4aa5231ccc6 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -681,9 +681,9 @@ public: // Mongos >= v3.2 sends the full version, v3.0 only sends the epoch. // TODO(SERVER-20742): Stop parsing epoch separately after 3.2. 
OID cmdEpoch; - auto& operationVersion = OperationShardingState::get(txn); - if (operationVersion.hasShardVersion()) { - cmdVersion = operationVersion.getShardVersion(nss); + auto& oss = OperationShardingState::get(txn); + if (oss.hasShardVersion()) { + cmdVersion = oss.getShardVersion(nss); cmdEpoch = cmdVersion.epoch(); } else { BSONElement epochElem(cmdObj["epoch"]); diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index d9ce669e3f7..e056e05609c 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -79,12 +79,6 @@ static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargete } } -static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) { - if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet()) - return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue(); - return false; -} - // The number of times we'll try to continue a batch op if no progress is being made // This only applies when no writes are occurring and metadata is not changing on reload static const int kMaxRoundsWithoutProgress(5); @@ -152,7 +146,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn, size_t numSent = 0; size_t numToSend = childBatches.size(); - bool remoteMetadataChanging = false; while (numSent != numToSend) { // Collect batches out on the network, mapped by endpoint OwnedHostBatchMap ownedPendingBatches; @@ -270,11 +263,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn, ++stats->numStaleBatches; } - // Remember if the shard is actively changing metadata right now - if (isShardMetadataChanging(staleErrors)) { - remoteMetadataChanging = true; - } - // Remember that we successfully wrote to this shard // NOTE: This will record lastOps for shards where we actually didn't update // or delete any documents, which preserves old behavior but is conservative @@ -327,7 +315,7 @@ void BatchWriteExec::executeBatch(OperationContext* txn, // int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed); - if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) { + if (currCompletedOps == numCompletedOps && !targeterChanged) { ++numRoundsWithoutProgress; } else { numRoundsWithoutProgress = 0; diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 100fb1806a1..bf0e4c8685a 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -290,11 +290,6 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) { error.setErrCode(ErrorCodes::StaleShardVersion); error.setErrMessage("mock stale error"); for (int i = 0; i < 10; i++) { - if (i % 2 == 0) - error.setErrInfo(BSONObj()); - else - error.setErrInfo(BSON("inCriticalSection" << true)); - mockResults.push_back(new MockWriteResult(backend.shardHost, error)); } @@ -306,7 +301,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) { backend.exec->executeBatch(&txn, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(stats.numStaleBatches, 10); + ASSERT_EQUALS(stats.numStaleBatches, 6); } } // namespace diff --git a/src/mongo/s/write_ops/write_error_detail.cpp b/src/mongo/s/write_ops/write_error_detail.cpp index cc2368154fe..51baaecd790 100644 --- a/src/mongo/s/write_ops/write_error_detail.cpp +++ b/src/mongo/s/write_ops/write_error_detail.cpp @@ -154,10 +154,6 @@ void WriteErrorDetail::setIndex(int index) { 
_isIndexSet = true; } -void WriteErrorDetail::unsetIndex() { - _isIndexSet = false; -} - bool WriteErrorDetail::isIndexSet() const { return _isIndexSet; } @@ -172,10 +168,6 @@ void WriteErrorDetail::setErrCode(int errCode) { _isErrCodeSet = true; } -void WriteErrorDetail::unsetErrCode() { - _isErrCodeSet = false; -} - bool WriteErrorDetail::isErrCodeSet() const { return _isErrCodeSet; } @@ -190,10 +182,6 @@ void WriteErrorDetail::setErrInfo(const BSONObj& errInfo) { _isErrInfoSet = true; } -void WriteErrorDetail::unsetErrInfo() { - _isErrInfoSet = false; -} - bool WriteErrorDetail::isErrInfoSet() const { return _isErrInfoSet; } diff --git a/src/mongo/s/write_ops/write_error_detail.h b/src/mongo/s/write_ops/write_error_detail.h index 9c431f02f57..e0dec80ce52 100644 --- a/src/mongo/s/write_ops/write_error_detail.h +++ b/src/mongo/s/write_ops/write_error_detail.h @@ -77,17 +77,14 @@ public: // void setIndex(int index); - void unsetIndex(); bool isIndexSet() const; int getIndex() const; void setErrCode(int errCode); - void unsetErrCode(); bool isErrCodeSet() const; int getErrCode() const; void setErrInfo(const BSONObj& errInfo); - void unsetErrInfo(); bool isErrInfoSet() const; const BSONObj& getErrInfo() const; |
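A note on the new MigrationSourceManager::CriticalSectionState above: it is a one-shot latch built from a mutex and a condition variable. It is created "closed" when a migration enters its critical section, any number of stalled operations can wait on it with a deadline, and exitCriticalSection() opens it exactly once. A self-contained sketch of the same mechanism in standard C++ (std:: types stand in for the mongo::stdx wrappers, and CriticalSectionLatch is a hypothetical name; the real class is additionally handed out as a shared_ptr under the MigrationSourceManager mutex):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    class CriticalSectionLatch {
    public:
        // Returns true if the critical section ended before the deadline, false on timeout.
        bool waitUntilOutOfCriticalSection(std::chrono::microseconds waitTimeout) {
            const auto deadline = std::chrono::system_clock::now() + waitTimeout;
            std::unique_lock<std::mutex> lk(_mutex);
            return _cv.wait_until(lk, deadline, [this] { return !_inCriticalSection; });
        }

        // To be called exactly once, when the migration commits or aborts.
        void exitCriticalSection() {
            std::lock_guard<std::mutex> lk(_mutex);
            _inCriticalSection = false;
            _cv.notify_all();
        }

    private:
        bool _inCriticalSection{true};  // moves from true to false exactly once, under _mutex
        std::mutex _mutex;
        std::condition_variable _cv;
    };

Handing the latch out by shared_ptr, rather than exposing a bool and a condition variable on the manager itself as before, is what allows a stale operation to keep waiting on a critical section even after MigrationSourceManager::done() has already dropped the manager's own reference.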
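The catch sites added in dbcommands.cpp and instance.cpp share one shape, sketched here in simplified form (runCommandBody and the trailing rethrow are illustrative stand-ins; the real code proceeds to generate an error response instead of rethrowing):

    try {
        runCommandBody(txn, request);  // hypothetical stand-in for command dispatch
    } catch (const DBException& e) {
        if (e.getCode() == ErrorCodes::SendStaleConfig) {
            // The exception carries the namespace and the shard version the client sent.
            const auto& sce = static_cast<const StaleConfigException&>(e);

            // onStaleShardVersion() first waits out any migration critical section that
            // the failed version check stashed on the OperationShardingState, then does a
            // fast-path check of the cached metadata and only refreshes from the config
            // servers when that check fails. The client's retry therefore has a good
            // chance of succeeding on its first attempt.
            ShardingState::get(txn)->onStaleShardVersion(
                txn, NamespaceString(sce.getns()), sce.getVersionReceived());
        }
        throw;  // simplified; the real code builds the error response here
    }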