Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp                |  16
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp                       |  47
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.cpp  | 114
-rw-r--r--  src/mongo/db/db_raii.cpp                                 |   8
-rw-r--r--  src/mongo/db/dbcommands.cpp                              |  12
-rw-r--r--  src/mongo/db/instance.cpp                                |  12
-rw-r--r--  src/mongo/db/op_observer.cpp                             |  22
-rw-r--r--  src/mongo/db/query/find.cpp                              |  23
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp             |  12
-rw-r--r--  src/mongo/db/s/migration_impl.cpp                        |   9
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp              |  61
-rw-r--r--  src/mongo/db/s/migration_source_manager.h                |  53
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp              |  64
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h                |  28
-rw-r--r--  src/mongo/db/s/set_shard_version_command.cpp             | 263
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                        |  83
-rw-r--r--  src/mongo/db/s/sharding_state.h                          |  29
-rw-r--r--  src/mongo/s/client/shard_connection.cpp                  |   1
-rw-r--r--  src/mongo/s/d_split.cpp                                  |   6
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp               |  14
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp          |   7
-rw-r--r--  src/mongo/s/write_ops/write_error_detail.cpp             |  12
-rw-r--r--  src/mongo/s/write_ops/write_error_detail.h               |   3
23 files changed, 433 insertions(+), 466 deletions(-)
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index c1666497b78..32b48d4eb3e 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -261,9 +261,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Collection* const collection = autoColl.getCollection();
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
@@ -296,9 +294,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Collection* collection = autoColl.getCollection();
auto statusWithPlanExecutor =
@@ -380,9 +376,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
@@ -442,9 +436,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
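
All four hunks above drop the same guard: CollectionShardingState::get(txn, nss) is now expected to always return a valid object, so callers invoke checkShardVersionOrThrow unconditionally. A minimal standalone sketch of the "get never returns null" pattern, using hypothetical stand-in types rather than MongoDB's real decoration machinery:

    #include <map>
    #include <memory>
    #include <string>

    // Hypothetical stand-in for the per-collection sharding state.
    struct CollectionShardingState {
        explicit CollectionShardingState(std::string ns) : nss(std::move(ns)) {}
        void checkShardVersionOrThrow() { /* version check elided */ }
        std::string nss;
    };

    struct ShardingStateRegistry {
        // Lazily constructs the entry on first access, so get() never returns
        // null and callers need no "if (css)" guard.
        CollectionShardingState* get(const std::string& ns) {
            auto& entry = _collections[ns];
            if (!entry) {
                entry = std::make_unique<CollectionShardingState>(ns);
            }
            return entry.get();
        }

    private:
        std::map<std::string, std::unique_ptr<CollectionShardingState>> _collections;
    };
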
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 5ecc6330afb..5610c4799db 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -49,11 +49,9 @@
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
-#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -238,29 +236,6 @@ public:
}
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- ShardingState* const shardingState = ShardingState::get(txn);
-
- if (OperationShardingState::get(txn).hasShardVersion() && shardingState->enabled()) {
- ChunkVersion receivedVersion = OperationShardingState::get(txn).getShardVersion(nss);
- ChunkVersion latestVersion;
- // Wait for migration completion to get the correct chunk version.
- const int maxTimeoutSec = 30;
- int timeoutSec = cq->getParsed().getMaxTimeMS() / 1000;
- if (!timeoutSec || timeoutSec > maxTimeoutSec) {
- timeoutSec = maxTimeoutSec;
- }
-
- if (!shardingState->waitTillNotInCriticalSection(timeoutSec)) {
- uasserted(ErrorCodes::LockTimeout, "Timeout while waiting for migration commit");
- }
-
- // If the received version is newer than the version cached in 'shardingState', then we
- // have to refresh 'shardingState' from the config servers. We do this before acquiring
- // locks so that we don't hold locks while waiting on the network.
- uassertStatusOK(shardingState->refreshMetadataIfNeeded(
- txn, nss.ns(), receivedVersion, &latestVersion));
- }
-
// Acquire locks.
AutoGetCollectionForRead ctx(txn, nss);
Collection* collection = ctx.getCollection();
@@ -268,11 +243,6 @@ public:
const int dbProfilingLevel =
ctx.getDb() ? ctx.getDb()->getProfilingLevel() : serverGlobalParams.defaultProfile;
- // It is possible that the sharding version will change during yield while we are
- // retrieving a plan executor. If this happens we will throw an error and mongos will
- // retry.
- const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns());
-
// Get the execution plan for the query.
auto statusWithPlanExecutor =
getExecutorFind(txn, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO);
@@ -325,17 +295,10 @@ public:
<< WorkingSetCommon::toStatusString(obj)));
}
- // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the
- // chunk belonged on this node are gone. Separating chunk lifetime management from
- // ClientCursor should allow this check to go away.
- if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) {
- // Version changed while retrieving a PlanExecutor. Terminate the operation,
- // signaling that mongos should retry.
- throw SendStaleConfigException(nss.ns(),
- "version changed during find command",
- shardingVersionAtStart,
- shardingState->getVersion(nss.ns()));
- }
+ // Before saving the cursor, ensure that whatever plan we established happened with the
+ // expected collection version
+ auto css = CollectionShardingState::get(txn, nss);
+ css->checkShardVersionOrThrow(txn);
// Set up the cursor for getMore.
CursorId cursorId = 0;
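
The find path no longer snapshots the shard version before planning and compares it afterwards; a single checkShardVersionOrThrow call before the cursor is saved raises SendStaleConfigException if the operation's expected version is not write compatible with the collection's current version. A rough standalone sketch of that check, with simplified stand-in types (the real ChunkVersion carries a major/minor pair plus an epoch OID):

    #include <sstream>
    #include <stdexcept>
    #include <string>

    // Simplified stand-in: 'major' changes on chunk migrations, 'epoch' changes
    // when the collection is dropped/recreated.
    struct ChunkVersion {
        unsigned major = 0;
        unsigned epoch = 0;
        bool isWriteCompatibleWith(const ChunkVersion& other) const {
            return epoch == other.epoch && major == other.major;
        }
    };

    struct StaleConfigException : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    void checkShardVersionOrThrow(const std::string& ns,
                                  const ChunkVersion& received,  // what the router sent
                                  const ChunkVersion& wanted) {  // what this shard has
        if (!received.isWriteCompatibleWith(wanted)) {
            std::ostringstream ss;
            ss << "[" << ns << "] shard version not ok: received " << received.major
               << "/" << received.epoch << ", wanted " << wanted.major << "/" << wanted.epoch;
            throw StaleConfigException(ss.str());
        }
    }
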
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index d3fb03727e7..1fa931d239b 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
@@ -150,14 +151,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
dassert(response->isValid(NULL));
}
-void noteInCriticalSection(WriteErrorDetail* staleError) {
- BSONObjBuilder builder;
- if (staleError->isErrInfoSet())
- builder.appendElements(staleError->getErrInfo());
- builder.append("inCriticalSection", true);
- staleError->setErrInfo(builder.obj());
-}
-
/**
* Translates write item type to wire protocol op code. Helper for
* WriteBatchExecutor::applyWriteItem().
@@ -194,38 +187,20 @@ void buildStaleError(const ChunkVersion& shardVersionRecvd,
bool checkShardVersion(OperationContext* txn,
const BatchedCommandRequest& request,
WriteOpResult* result) {
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- auto& oss = OperationShardingState::get(txn);
+ const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS());
- if (!oss.hasShardVersion()) {
- return true;
- }
-
- ChunkVersion operationShardVersion = oss.getShardVersion(nss);
- if (ChunkVersion::isIgnoredVersion(operationShardVersion)) {
+ try {
+ css->checkShardVersionOrThrow(txn);
return true;
- }
-
- ShardingState* shardingState = ShardingState::get(txn);
- CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns());
- ChunkVersion shardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
-
- if (!operationShardVersion.isWriteCompatibleWith(shardVersion)) {
- result->setError(new WriteErrorDetail);
- buildStaleError(operationShardVersion, shardVersion, result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return false;
}
-
- return true;
}
} // namespace
-// TODO: Determine queueing behavior we want here
-MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true);
-
WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le)
: _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {}
@@ -326,51 +301,15 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request,
// Refresh metadata if needed
//
- bool staleBatch =
+ const bool staleBatch =
!writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
if (staleBatch) {
- ShardingState* shardingState = ShardingState::get(_txn);
-
- auto& oss = OperationShardingState::get(_txn);
- ChunkVersion requestShardVersion = oss.getShardVersion(request.getTargetingNSS());
-
- //
- // First, we refresh metadata if we need to based on the requested version.
- //
- ChunkVersion latestShardVersion;
- shardingState->refreshMetadataIfNeeded(
- _txn, request.getTargetingNS(), requestShardVersion, &latestShardVersion);
-
- // Report if we're still changing our metadata
- // TODO: Better reporting per-collection
- if (shardingState->inCriticalMigrateSection()) {
- noteInCriticalSection(writeErrors.back());
- }
+ const auto& oss = OperationShardingState::get(_txn);
- if (queueForMigrationCommit) {
- //
- // Queue up for migration to end - this allows us to be sure that clients will
- // not repeatedly try to refresh metadata that is not yet written to the config
- // server. Not necessary for correctness.
- // Exposed as optional parameter to allow testing of queuing behavior with
- // different network timings.
- //
- // Only wait if we're an older version (in the current collection epoch) and
- // we're not write compatible, implying that the current migration is affecting
- // writes.
- //
-
- if (requestShardVersion.isOlderThan(latestShardVersion) &&
- !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "write request to old shard version " << requestShardVersion
- << " waiting for migration commit";
-
- shardingState->waitTillNotInCriticalSection(10 /* secs */);
- }
- }
- }
+ ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS());
+ ShardingState::get(_txn)
+ ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion);
}
//
@@ -1035,11 +974,9 @@ static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index");
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code()))
@@ -1238,7 +1175,7 @@ static void multiUpdate(OperationContext* txn,
// fire in that case.
lastOpSetterGuard.Dismiss();
}
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
debug->writeConflicts++;
if (isMulti) {
log() << "Had WriteConflict during multi update, aborting";
@@ -1246,16 +1183,15 @@ static void multiUpdate(OperationContext* txn,
}
createCollection = false;
+
// RESTART LOOP
fakeLoop = -1;
txn->recoveryUnit()->abandonSnapshot();
WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
@@ -1345,14 +1281,12 @@ static void multiRemove(OperationContext* txn,
lastOpSetterGuard.Dismiss();
}
break;
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
CurOp::get(txn)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return;
} catch (const DBException& ex) {
Status status = ex.toStatus();
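
checkShardVersion in the batch executor now delegates to checkShardVersionOrThrow and converts the thrown StaleConfigException into a per-item WriteErrorDetail, so a stale write becomes an error entry in the batch response instead of an aborted command. A condensed sketch of that translation, with hypothetical stand-in types:

    #include <memory>
    #include <string>

    struct StaleConfigException {
        int versionReceived = 0;
        int versionWanted = 0;
    };

    struct WriteErrorDetail {
        int errCode = 0;
        std::string errMessage;
    };

    // Returns true if the version check passed; on failure records a write error
    // instead of letting the exception escape and fail the whole batch.
    bool checkShardVersion(bool versionOk, std::unique_ptr<WriteErrorDetail>& error) {
        try {
            if (!versionOk) {
                throw StaleConfigException{2, 3};  // toy versions for illustration
            }
            return true;
        } catch (const StaleConfigException& e) {
            error = std::make_unique<WriteErrorDetail>();
            error->errCode = 63;  // StaleShardVersion in MongoDB's error code table
            error->errMessage = "stale shard version: received " +
                std::to_string(e.versionReceived) + ", wanted " +
                std::to_string(e.versionWanted);
            return false;
        }
    }
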
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 162017fc0f4..8075c940b1a 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -102,9 +102,7 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
// We have both the DB and collection locked, which is the prerequisite to do a stable shard
// version check, but we'd like to do the check after we have a satisfactory snapshot.
auto css = CollectionShardingState::get(txn, nss);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
}
AutoGetCollectionForRead::~AutoGetCollectionForRead() {
@@ -196,9 +194,7 @@ void OldClientContext::_checkNotStale() const {
break;
default:
auto css = CollectionShardingState::get(_txn, _ns);
- if (css) {
- css->checkShardVersionOrThrow(_txn);
- }
+ css->checkShardVersionOrThrow(_txn);
}
}
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index a469d2ebfaf..c2648e1f73b 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1209,7 +1209,6 @@ void Command::execCommand(OperationContext* txn,
std::string dbname = request.getDatabase().toString();
unique_ptr<MaintenanceModeSetter> mmSetter;
-
std::array<BSONElement, std::tuple_size<decltype(neededFieldNames)>::value>
extractedFields{};
request.getCommandArgs().getFields(neededFieldNames, &extractedFields);
@@ -1322,7 +1321,14 @@ void Command::execCommand(OperationContext* txn,
if (!retval) {
command->_commandsFailed.increment();
}
- } catch (const DBException& exception) {
+ } catch (const DBException& e) {
+ // If we got a stale config, wait in case the operation is stuck in a critical section
+ if (e.getCode() == ErrorCodes::SendStaleConfig) {
+ auto& sce = static_cast<const StaleConfigException&>(e);
+ ShardingState::get(txn)
+ ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
+ }
+
BSONObj metadata = rpc::makeEmptyMetadata();
if (ShardingState::get(txn)->enabled()) {
auto opTime = grid.shardRegistry()->getConfigOpTime();
@@ -1331,7 +1337,7 @@ void Command::execCommand(OperationContext* txn,
metadata = metadataBob.obj();
}
- Command::generateErrorResponse(txn, replyBuilder, exception, request, command, metadata);
+ Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata);
}
}
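
The new catch clause means that before a SendStaleConfig error goes back to the router, the shard first calls ShardingState::onStaleShardVersion to wait out any migration critical section and refresh its cached metadata, so the router's retry has a good chance of succeeding immediately. A standalone sketch of the catch-site shape (the handler and exception type here are stand-ins):

    #include <functional>
    #include <iostream>
    #include <stdexcept>

    struct StaleConfigError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // Stand-in for ShardingState::onStaleShardVersion: wait for any active
    // migration critical section, then refresh cached collection metadata.
    void onStaleShardVersion() {
        std::cout << "refreshing shard metadata before replying\n";
    }

    void execCommand(const std::function<void()>& runCommand) {
        try {
            runCommand();
        } catch (const StaleConfigError& e) {
            onStaleShardVersion();  // heal local state first...
            std::cout << "error: " << e.what() << '\n';  // ...then still report the failure
        }
    }
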
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 10ab281ff04..166a2b5683f 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -79,6 +79,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
@@ -399,9 +400,16 @@ static void receivedQuery(OperationContext* txn,
uassertStatusOK(status);
dbResponse.exhaustNS = runQuery(txn, q, nss, dbResponse.response);
- } catch (const AssertionException& exception) {
+ } catch (const AssertionException& e) {
+ // If we got a stale config, wait in case the operation is stuck in a critical section
+ if (e.getCode() == ErrorCodes::SendStaleConfig) {
+ auto& sce = static_cast<const StaleConfigException&>(e);
+ ShardingState::get(txn)
+ ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
+ }
+
dbResponse.response.reset();
- generateLegacyQueryErrorResponse(&exception, q, &op, &dbResponse.response);
+ generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response);
}
op.debug().responseLength = dbResponse.response.header().dataLen();
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 68f5bfcfc08..25bf9866c7f 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -51,8 +51,8 @@ void OpObserver::onCreateIndex(OperationContext* txn,
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "i", ns.c_str(), indexDoc, nullptr);
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns);
- if (css && !fromMigrate) {
+ auto css = CollectionShardingState::get(txn, ns);
+ if (!fromMigrate) {
css->onInsertOp(txn, indexDoc);
}
@@ -66,12 +66,12 @@ void OpObserver::onInserts(OperationContext* txn,
bool fromMigrate) {
repl::logOps(txn, "i", nss, begin, end, fromMigrate);
- CollectionShardingState* const css = CollectionShardingState::get(txn, nss.ns());
+ auto css = CollectionShardingState::get(txn, nss.ns());
const char* ns = nss.ns().c_str();
for (auto it = begin; it != end; it++) {
AuthorizationManager::get(txn->getServiceContext())->logOp(txn, "i", ns, *it, nullptr);
- if (css && !fromMigrate) {
+ if (!fromMigrate) {
css->onInsertOp(txn, *it);
}
}
@@ -92,8 +92,8 @@ void OpObserver::onUpdate(OperationContext* txn, const OplogUpdateEntryArgs& arg
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria);
- CollectionShardingState* const css = CollectionShardingState::get(txn, args.ns);
- if (css && !args.fromMigrate) {
+ auto css = CollectionShardingState::get(txn, args.ns);
+ if (!args.fromMigrate) {
css->onUpdateOp(txn, args.updatedDoc);
}
@@ -112,10 +112,8 @@ OpObserver::DeleteState OpObserver::aboutToDelete(OperationContext* txn,
deleteState.idDoc = idElement.wrap();
}
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns());
- if (css) {
- deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc);
- }
+ auto css = CollectionShardingState::get(txn, ns.ns());
+ deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc);
return deleteState;
}
@@ -131,8 +129,8 @@ void OpObserver::onDelete(OperationContext* txn,
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr);
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns());
- if (css && !fromMigrate && deleteState.isMigrating) {
+ auto css = CollectionShardingState::get(txn, ns.ns());
+ if (!fromMigrate && deleteState.isMigrating) {
css->onDeleteOp(txn, deleteState.idDoc);
}
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 946562baa27..241943f5fe1 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -51,7 +51,7 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
@@ -553,11 +553,6 @@ std::string runQuery(OperationContext* txn,
return "";
}
- ShardingState* const shardingState = ShardingState::get(txn);
-
- // We freak out later if this changes before we're done with the query.
- const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns());
-
// Handle query option $maxTimeMS (not used with commands).
curop.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000);
txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
@@ -583,7 +578,6 @@ std::string runQuery(OperationContext* txn,
BSONObj obj;
PlanExecutor::ExecState state;
- // uint64_t numMisplacedDocs = 0;
// Get summary info about which plan the executor is using.
{
@@ -635,17 +629,10 @@ std::string runQuery(OperationContext* txn,
uasserted(17144, "Executor error: " + WorkingSetCommon::toStatusString(obj));
}
- // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the
- // chunk belonged on this node are gone. Separating chunk lifetime management from
- // ClientCursor should allow this check to go away.
- if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) {
- // if the version changed during the query we might be missing some data and its safe to
- // send this as mongos can resend at this point
- throw SendStaleConfigException(nss.ns(),
- "version changed during initial query",
- shardingVersionAtStart,
- shardingState->getVersion(nss.ns()));
- }
+ // Before saving the cursor, ensure that whatever plan we established happened with the expected
+ // collection version
+ auto css = CollectionShardingState::get(txn, nss);
+ css->checkShardVersionOrThrow(txn);
// Fill out curop based on query results. If we have a cursorid, we will fill out curop with
// this cursorid later.
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index e0fa35d4da2..acff32f0b43 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -68,6 +68,11 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
}
void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) {
+ if (newMetadata) {
+ invariant(!newMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ invariant(!newMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ }
+
_metadata = std::move(newMetadata);
}
@@ -76,6 +81,13 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) co
ChunkVersion received;
ChunkVersion wanted;
if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) {
+ // Set migration critical section in case we failed because of migration
+ auto migrationCritSec =
+ ShardingState::get(txn)->migrationSourceManager()->getMigrationCriticalSection();
+ if (migrationCritSec) {
+ OperationShardingState::get(txn).setMigrationCriticalSection(migrationCritSec);
+ }
+
throw SendStaleConfigException(_nss.ns(),
str::stream() << "[" << _nss.ns()
<< "] shard version not ok: " << errmsg,
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 873fc6c6f5c..d3dd4e30442 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -69,6 +69,7 @@ BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) {
}
MONGO_FP_DECLARE(failMigrationCommit);
+MONGO_FP_DECLARE(hangBeforeCommitMigration);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
} // namespace
@@ -133,12 +134,12 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) {
_toShardCS = toShard->getConnString();
}
- auto& operationVersion = OperationShardingState::get(_txn);
- if (!operationVersion.hasShardVersion()) {
+ auto& oss = OperationShardingState::get(_txn);
+ if (!oss.hasShardVersion()) {
return Status{ErrorCodes::InvalidOptions, "moveChunk command is missing shard version"};
}
- _collectionVersion = operationVersion.getShardVersion(_nss);
+ _collectionVersion = oss.getShardVersion(_nss);
return Status::OK();
}
@@ -385,6 +386,8 @@ Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessio
preCond.append(b.obj());
}
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeCommitMigration);
+
fassertStatusOK(34431,
grid.catalogManager(_txn)->applyChunkOpsDeprecated(
_txn, updates.arr(), preCond.arr(), _nss.ns(), nextVersion));
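
hangBeforeCommitMigration is a test-only fail point that parks the migration just before applyChunkOpsDeprecated writes the commit to the config servers, which is useful for exercising the new critical-section wait paths. A rough model of the pause-while-set behavior, using a plain atomic flag instead of MongoDB's FailPoint machinery:

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Toggled on by a test, off again when the test wants the migration to proceed.
    std::atomic<bool> hangBeforeCommitMigration{false};

    // Approximates MONGO_FAIL_POINT_PAUSE_WHILE_SET: the calling thread sleeps
    // in a loop until the flag is cleared.
    void pauseWhileSet(const std::atomic<bool>& failPoint) {
        while (failPoint.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
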
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index c9dacc78491..980620de09e 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -48,6 +48,7 @@
#include "mongo/logger/ramlog.h"
#include "mongo/s/chunk.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
@@ -209,8 +210,11 @@ void MigrationSourceManager::done(OperationContext* txn) {
_sessionId = boost::none;
_deleteNotifyExec.reset(NULL);
- _inCriticalSection = false;
- _inCriticalSectionCV.notify_all();
+
+ if (_critSec) {
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
+ }
_deleted.clear();
_reload.clear();
@@ -577,30 +581,23 @@ long long MigrationSourceManager::mbUsed() const {
return _memoryUsed / (1024 * 1024);
}
-bool MigrationSourceManager::getInCriticalSection() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _inCriticalSection;
-}
-
void MigrationSourceManager::setInCriticalSection(bool inCritSec) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inCriticalSection = inCritSec;
- _inCriticalSectionCV.notify_all();
-}
-bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) {
- const auto deadline = stdx::chrono::system_clock::now() + Seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (_inCriticalSection) {
- log() << "Waiting for " << maxSecondsToWait
- << " seconds for the migration critical section to end";
-
- if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) {
- return false;
- }
+ if (inCritSec) {
+ invariant(!_critSec);
+ _critSec = std::make_shared<CriticalSectionState>();
+ } else {
+ invariant(_critSec);
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
}
+}
- return true;
+std::shared_ptr<MigrationSourceManager::CriticalSectionState>
+MigrationSourceManager::getMigrationCriticalSection() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _critSec;
}
bool MigrationSourceManager::isActive() const {
@@ -649,4 +646,26 @@ NamespaceString MigrationSourceManager::_getNS() const {
return _nss;
}
+MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default;
+
+bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection(
+ Microseconds waitTimeout) {
+ const auto waitDeadline = stdx::chrono::system_clock::now() + waitTimeout;
+
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ while (_inCriticalSection) {
+ if (stdx::cv_status::timeout == _criticalSectionCV.wait_until(sl, waitDeadline)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void MigrationSourceManager::CriticalSectionState::exitCriticalSection() {
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ _inCriticalSection = false;
+ _criticalSectionCV.notify_all();
+}
+
} // namespace mongo
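
The boolean-plus-condition-variable pair that used to live directly on MigrationSourceManager is now wrapped in a shared CriticalSectionState object, so an operation can hold a reference to the specific critical section it collided with even after the manager has moved on. A self-contained sketch of the same mechanism using standard library primitives:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Starts out "inside" the critical section; exitCriticalSection() flips the
    // flag exactly once and wakes all waiters.
    class CriticalSectionState {
    public:
        // Returns true once the critical section has ended, false if waitTimeout
        // expired first.
        bool waitUntilOutOfCriticalSection(std::chrono::microseconds waitTimeout) {
            const auto deadline = std::chrono::system_clock::now() + waitTimeout;
            std::unique_lock<std::mutex> lk(_mutex);
            return _cv.wait_until(lk, deadline, [this] { return !_inCriticalSection; });
        }

        void exitCriticalSection() {
            std::lock_guard<std::mutex> lk(_mutex);
            _inCriticalSection = false;
            _cv.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        bool _inCriticalSection = true;
    };
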
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 8b5504fd1c9..5c9cf07751e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -49,6 +49,8 @@ class MigrationSourceManager {
MONGO_DISALLOW_COPYING(MigrationSourceManager);
public:
+ class CriticalSectionState;
+
MigrationSourceManager();
~MigrationSourceManager();
@@ -127,18 +129,12 @@ public:
long long mbUsed() const;
- bool getInCriticalSection() const;
-
void setInCriticalSection(bool inCritSec);
- /**
- * Blocks until the "in critical section" state changes and returns true if we are NOT in the
- * critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
bool isActive() const;
+ std::shared_ptr<CriticalSectionState> getMigrationCriticalSection();
+
private:
friend class LogOpForShardingHandler;
@@ -175,11 +171,6 @@ private:
mutable stdx::mutex _mutex;
- stdx::condition_variable _inCriticalSectionCV; // (M)
-
- // Is migration currently in critical section. This can be used to block new writes.
- bool _inCriticalSection{false}; // (M)
-
std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
// List of _id of documents that were modified that must be re-cloned.
@@ -203,6 +194,42 @@ private:
// List of record id that needs to be transferred from here to the other side.
std::set<RecordId> _cloneLocs; // (C)
+
+    // This value is set when setInCriticalSection is called with a true argument, and is
+    // signalled and cleared when it is called with a false argument.
+ std::shared_ptr<CriticalSectionState> _critSec;
+};
+
+/**
+ * This object is instantiated once the migration logic enters critical section. It contains all
+ * the state which is associated with being in a critical section, such as the bumped metadata
+ * version (which has not yet been reflected on the config server).
+ */
+class MigrationSourceManager::CriticalSectionState {
+ MONGO_DISALLOW_COPYING(CriticalSectionState);
+
+public:
+ CriticalSectionState();
+
+ /**
+ * Blocks until the critical section completes. Returns true if the wait succeeded and the
+ * critical section is no longer active, or false if the waitTimeout was exceeded.
+ */
+ bool waitUntilOutOfCriticalSection(Microseconds waitTimeout);
+
+ /**
+     * To be called when the critical section has completed. Signals any threads blocked in
+     * waitUntilOutOfCriticalSection. Must only be called once for the lifetime of this object.
+ */
+ void exitCriticalSection();
+
+private:
+    // Only transitions from true to false, and only once, under the critical section mutex;
+    // the condition variable is signalled when it does.
+ bool _inCriticalSection{true};
+
+ stdx::mutex _criticalSectionMutex;
+ stdx::condition_variable _criticalSectionCV;
};
} // namespace mongo
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 906c491aac3..40617c08138 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -39,7 +39,8 @@ namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
OperationContext::declareDecoration<OperationShardingState>();
-const ChunkVersion kUnshardedVersion(ChunkVersion::UNSHARDED());
+// Max time to wait for the migration critical section to complete
+const Minutes kMaxWaitForMigrationCriticalSection(5);
} // namespace
@@ -49,12 +50,12 @@ OperationShardingState& OperationShardingState::get(OperationContext* txn) {
return shardingMetadataDecoration(txn);
}
-void OperationShardingState::initializeShardVersion(NamespaceString ns,
+void OperationShardingState::initializeShardVersion(NamespaceString nss,
const BSONElement& shardVersionElt) {
invariant(!hasShardVersion());
- if (ns.isSystemDotIndexes()) {
- setShardVersion(std::move(ns), ChunkVersion::IGNORED());
+ if (nss.isSystemDotIndexes()) {
+ setShardVersion(std::move(nss), ChunkVersion::IGNORED());
return;
}
@@ -70,31 +71,54 @@ void OperationShardingState::initializeShardVersion(NamespaceString ns,
return;
}
- setShardVersion(std::move(ns), std::move(newVersion));
+ setShardVersion(std::move(nss), std::move(newVersion));
}
bool OperationShardingState::hasShardVersion() const {
return _hasVersion;
}
-const ChunkVersion& OperationShardingState::getShardVersion(const NamespaceString& ns) const {
- if (_ns != ns) {
- return kUnshardedVersion;
+ChunkVersion OperationShardingState::getShardVersion(const NamespaceString& nss) const {
+ if (_ns != nss) {
+ return ChunkVersion::UNSHARDED();
}
return _shardVersion;
}
-void OperationShardingState::setShardVersion(NamespaceString ns, ChunkVersion newVersion) {
+void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion newVersion) {
// This currently supports only setting the shard version for one namespace.
- invariant(!_hasVersion || _ns == ns);
- invariant(!ns.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
+ invariant(!_hasVersion || _ns == nss);
+ invariant(!nss.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
- _ns = std::move(ns);
+ _ns = std::move(nss);
_shardVersion = std::move(newVersion);
_hasVersion = true;
}
+bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) {
+ // Must not block while holding a lock
+ invariant(!txn->lockState()->isLocked());
+
+ if (_migrationCriticalSection) {
+ const Microseconds operationRemainingTime(Microseconds(txn->getRemainingMaxTimeMicros()));
+ _migrationCriticalSection->waitUntilOutOfCriticalSection(
+ durationCount<Microseconds>(operationRemainingTime)
+ ? operationRemainingTime
+ : kMaxWaitForMigrationCriticalSection);
+ _migrationCriticalSection = nullptr;
+ return true;
+ }
+
+ return false;
+}
+
+void OperationShardingState::setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) {
+ invariant(critSec);
+ _migrationCriticalSection = std::move(critSec);
+}
+
void OperationShardingState::_clear() {
_hasVersion = false;
_shardVersion = ChunkVersion();
@@ -104,21 +128,21 @@ void OperationShardingState::_clear() {
OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn,
const NamespaceString& ns)
: _txn(txn), _ns(ns) {
- auto& operationVersion = OperationShardingState::get(txn);
- _hadOriginalVersion = operationVersion._hasVersion;
+ auto& oss = OperationShardingState::get(txn);
+ _hadOriginalVersion = oss._hasVersion;
if (_hadOriginalVersion) {
- _originalVersion = operationVersion.getShardVersion(ns);
+ _originalVersion = oss.getShardVersion(ns);
}
- operationVersion.setShardVersion(ns, ChunkVersion::IGNORED());
+ oss.setShardVersion(ns, ChunkVersion::IGNORED());
}
OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() {
- auto& operationVersion = OperationShardingState::get(_txn);
- invariant(ChunkVersion::isIgnoredVersion(operationVersion.getShardVersion(_ns)));
+ auto& oss = OperationShardingState::get(_txn);
+ invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns)));
if (_hadOriginalVersion) {
- operationVersion.setShardVersion(_ns, _originalVersion);
+ oss.setShardVersion(_ns, _originalVersion);
} else {
- operationVersion._clear();
+ oss._clear();
}
}
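
waitForMigrationCriticalSection bounds its wait by the operation's remaining maxTimeMS when one is set, otherwise by the new kMaxWaitForMigrationCriticalSection cap of five minutes, and it clears the recorded critical section so the wait happens at most once per operation. A standalone approximation (the CriticalSectionState here has the same shape as the sketch above):

    #include <chrono>
    #include <condition_variable>
    #include <memory>
    #include <mutex>

    struct CriticalSectionState {
        std::mutex mutex;
        std::condition_variable cv;
        bool inCriticalSection = true;

        bool waitUntilOutOfCriticalSection(std::chrono::microseconds timeout) {
            std::unique_lock<std::mutex> lk(mutex);
            return cv.wait_for(lk, timeout, [this] { return !inCriticalSection; });
        }
    };

    // Returns true if we actually waited (even if the wait timed out), false if
    // no critical section was recorded for this operation.
    bool waitForMigrationCriticalSection(std::shared_ptr<CriticalSectionState>& critSec,
                                         std::chrono::microseconds remainingOpTime) {
        using namespace std::chrono;
        constexpr minutes kMaxWaitForMigrationCriticalSection{5};

        if (!critSec) {
            return false;
        }
        critSec->waitUntilOutOfCriticalSection(
            remainingOpTime.count() > 0
                ? remainingOpTime
                : duration_cast<microseconds>(kMaxWaitForMigrationCriticalSection));
        critSec = nullptr;  // one-shot: don't wait again on retry within this op
        return true;
    }
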
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 7aa2fc919d3..f52cbd9f702 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -32,6 +32,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -69,7 +70,7 @@ public:
* This initialization may only be performed once for the lifetime of the object, which
* coincides with the lifetime of the request.
*/
- void initializeShardVersion(NamespaceString ns, const BSONElement& shardVersionElement);
+ void initializeShardVersion(NamespaceString nss, const BSONElement& shardVersionElement);
/**
* Returns whether or not there is a shard version associated with this operation.
@@ -84,12 +85,29 @@ public:
* Returns ChunkVersion::UNSHARDED() if this operation has no shard version information
* for the requested namespace.
*/
- const ChunkVersion& getShardVersion(const NamespaceString& ns) const;
+ ChunkVersion getShardVersion(const NamespaceString& nss) const;
/**
* Stores the given chunk version of a namespace into this object.
*/
- void setShardVersion(NamespaceString ns, ChunkVersion newVersion);
+ void setShardVersion(NamespaceString nss, ChunkVersion newVersion);
+
+ /**
+     * This call is a no-op if there isn't a currently active migration critical section. Otherwise
+     * it will wait for the critical section to complete, up to the remaining operation time.
+     *
+     * Returns true if the call actually waited because of a migration critical section (regardless
+     * of whether the wait timed out), false if there was no active migration critical section.
+ */
+ bool waitForMigrationCriticalSection(OperationContext* txn);
+
+ /**
+ * Setting this value indicates that when the version check failed, there was an active
+ * migration for the namespace and that it would be prudent to wait for the critical section to
+ * complete before retrying so the router doesn't make wasteful requests.
+ */
+ void setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec);
private:
/**
@@ -101,6 +119,10 @@ private:
bool _hasVersion = false;
ChunkVersion _shardVersion;
NamespaceString _ns;
+
+    // This value will only be non-null if the version check failed during the operation's
+    // execution due to a stale version, and there was an active migration for that namespace
+    // which was in its critical section.
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection;
};
/**
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 04710cd8c95..b4f5f9170cb 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -37,9 +37,12 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/wire_version.h"
@@ -152,157 +155,191 @@ public:
return false;
}
+ const NamespaceString nss(ns);
+
+ // Backwards compatibility for SERVER-23119
+ if (!nss.isValid()) {
+ warning() << "Invalid namespace used for setShardVersion: " << ns;
+ return true;
+ }
// we can run on a slave up to here
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsToDatabase(ns))) {
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nss.db())) {
result.append("errmsg", "not master");
result.append("note", "from post init in setShardVersion");
return false;
}
// step 2
- ChunkVersion version =
+ const ChunkVersion requestedVersion =
uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj));
- // step 3
- const ChunkVersion oldVersion = info->getVersion(ns);
- const ChunkVersion globalVersion = shardingState->getVersion(ns);
+ // step 3 - Actual version checking
+ const ChunkVersion connectionVersion = info->getVersion(ns);
+ connectionVersion.addToBSON(result, "oldVersion");
+
+ {
+ // Use a stable collection metadata while performing the checks
+ boost::optional<AutoGetCollection> autoColl;
+ autoColl.emplace(txn, nss, MODE_IS);
+
+ auto css = CollectionShardingState::get(txn, nss);
+ const ChunkVersion collectionShardVersion =
+ (css->getMetadata() ? css->getMetadata()->getShardVersion()
+ : ChunkVersion::UNSHARDED());
+
+ if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) {
+ // mongos and mongod agree!
+ if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) {
+ if (connectionVersion < collectionShardVersion &&
+ connectionVersion.epoch() == collectionShardVersion.epoch()) {
+ info->setVersion(ns, requestedVersion);
+ } else if (authoritative) {
+ // this means there was a drop and our version is reset
+ info->setVersion(ns, requestedVersion);
+ } else {
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "verifying drop on '" + ns + "'";
+ return false;
+ }
+ }
- oldVersion.addToBSON(result, "oldVersion");
+ return true;
+ }
- if (version.isWriteCompatibleWith(globalVersion)) {
- // mongos and mongod agree!
- if (!oldVersion.isWriteCompatibleWith(version)) {
- if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) {
- info->setVersion(ns, version);
- } else if (authoritative) {
- // this means there was a drop and our version is reset
- info->setVersion(ns, version);
- } else {
- result.append("ns", ns);
+ // step 4
+ // Cases below all either return OR fall-through to remote metadata reload.
+ const bool isDropRequested =
+ !requestedVersion.isSet() && collectionShardVersion.isSet();
+
+ if (isDropRequested) {
+ if (!authoritative) {
result.appendBool("need_authoritative", true);
- errmsg = "verifying drop on '" + ns + "'";
+ result.append("ns", ns);
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ errmsg = "dropping needs to be authoritative";
return false;
}
- }
-
- return true;
- }
- // step 4
- // Cases below all either return OR fall-through to remote metadata reload.
- const bool isDropRequested = !version.isSet() && globalVersion.isSet();
-
- if (isDropRequested) {
- if (!authoritative) {
- result.appendBool("need_authoritative", true);
- result.append("ns", ns);
- globalVersion.addToBSON(result, "globalVersion");
- errmsg = "dropping needs to be authoritative";
- return false;
- }
-
- // Fall through to metadata reload below
- } else {
- // Not Dropping
+ // Fall through to metadata reload below
+ } else {
+ // Not Dropping
- // TODO: Refactor all of this
- if (version < oldVersion && version.hasEqualEpoch(oldVersion)) {
- errmsg = str::stream() << "this connection already had a newer version "
- << "of collection '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "newVersion");
- globalVersion.addToBSON(result, "globalVersion");
- return false;
- }
+ // TODO: Refactor all of this
+ if (requestedVersion < connectionVersion &&
+ requestedVersion.epoch() == connectionVersion.epoch()) {
+ errmsg = str::stream() << "this connection already had a newer version "
+ << "of collection '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "newVersion");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ return false;
+ }
- // TODO: Refactor all of this
- if (version < globalVersion && version.hasEqualEpoch(globalVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ // TODO: Refactor all of this
+ if (requestedVersion < collectionShardVersion &&
+ requestedVersion.epoch() == collectionShardVersion.epoch()) {
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ errmsg = str::stream() << "shard global version for collection is higher "
+ << "than trying to set to '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
+ return false;
}
- errmsg = str::stream() << "shard global version for collection is higher "
- << "than trying to set to '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "version");
- globalVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
- return false;
- }
- if (!globalVersion.isSet() && !authoritative) {
- // Needed b/c when the last chunk is moved off a shard,
- // the version gets reset to zero, which should require a reload.
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ if (!collectionShardVersion.isSet() && !authoritative) {
+ // Needed b/c when the last chunk is moved off a shard, the version gets reset
+ // to zero, which should require a reload.
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ // need authoritative for first look
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "first time for collection '" + ns + "'";
+ return false;
}
- // need authoritative for first look
- result.append("ns", ns);
- result.appendBool("need_authoritative", true);
- errmsg = "first time for collection '" + ns + "'";
- return false;
+ // Fall through to metadata reload below
}
-
- // Fall through to metadata reload below
}
- ChunkVersion currVersion;
- Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion);
+ Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion);
+
+ {
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- if (!status.isOK()) {
- // The reload itself was interrupted or confused here
+ ChunkVersion currVersion = ChunkVersion::UNSHARDED();
+ auto collMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
+ if (collMetadata) {
+ currVersion = collMetadata->getShardVersion();
+ }
- errmsg = str::stream() << "could not refresh metadata for " << ns
- << " with requested shard version " << version.toString()
- << ", stored shard version is " << currVersion.toString()
- << causedBy(status.reason());
+ if (!status.isOK()) {
+ // The reload itself was interrupted or confused here
- warning() << errmsg;
+ errmsg = str::stream()
+ << "could not refresh metadata for " << ns << " with requested shard version "
+ << requestedVersion.toString() << ", stored shard version is "
+ << currVersion.toString() << causedBy(status.reason());
- result.append("ns", ns);
- version.addToBSON(result, "version");
- currVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
+ warning() << errmsg;
- return false;
- } else if (!version.isWriteCompatibleWith(currVersion)) {
- // We reloaded a version that doesn't match the version mongos was trying to
- // set.
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ currVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
- errmsg = str::stream() << "requested shard version differs from"
- << " config shard version for " << ns
- << ", requested version is " << version.toString()
- << " but found version " << currVersion.toString();
+ return false;
+ } else if (!requestedVersion.isWriteCompatibleWith(currVersion)) {
+ // We reloaded a version that doesn't match the version mongos was trying to
+ // set.
+ errmsg = str::stream() << "requested shard version differs from"
+ << " config shard version for " << ns
+ << ", requested version is " << requestedVersion.toString()
+ << " but found version " << currVersion.toString();
- OCCASIONALLY warning() << errmsg;
+ OCCASIONALLY warning() << errmsg;
- // WARNING: the exact fields below are important for compatibility with mongos
- // version reload.
+ // WARNING: the exact fields below are important for compatibility with mongos
+ // version reload.
- result.append("ns", ns);
- currVersion.addToBSON(result, "globalVersion");
+ result.append("ns", ns);
+ currVersion.addToBSON(result, "globalVersion");
+
+ // If this was a reset of a collection or the last chunk moved out, inform mongos to
+ // do a full reload.
+ if (currVersion.epoch() != requestedVersion.epoch() || !currVersion.isSet()) {
+ result.appendBool("reloadConfig", true);
+ // Zero-version also needed to trigger full mongos reload, sadly
+ // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
+ ChunkVersion(0, 0, OID()).addToBSON(result, "version");
+ // For debugging
+ requestedVersion.addToBSON(result, "origVersion");
+ } else {
+ requestedVersion.addToBSON(result, "version");
+ }
- // If this was a reset of a collection or the last chunk moved out, inform mongos to
- // do a full reload.
- if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) {
- result.appendBool("reloadConfig", true);
- // Zero-version also needed to trigger full mongos reload, sadly
- // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
- ChunkVersion(0, 0, OID()).addToBSON(result, "version");
- // For debugging
- version.addToBSON(result, "origVersion");
- } else {
- version.addToBSON(result, "version");
+ return false;
}
-
- return false;
}
- info->setVersion(ns, version);
+ info->setVersion(ns, requestedVersion);
return true;
}
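
The restructured command performs the version comparison under a stable collection lock and follows the same ladder as before: write-compatible versions succeed immediately, drops and first-time collections demand an authoritative request, and a router that is behind is told to reload. A condensed sketch of that decision ladder with simplified stand-in types (the real command also compares against the connection's version, waits on the migration critical section, and reports through BSON fields):

    enum class SsvOutcome {
        Ok,                 // versions agree; accept and record on the connection
        NeedAuthoritative,  // drop / first contact must come from an authoritative mongos
        ReloadConfig,       // requester is behind; tell it to reload routing info
        RefreshMetadata,    // fall through to the remote metadata reload path
    };

    struct Version {
        unsigned major = 0;  // stand-in for ChunkVersion's combined version
        unsigned epoch = 0;
        bool isSet() const { return major != 0; }
    };

    SsvOutcome classifySetShardVersion(const Version& requested,
                                       const Version& collection,
                                       bool authoritative) {
        const bool writeCompatible =
            requested.epoch == collection.epoch && requested.major == collection.major;
        if (writeCompatible) {
            return SsvOutcome::Ok;
        }
        const bool isDropRequested = !requested.isSet() && collection.isSet();
        if (isDropRequested && !authoritative) {
            return SsvOutcome::NeedAuthoritative;  // dropping needs to be authoritative
        }
        if (requested.major < collection.major && requested.epoch == collection.epoch) {
            return SsvOutcome::ReloadConfig;  // shard already has a newer version
        }
        if (!collection.isSet() && !authoritative) {
            return SsvOutcome::NeedAuthoritative;  // need authoritative for first look
        }
        return SsvOutcome::RefreshMetadata;
    }
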
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 775104d4e6b..2baf2f513cb 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -425,14 +425,6 @@ void ShardingState::mergeChunks(OperationContext* txn,
it->second->setMetadata(std::move(cloned));
}
-bool ShardingState::inCriticalMigrateSection() {
- return _migrationSourceManager.getInCriticalSection();
-}
-
-bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
- return _migrationSourceManager.waitTillNotInCriticalSection(maxSecondsToWait);
-}
-
void ShardingState::resetMetadata(const string& ns) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -441,47 +433,45 @@ void ShardingState::resetMetadata(const string& ns) {
_collections.erase(ns);
}
-Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
- const string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion) {
- // The _configServerTickets serializes this process such that only a small number of threads
- // can try to refresh at the same time.
-
- LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion;
+Status ShardingState::onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion) {
+ invariant(!txn->lockState()->isLocked());
+ invariant(enabled());
- //
- // Queuing of refresh requests starts here when remote reload is needed. This may take time.
- // TODO: Explicitly expose the queuing discipline.
- //
+ LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
+ << expectedVersion;
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom(&_configServerTickets);
+ // Ensure any ongoing migrations have completed
+ auto& oss = OperationShardingState::get(txn);
+ oss.waitForMigrationCriticalSection(txn);
- //
- // Fast path - check if the requested version is at a higher version than the current
- // metadata version or a different epoch before verifying against config server.
- //
+ ChunkVersion collectionShardVersion;
- shared_ptr<CollectionMetadata> storedMetadata;
+ // Fast path - check if the requested version is at a higher version than the current metadata
+ // version or a different epoch before verifying against config server.
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionShardingStateMap::iterator it = _collections.find(ns);
- if (it != _collections.end())
- storedMetadata = it->second->getMetadata();
- }
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- ChunkVersion storedShardVersion;
- if (storedMetadata)
- storedShardVersion = storedMetadata->getShardVersion();
- *latestShardVersion = storedShardVersion;
+ shared_ptr<CollectionMetadata> storedMetadata =
+ CollectionShardingState::get(txn, nss)->getMetadata();
+ if (storedMetadata) {
+ collectionShardVersion = storedMetadata->getShardVersion();
+ }
- if (storedShardVersion >= reqShardVersion &&
- storedShardVersion.epoch() == reqShardVersion.epoch()) {
- // Don't need to remotely reload if we're in the same epoch with a >= version
- return Status::OK();
+ if (collectionShardVersion >= expectedVersion &&
+ collectionShardVersion.epoch() == expectedVersion.epoch()) {
+ // Don't need to remotely reload if we're in the same epoch and the requested version is
+ // smaller than the one we know about. This means that the remote side is behind.
+ return Status::OK();
+ }
}
+ // The _configServerTickets serializes this process such that only a small number of threads can
+ // try to refresh at the same time
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
+
//
// Slow path - remotely reload
//
@@ -491,19 +481,20 @@ Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
// C) Dropping a collection, notified (currently) by mongos.
// D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
- if (storedShardVersion.epoch() != reqShardVersion.epoch()) {
+ if (collectionShardVersion.epoch() != expectedVersion.epoch()) {
// Need to remotely reload if our epochs aren't the same, to verify
- LOG(1) << "metadata change requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata change requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
} else {
        // Need to remotely reload since the epochs match but the requested version is greater
- LOG(1) << "metadata version update requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata version update requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
}
- return _refreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
+ ChunkVersion unusedLatestShardVersion;
+ return _refreshMetadata(txn, nss.ns(), expectedVersion, true, &unusedLatestShardVersion);
}
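
onStaleShardVersion first lets any recorded migration critical section drain, then applies the fast-path comparison under a collection lock: if the cached metadata is in the same epoch and at or above the expected version, the requester is the stale side and no reload happens; otherwise the ticket-limited remote refresh runs. A small sketch of that fast-path predicate with a simplified version type:

    #include <cstdint>

    // Simplified stand-in: 'epoch' identifies the collection incarnation,
    // 'combined' orders versions within an epoch.
    struct ChunkVersion {
        std::uint64_t combined = 0;
        std::uint32_t epoch = 0;
    };

    // Returns false when our cached metadata already satisfies the expected
    // version (same epoch, >= version) -- i.e. the remote side is behind, not us.
    bool needsRemoteReload(const ChunkVersion& cached, const ChunkVersion& expected) {
        if (cached.epoch == expected.epoch && cached.combined >= expected.combined) {
            return false;
        }
        // Different epoch (drop/recreate) or a genuinely newer expected version:
        // verify against the config servers.
        return true;
    }
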
Status ShardingState::refreshMetadataNow(OperationContext* txn,
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 19dc1be2978..defecd2aafa 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -135,24 +135,12 @@ public:
ChunkVersion getVersion(const std::string& ns);
/**
- * If the metadata for 'ns' at this shard is at or above the requested version,
- * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard
- * version. The latter is always greater or equal than 'reqShardVersion' if in the same epoch.
- *
- * Otherwise, falls back to refreshMetadataNow.
- *
- * This call blocks if there are more than _configServerTickets threads currently refreshing
- * metadata (currently set to 3).
- *
- * Locking Note:
- * + Must NOT be called with the write lock because this call may go into the network,
- * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees
- * that 'latestShardVersion' is indeed the current one on return.
+ * Refreshes the local metadata based on whether the expected version is higher than what we
+ * have cached.
*/
- Status refreshMetadataIfNeeded(OperationContext* txn,
- const std::string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion);
+ Status onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion);
/**
* Refreshes collection metadata by asking the config server for the latest information.
@@ -303,13 +291,6 @@ public:
const BSONObj& maxKey,
ChunkVersion mergedVersion);
- bool inCriticalMigrateSection();
-
- /**
- * @return true if we are NOT in the critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
/**
* TESTING ONLY
* Uninstalls the metadata for a given collection.
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 177fd934e8c..0f64014f5e8 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -41,7 +41,6 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/s/stale_exception.h"
#include "mongo/s/version_manager.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/exit.h"
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp
index d8a690d211e..4aa5231ccc6 100644
--- a/src/mongo/s/d_split.cpp
+++ b/src/mongo/s/d_split.cpp
@@ -681,9 +681,9 @@ public:
// Mongos >= v3.2 sends the full version, v3.0 only sends the epoch.
// TODO(SERVER-20742): Stop parsing epoch separately after 3.2.
OID cmdEpoch;
- auto& operationVersion = OperationShardingState::get(txn);
- if (operationVersion.hasShardVersion()) {
- cmdVersion = operationVersion.getShardVersion(nss);
+ auto& oss = OperationShardingState::get(txn);
+ if (oss.hasShardVersion()) {
+ cmdVersion = oss.getShardVersion(nss);
cmdEpoch = cmdVersion.epoch();
} else {
BSONElement epochElem(cmdObj["epoch"]);
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index d9ce669e3f7..e056e05609c 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -79,12 +79,6 @@ static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargete
}
}
-static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) {
- if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet())
- return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
- return false;
-}
-
// The number of times we'll try to continue a batch op if no progress is being made
// This only applies when no writes are occurring and metadata is not changing on reload
static const int kMaxRoundsWithoutProgress(5);
@@ -152,7 +146,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
size_t numSent = 0;
size_t numToSend = childBatches.size();
- bool remoteMetadataChanging = false;
while (numSent != numToSend) {
// Collect batches out on the network, mapped by endpoint
OwnedHostBatchMap ownedPendingBatches;
@@ -270,11 +263,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
++stats->numStaleBatches;
}
- // Remember if the shard is actively changing metadata right now
- if (isShardMetadataChanging(staleErrors)) {
- remoteMetadataChanging = true;
- }
-
// Remember that we successfully wrote to this shard
// NOTE: This will record lastOps for shards where we actually didn't update
// or delete any documents, which preserves old behavior but is conservative
@@ -327,7 +315,7 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
//
int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed);
- if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) {
+ if (currCompletedOps == numCompletedOps && !targeterChanged) {
++numRoundsWithoutProgress;
} else {
numRoundsWithoutProgress = 0;
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 100fb1806a1..bf0e4c8685a 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -290,11 +290,6 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
for (int i = 0; i < 10; i++) {
- if (i % 2 == 0)
- error.setErrInfo(BSONObj());
- else
- error.setErrInfo(BSON("inCriticalSection" << true));
-
mockResults.push_back(new MockWriteResult(backend.shardHost, error));
}
@@ -306,7 +301,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(stats.numStaleBatches, 10);
+ ASSERT_EQUALS(stats.numStaleBatches, 6);
}
} // namespace
diff --git a/src/mongo/s/write_ops/write_error_detail.cpp b/src/mongo/s/write_ops/write_error_detail.cpp
index cc2368154fe..51baaecd790 100644
--- a/src/mongo/s/write_ops/write_error_detail.cpp
+++ b/src/mongo/s/write_ops/write_error_detail.cpp
@@ -154,10 +154,6 @@ void WriteErrorDetail::setIndex(int index) {
_isIndexSet = true;
}
-void WriteErrorDetail::unsetIndex() {
- _isIndexSet = false;
-}
-
bool WriteErrorDetail::isIndexSet() const {
return _isIndexSet;
}
@@ -172,10 +168,6 @@ void WriteErrorDetail::setErrCode(int errCode) {
_isErrCodeSet = true;
}
-void WriteErrorDetail::unsetErrCode() {
- _isErrCodeSet = false;
-}
-
bool WriteErrorDetail::isErrCodeSet() const {
return _isErrCodeSet;
}
@@ -190,10 +182,6 @@ void WriteErrorDetail::setErrInfo(const BSONObj& errInfo) {
_isErrInfoSet = true;
}
-void WriteErrorDetail::unsetErrInfo() {
- _isErrInfoSet = false;
-}
-
bool WriteErrorDetail::isErrInfoSet() const {
return _isErrInfoSet;
}
diff --git a/src/mongo/s/write_ops/write_error_detail.h b/src/mongo/s/write_ops/write_error_detail.h
index 9c431f02f57..e0dec80ce52 100644
--- a/src/mongo/s/write_ops/write_error_detail.h
+++ b/src/mongo/s/write_ops/write_error_detail.h
@@ -77,17 +77,14 @@ public:
//
void setIndex(int index);
- void unsetIndex();
bool isIndexSet() const;
int getIndex() const;
void setErrCode(int errCode);
- void unsetErrCode();
bool isErrCodeSet() const;
int getErrCode() const;
void setErrInfo(const BSONObj& errInfo);
- void unsetErrInfo();
bool isErrInfoSet() const;
const BSONObj& getErrInfo() const;