author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2016-03-07 14:22:47 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2016-03-16 18:53:09 -0400
commit     ee5fbdd8540d93d2e0d6fa19ba9a5595bb1829cb (patch)
tree       2ff83a961d227161d6171708162b595cf35ab7bd
parent     28399f947dda32a33509ddaebc650b0aa95fa0dd (diff)
SERVER-22997 Unify wait for migrate critical section and metadata refresh
This change gets rid of commands explicitly waiting for the migration critical section to be established. Instead, when an operation encounters a stale shard version, it installs a wait object on the OperationContext, which can later be used to wait for the critical section outside of any locks. The metadata refresh also now happens outside of locks.
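A minimal sketch of the resulting dispatcher flow, following the dbcommands.cpp hunk below (runCommandBody is a hypothetical stand-in for the actual command execution; metadata construction is elided):

    // Sketch: the shard version check inside the command throws SendStaleConfig; the
    // dispatcher then asks ShardingState to wait for any migration critical section that
    // was recorded on the OperationContext and to refresh the metadata -- both outside
    // of collection locks -- before reporting the error so that mongos can retry.
    try {
        runCommandBody(txn, request);  // hypothetical stand-in; may call checkShardVersionOrThrow()
    } catch (const DBException& e) {
        if (e.getCode() == ErrorCodes::SendStaleConfig) {
            auto& sce = static_cast<const StaleConfigException&>(e);
            ShardingState::get(txn)->onStaleShardVersion(
                txn, NamespaceString(sce.getns()), sce.getVersionReceived());
        }
        Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata);
    }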
 jstests/sharding/sharding_balance4.js                    |   5
 jstests/sharding/startup_with_all_configs_down.js        |  12
 src/mongo/db/commands/find_and_modify.cpp                |  16
 src/mongo/db/commands/find_cmd.cpp                       |  47
 src/mongo/db/commands/write_commands/batch_executor.cpp  | 114
 src/mongo/db/db_raii.cpp                                 |   8
 src/mongo/db/dbcommands.cpp                              |  12
 src/mongo/db/instance.cpp                                |  12
 src/mongo/db/op_observer.cpp                             |  22
 src/mongo/db/query/find.cpp                              |  23
 src/mongo/db/s/collection_sharding_state.cpp             |  12
 src/mongo/db/s/migration_impl.cpp                        |   9
 src/mongo/db/s/migration_source_manager.cpp              |  61
 src/mongo/db/s/migration_source_manager.h                |  53
 src/mongo/db/s/operation_sharding_state.cpp              |  64
 src/mongo/db/s/operation_sharding_state.h                |  28
 src/mongo/db/s/set_shard_version_command.cpp             | 263
 src/mongo/db/s/sharding_state.cpp                        |  83
 src/mongo/db/s/sharding_state.h                          |  29
 src/mongo/s/client/shard_connection.cpp                  |   1
 src/mongo/s/d_split.cpp                                  |   6
 src/mongo/s/write_ops/batch_write_exec.cpp               |  14
 src/mongo/s/write_ops/batch_write_exec_test.cpp          |   7
 src/mongo/s/write_ops/write_error_detail.cpp             |  12
 src/mongo/s/write_ops/write_error_detail.h               |   3
 25 files changed, 444 insertions(+), 472 deletions(-)
diff --git a/jstests/sharding/sharding_balance4.js b/jstests/sharding/sharding_balance4.js
index d78e94d407f..91c9d5ce9ad 100644
--- a/jstests/sharding/sharding_balance4.js
+++ b/jstests/sharding/sharding_balance4.js
@@ -21,7 +21,7 @@
num = 0;
- counts = {};
+ var counts = {};
//
// TODO: Rewrite to make much clearer.
@@ -38,10 +38,11 @@
var up = {
$inc: {x: 1}
};
- if (includeString)
+ if (includeString) {
up["$set"] = {
s: bigString
};
+ }
var myid = optionalId == undefined ? Random.randInt(N) : optionalId;
bulk.find({_id: myid}).upsert().update(up);
diff --git a/jstests/sharding/startup_with_all_configs_down.js b/jstests/sharding/startup_with_all_configs_down.js
index 61bff427580..f2eb4d67fd5 100644
--- a/jstests/sharding/startup_with_all_configs_down.js
+++ b/jstests/sharding/startup_with_all_configs_down.js
@@ -16,12 +16,13 @@
assert.writeOK(st.s.getDB('test').foo.insert({_id: i}));
}
+ assert.commandWorked(st.s0.adminCommand({enableSharding: 'test'}));
st.ensurePrimaryShard('test', 'shard0000');
- st.adminCommand({enableSharding: 'test'});
- st.adminCommand({shardCollection: 'test.foo', key: {_id: 1}});
- st.adminCommand({split: 'test.foo', find: {_id: 50}});
- st.adminCommand({moveChunk: 'test.foo', find: {_id: 75}, to: 'shard0001'});
+ assert.commandWorked(st.s0.adminCommand({shardCollection: 'test.foo', key: {_id: 1}}));
+ assert.commandWorked(st.s0.adminCommand({split: 'test.foo', find: {_id: 50}}));
+ assert.commandWorked(
+ st.s0.adminCommand({moveChunk: 'test.foo', find: {_id: 75}, to: 'shard0001'}));
// Make sure the pre-existing mongos already has the routing information loaded into memory
assert.eq(100, st.s.getDB('test').foo.find().itcount());
@@ -55,6 +56,9 @@
st.restartConfigServer(i);
}
+ // TODO: SERVER-23192 - restart mongos because it has deemed the CSRS config server set as unusable
+ st.restartMongos(0);
+
jsTestLog("Queries against the original mongos should work again");
assert.eq(100, st.s.getDB('test').foo.find().itcount());
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index c1666497b78..32b48d4eb3e 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -261,9 +261,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Collection* const collection = autoColl.getCollection();
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
@@ -296,9 +294,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Collection* collection = autoColl.getCollection();
auto statusWithPlanExecutor =
@@ -380,9 +376,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
@@ -442,9 +436,7 @@ public:
}
auto css = CollectionShardingState::get(txn, nsString);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 5ecc6330afb..5610c4799db 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -49,11 +49,9 @@
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
-#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -238,29 +236,6 @@ public:
}
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- ShardingState* const shardingState = ShardingState::get(txn);
-
- if (OperationShardingState::get(txn).hasShardVersion() && shardingState->enabled()) {
- ChunkVersion receivedVersion = OperationShardingState::get(txn).getShardVersion(nss);
- ChunkVersion latestVersion;
- // Wait for migration completion to get the correct chunk version.
- const int maxTimeoutSec = 30;
- int timeoutSec = cq->getParsed().getMaxTimeMS() / 1000;
- if (!timeoutSec || timeoutSec > maxTimeoutSec) {
- timeoutSec = maxTimeoutSec;
- }
-
- if (!shardingState->waitTillNotInCriticalSection(timeoutSec)) {
- uasserted(ErrorCodes::LockTimeout, "Timeout while waiting for migration commit");
- }
-
- // If the received version is newer than the version cached in 'shardingState', then we
- // have to refresh 'shardingState' from the config servers. We do this before acquiring
- // locks so that we don't hold locks while waiting on the network.
- uassertStatusOK(shardingState->refreshMetadataIfNeeded(
- txn, nss.ns(), receivedVersion, &latestVersion));
- }
-
// Acquire locks.
AutoGetCollectionForRead ctx(txn, nss);
Collection* collection = ctx.getCollection();
@@ -268,11 +243,6 @@ public:
const int dbProfilingLevel =
ctx.getDb() ? ctx.getDb()->getProfilingLevel() : serverGlobalParams.defaultProfile;
- // It is possible that the sharding version will change during yield while we are
- // retrieving a plan executor. If this happens we will throw an error and mongos will
- // retry.
- const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns());
-
// Get the execution plan for the query.
auto statusWithPlanExecutor =
getExecutorFind(txn, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO);
@@ -325,17 +295,10 @@ public:
<< WorkingSetCommon::toStatusString(obj)));
}
- // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the
- // chunk belonged on this node are gone. Separating chunk lifetime management from
- // ClientCursor should allow this check to go away.
- if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) {
- // Version changed while retrieving a PlanExecutor. Terminate the operation,
- // signaling that mongos should retry.
- throw SendStaleConfigException(nss.ns(),
- "version changed during find command",
- shardingVersionAtStart,
- shardingState->getVersion(nss.ns()));
- }
+ // Before saving the cursor, ensure that whatever plan we established happened with the
+ // expected collection version
+ auto css = CollectionShardingState::get(txn, nss);
+ css->checkShardVersionOrThrow(txn);
// Set up the cursor for getMore.
CursorId cursorId = 0;
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index d3fb03727e7..1fa931d239b 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
@@ -150,14 +151,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
dassert(response->isValid(NULL));
}
-void noteInCriticalSection(WriteErrorDetail* staleError) {
- BSONObjBuilder builder;
- if (staleError->isErrInfoSet())
- builder.appendElements(staleError->getErrInfo());
- builder.append("inCriticalSection", true);
- staleError->setErrInfo(builder.obj());
-}
-
/**
* Translates write item type to wire protocol op code. Helper for
* WriteBatchExecutor::applyWriteItem().
@@ -194,38 +187,20 @@ void buildStaleError(const ChunkVersion& shardVersionRecvd,
bool checkShardVersion(OperationContext* txn,
const BatchedCommandRequest& request,
WriteOpResult* result) {
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- auto& oss = OperationShardingState::get(txn);
+ const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS());
- if (!oss.hasShardVersion()) {
- return true;
- }
-
- ChunkVersion operationShardVersion = oss.getShardVersion(nss);
- if (ChunkVersion::isIgnoredVersion(operationShardVersion)) {
+ try {
+ css->checkShardVersionOrThrow(txn);
return true;
- }
-
- ShardingState* shardingState = ShardingState::get(txn);
- CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns());
- ChunkVersion shardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
-
- if (!operationShardVersion.isWriteCompatibleWith(shardVersion)) {
- result->setError(new WriteErrorDetail);
- buildStaleError(operationShardVersion, shardVersion, result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return false;
}
-
- return true;
}
} // namespace
-// TODO: Determine queueing behavior we want here
-MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true);
-
WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le)
: _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {}
@@ -326,51 +301,15 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request,
// Refresh metadata if needed
//
- bool staleBatch =
+ const bool staleBatch =
!writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
if (staleBatch) {
- ShardingState* shardingState = ShardingState::get(_txn);
-
- auto& oss = OperationShardingState::get(_txn);
- ChunkVersion requestShardVersion = oss.getShardVersion(request.getTargetingNSS());
-
- //
- // First, we refresh metadata if we need to based on the requested version.
- //
- ChunkVersion latestShardVersion;
- shardingState->refreshMetadataIfNeeded(
- _txn, request.getTargetingNS(), requestShardVersion, &latestShardVersion);
-
- // Report if we're still changing our metadata
- // TODO: Better reporting per-collection
- if (shardingState->inCriticalMigrateSection()) {
- noteInCriticalSection(writeErrors.back());
- }
+ const auto& oss = OperationShardingState::get(_txn);
- if (queueForMigrationCommit) {
- //
- // Queue up for migration to end - this allows us to be sure that clients will
- // not repeatedly try to refresh metadata that is not yet written to the config
- // server. Not necessary for correctness.
- // Exposed as optional parameter to allow testing of queuing behavior with
- // different network timings.
- //
- // Only wait if we're an older version (in the current collection epoch) and
- // we're not write compatible, implying that the current migration is affecting
- // writes.
- //
-
- if (requestShardVersion.isOlderThan(latestShardVersion) &&
- !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "write request to old shard version " << requestShardVersion
- << " waiting for migration commit";
-
- shardingState->waitTillNotInCriticalSection(10 /* secs */);
- }
- }
- }
+ ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS());
+ ShardingState::get(_txn)
+ ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion);
}
//
@@ -1035,11 +974,9 @@ static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index");
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code()))
@@ -1238,7 +1175,7 @@ static void multiUpdate(OperationContext* txn,
// fire in that case.
lastOpSetterGuard.Dismiss();
}
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
debug->writeConflicts++;
if (isMulti) {
log() << "Had WriteConflict during multi update, aborting";
@@ -1246,16 +1183,15 @@ static void multiUpdate(OperationContext* txn,
}
createCollection = false;
+
// RESTART LOOP
fakeLoop = -1;
txn->recoveryUnit()->abandonSnapshot();
WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
@@ -1345,14 +1281,12 @@ static void multiRemove(OperationContext* txn,
lastOpSetterGuard.Dismiss();
}
break;
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
CurOp::get(txn)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return;
} catch (const DBException& ex) {
Status status = ex.toStatus();
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 162017fc0f4..8075c940b1a 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -102,9 +102,7 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
// We have both the DB and collection locked, which is the prerequisite to do a stable shard
// version check, but we'd like to do the check after we have a satisfactory snapshot.
auto css = CollectionShardingState::get(txn, nss);
- if (css) {
- css->checkShardVersionOrThrow(txn);
- }
+ css->checkShardVersionOrThrow(txn);
}
AutoGetCollectionForRead::~AutoGetCollectionForRead() {
@@ -196,9 +194,7 @@ void OldClientContext::_checkNotStale() const {
break;
default:
auto css = CollectionShardingState::get(_txn, _ns);
- if (css) {
- css->checkShardVersionOrThrow(_txn);
- }
+ css->checkShardVersionOrThrow(_txn);
}
}
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index a469d2ebfaf..c2648e1f73b 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1209,7 +1209,6 @@ void Command::execCommand(OperationContext* txn,
std::string dbname = request.getDatabase().toString();
unique_ptr<MaintenanceModeSetter> mmSetter;
-
std::array<BSONElement, std::tuple_size<decltype(neededFieldNames)>::value>
extractedFields{};
request.getCommandArgs().getFields(neededFieldNames, &extractedFields);
@@ -1322,7 +1321,14 @@ void Command::execCommand(OperationContext* txn,
if (!retval) {
command->_commandsFailed.increment();
}
- } catch (const DBException& exception) {
+ } catch (const DBException& e) {
+ // If we got a stale config, wait in case the operation is stuck in a critical section
+ if (e.getCode() == ErrorCodes::SendStaleConfig) {
+ auto& sce = static_cast<const StaleConfigException&>(e);
+ ShardingState::get(txn)
+ ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
+ }
+
BSONObj metadata = rpc::makeEmptyMetadata();
if (ShardingState::get(txn)->enabled()) {
auto opTime = grid.shardRegistry()->getConfigOpTime();
@@ -1331,7 +1337,7 @@ void Command::execCommand(OperationContext* txn,
metadata = metadataBob.obj();
}
- Command::generateErrorResponse(txn, replyBuilder, exception, request, command, metadata);
+ Command::generateErrorResponse(txn, replyBuilder, e, request, command, metadata);
}
}
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 10ab281ff04..166a2b5683f 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -79,6 +79,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
@@ -399,9 +400,16 @@ static void receivedQuery(OperationContext* txn,
uassertStatusOK(status);
dbResponse.exhaustNS = runQuery(txn, q, nss, dbResponse.response);
- } catch (const AssertionException& exception) {
+ } catch (const AssertionException& e) {
+ // If we got a stale config, wait in case the operation is stuck in a critical section
+ if (e.getCode() == ErrorCodes::SendStaleConfig) {
+ auto& sce = static_cast<const StaleConfigException&>(e);
+ ShardingState::get(txn)
+ ->onStaleShardVersion(txn, NamespaceString(sce.getns()), sce.getVersionReceived());
+ }
+
dbResponse.response.reset();
- generateLegacyQueryErrorResponse(&exception, q, &op, &dbResponse.response);
+ generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response);
}
op.debug().responseLength = dbResponse.response.header().dataLen();
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 68f5bfcfc08..25bf9866c7f 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -51,8 +51,8 @@ void OpObserver::onCreateIndex(OperationContext* txn,
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "i", ns.c_str(), indexDoc, nullptr);
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns);
- if (css && !fromMigrate) {
+ auto css = CollectionShardingState::get(txn, ns);
+ if (!fromMigrate) {
css->onInsertOp(txn, indexDoc);
}
@@ -66,12 +66,12 @@ void OpObserver::onInserts(OperationContext* txn,
bool fromMigrate) {
repl::logOps(txn, "i", nss, begin, end, fromMigrate);
- CollectionShardingState* const css = CollectionShardingState::get(txn, nss.ns());
+ auto css = CollectionShardingState::get(txn, nss.ns());
const char* ns = nss.ns().c_str();
for (auto it = begin; it != end; it++) {
AuthorizationManager::get(txn->getServiceContext())->logOp(txn, "i", ns, *it, nullptr);
- if (css && !fromMigrate) {
+ if (!fromMigrate) {
css->onInsertOp(txn, *it);
}
}
@@ -92,8 +92,8 @@ void OpObserver::onUpdate(OperationContext* txn, const OplogUpdateEntryArgs& arg
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria);
- CollectionShardingState* const css = CollectionShardingState::get(txn, args.ns);
- if (css && !args.fromMigrate) {
+ auto css = CollectionShardingState::get(txn, args.ns);
+ if (!args.fromMigrate) {
css->onUpdateOp(txn, args.updatedDoc);
}
@@ -112,10 +112,8 @@ OpObserver::DeleteState OpObserver::aboutToDelete(OperationContext* txn,
deleteState.idDoc = idElement.wrap();
}
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns());
- if (css) {
- deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc);
- }
+ auto css = CollectionShardingState::get(txn, ns.ns());
+ deleteState.isMigrating = css->isDocumentInMigratingChunk(txn, doc);
return deleteState;
}
@@ -131,8 +129,8 @@ void OpObserver::onDelete(OperationContext* txn,
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr);
- CollectionShardingState* const css = CollectionShardingState::get(txn, ns.ns());
- if (css && !fromMigrate && deleteState.isMigrating) {
+ auto css = CollectionShardingState::get(txn, ns.ns());
+ if (!fromMigrate && deleteState.isMigrating) {
css->onDeleteOp(txn, deleteState.idDoc);
}
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 946562baa27..241943f5fe1 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -51,7 +51,7 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
@@ -553,11 +553,6 @@ std::string runQuery(OperationContext* txn,
return "";
}
- ShardingState* const shardingState = ShardingState::get(txn);
-
- // We freak out later if this changes before we're done with the query.
- const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns());
-
// Handle query option $maxTimeMS (not used with commands).
curop.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000);
txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
@@ -583,7 +578,6 @@ std::string runQuery(OperationContext* txn,
BSONObj obj;
PlanExecutor::ExecState state;
- // uint64_t numMisplacedDocs = 0;
// Get summary info about which plan the executor is using.
{
@@ -635,17 +629,10 @@ std::string runQuery(OperationContext* txn,
uasserted(17144, "Executor error: " + WorkingSetCommon::toStatusString(obj));
}
- // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the
- // chunk belonged on this node are gone. Separating chunk lifetime management from
- // ClientCursor should allow this check to go away.
- if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) {
- // if the version changed during the query we might be missing some data and its safe to
- // send this as mongos can resend at this point
- throw SendStaleConfigException(nss.ns(),
- "version changed during initial query",
- shardingVersionAtStart,
- shardingState->getVersion(nss.ns()));
- }
+ // Before saving the cursor, ensure that whatever plan we established happened with the expected
+ // collection version
+ auto css = CollectionShardingState::get(txn, nss);
+ css->checkShardVersionOrThrow(txn);
// Fill out curop based on query results. If we have a cursorid, we will fill out curop with
// this cursorid later.
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index e0fa35d4da2..acff32f0b43 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -68,6 +68,11 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
}
void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) {
+ if (newMetadata) {
+ invariant(!newMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ invariant(!newMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ }
+
_metadata = std::move(newMetadata);
}
@@ -76,6 +81,13 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) co
ChunkVersion received;
ChunkVersion wanted;
if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) {
+ // Set migration critical section in case we failed because of migration
+ auto migrationCritSec =
+ ShardingState::get(txn)->migrationSourceManager()->getMigrationCriticalSection();
+ if (migrationCritSec) {
+ OperationShardingState::get(txn).setMigrationCriticalSection(migrationCritSec);
+ }
+
throw SendStaleConfigException(_nss.ns(),
str::stream() << "[" << _nss.ns()
<< "] shard version not ok: " << errmsg,
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 873fc6c6f5c..d3dd4e30442 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -69,6 +69,7 @@ BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) {
}
MONGO_FP_DECLARE(failMigrationCommit);
+MONGO_FP_DECLARE(hangBeforeCommitMigration);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
} // namespace
@@ -133,12 +134,12 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) {
_toShardCS = toShard->getConnString();
}
- auto& operationVersion = OperationShardingState::get(_txn);
- if (!operationVersion.hasShardVersion()) {
+ auto& oss = OperationShardingState::get(_txn);
+ if (!oss.hasShardVersion()) {
return Status{ErrorCodes::InvalidOptions, "moveChunk command is missing shard version"};
}
- _collectionVersion = operationVersion.getShardVersion(_nss);
+ _collectionVersion = oss.getShardVersion(_nss);
return Status::OK();
}
@@ -385,6 +386,8 @@ Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessio
preCond.append(b.obj());
}
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeCommitMigration);
+
fassertStatusOK(34431,
grid.catalogManager(_txn)->applyChunkOpsDeprecated(
_txn, updates.arr(), preCond.arr(), _nss.ns(), nextVersion));
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index c9dacc78491..980620de09e 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -48,6 +48,7 @@
#include "mongo/logger/ramlog.h"
#include "mongo/s/chunk.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
@@ -209,8 +210,11 @@ void MigrationSourceManager::done(OperationContext* txn) {
_sessionId = boost::none;
_deleteNotifyExec.reset(NULL);
- _inCriticalSection = false;
- _inCriticalSectionCV.notify_all();
+
+ if (_critSec) {
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
+ }
_deleted.clear();
_reload.clear();
@@ -577,30 +581,23 @@ long long MigrationSourceManager::mbUsed() const {
return _memoryUsed / (1024 * 1024);
}
-bool MigrationSourceManager::getInCriticalSection() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _inCriticalSection;
-}
-
void MigrationSourceManager::setInCriticalSection(bool inCritSec) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inCriticalSection = inCritSec;
- _inCriticalSectionCV.notify_all();
-}
-bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) {
- const auto deadline = stdx::chrono::system_clock::now() + Seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (_inCriticalSection) {
- log() << "Waiting for " << maxSecondsToWait
- << " seconds for the migration critical section to end";
-
- if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) {
- return false;
- }
+ if (inCritSec) {
+ invariant(!_critSec);
+ _critSec = std::make_shared<CriticalSectionState>();
+ } else {
+ invariant(_critSec);
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
}
+}
- return true;
+std::shared_ptr<MigrationSourceManager::CriticalSectionState>
+MigrationSourceManager::getMigrationCriticalSection() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _critSec;
}
bool MigrationSourceManager::isActive() const {
@@ -649,4 +646,26 @@ NamespaceString MigrationSourceManager::_getNS() const {
return _nss;
}
+MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default;
+
+bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection(
+ Microseconds waitTimeout) {
+ const auto waitDeadline = stdx::chrono::system_clock::now() + waitTimeout;
+
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ while (_inCriticalSection) {
+ if (stdx::cv_status::timeout == _criticalSectionCV.wait_until(sl, waitDeadline)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void MigrationSourceManager::CriticalSectionState::exitCriticalSection() {
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ _inCriticalSection = false;
+ _criticalSectionCV.notify_all();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 8b5504fd1c9..5c9cf07751e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -49,6 +49,8 @@ class MigrationSourceManager {
MONGO_DISALLOW_COPYING(MigrationSourceManager);
public:
+ class CriticalSectionState;
+
MigrationSourceManager();
~MigrationSourceManager();
@@ -127,18 +129,12 @@ public:
long long mbUsed() const;
- bool getInCriticalSection() const;
-
void setInCriticalSection(bool inCritSec);
- /**
- * Blocks until the "in critical section" state changes and returns true if we are NOT in the
- * critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
bool isActive() const;
+ std::shared_ptr<CriticalSectionState> getMigrationCriticalSection();
+
private:
friend class LogOpForShardingHandler;
@@ -175,11 +171,6 @@ private:
mutable stdx::mutex _mutex;
- stdx::condition_variable _inCriticalSectionCV; // (M)
-
- // Is migration currently in critical section. This can be used to block new writes.
- bool _inCriticalSection{false}; // (M)
-
std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
// List of _id of documents that were modified that must be re-cloned.
@@ -203,6 +194,42 @@ private:
// List of record id that needs to be transferred from here to the other side.
std::set<RecordId> _cloneLocs; // (C)
+
+ // This value is set when setInCriticalSection is called with a true argument, and is signalled
+ // and cleared when it is called with a false argument.
+ std::shared_ptr<CriticalSectionState> _critSec;
+};
+
+/**
+ * This object is instantiated once the migration logic enters critical section. It contains all
+ * the state which is associated with being in a critical section, such as the bumped metadata
+ * version (which has not yet been reflected on the config server).
+ */
+class MigrationSourceManager::CriticalSectionState {
+ MONGO_DISALLOW_COPYING(CriticalSectionState);
+
+public:
+ CriticalSectionState();
+
+ /**
+ * Blocks until the critical section completes. Returns true if the wait succeeded and the
+ * critical section is no longer active, or false if the waitTimeout was exceeded.
+ */
+ bool waitUntilOutOfCriticalSection(Microseconds waitTimeout);
+
+ /**
+ * To be called when the critical section has completed. Signals any threads sitting blocked in
+ * waitUntilOutOfCriticalSection. Must only be used once for the lifetime of this object.
+ */
+ void exitCriticalSection();
+
+private:
+ // Only moves from true to false once. Happens under the critical section mutex and the critical
+ // section will be signalled.
+ bool _inCriticalSection{true};
+
+ stdx::mutex _criticalSectionMutex;
+ stdx::condition_variable _criticalSectionCV;
};
} // namespace mongo
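A brief usage sketch of the CriticalSectionState object declared in the header above (illustrative only; the timeout and call sites follow the hunks elsewhere in this patch):

    // Migration (writer) side: created when entering the critical section and signalled exactly
    // once when leaving it (see MigrationSourceManager::setInCriticalSection and done()).
    auto critSec = std::make_shared<MigrationSourceManager::CriticalSectionState>();
    // ... commit the migration while writes for the chunk remain blocked ...
    critSec->exitCriticalSection();  // wakes every thread blocked in waitUntilOutOfCriticalSection

    // Stale-operation (waiter) side: a copy of the shared state obtained via
    // getMigrationCriticalSection() can be waited on without holding any locks.
    if (!critSec->waitUntilOutOfCriticalSection(Seconds(10))) {
        // Timed out; the caller falls back to refreshing the metadata anyway.
    }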
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 906c491aac3..40617c08138 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -39,7 +39,8 @@ namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
OperationContext::declareDecoration<OperationShardingState>();
-const ChunkVersion kUnshardedVersion(ChunkVersion::UNSHARDED());
+// Max time to wait for the migration critical section to complete
+const Minutes kMaxWaitForMigrationCriticalSection(5);
} // namespace mongo
@@ -49,12 +50,12 @@ OperationShardingState& OperationShardingState::get(OperationContext* txn) {
return shardingMetadataDecoration(txn);
}
-void OperationShardingState::initializeShardVersion(NamespaceString ns,
+void OperationShardingState::initializeShardVersion(NamespaceString nss,
const BSONElement& shardVersionElt) {
invariant(!hasShardVersion());
- if (ns.isSystemDotIndexes()) {
- setShardVersion(std::move(ns), ChunkVersion::IGNORED());
+ if (nss.isSystemDotIndexes()) {
+ setShardVersion(std::move(nss), ChunkVersion::IGNORED());
return;
}
@@ -70,31 +71,54 @@ void OperationShardingState::initializeShardVersion(NamespaceString ns,
return;
}
- setShardVersion(std::move(ns), std::move(newVersion));
+ setShardVersion(std::move(nss), std::move(newVersion));
}
bool OperationShardingState::hasShardVersion() const {
return _hasVersion;
}
-const ChunkVersion& OperationShardingState::getShardVersion(const NamespaceString& ns) const {
- if (_ns != ns) {
- return kUnshardedVersion;
+ChunkVersion OperationShardingState::getShardVersion(const NamespaceString& nss) const {
+ if (_ns != nss) {
+ return ChunkVersion::UNSHARDED();
}
return _shardVersion;
}
-void OperationShardingState::setShardVersion(NamespaceString ns, ChunkVersion newVersion) {
+void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion newVersion) {
// This currently supports only setting the shard version for one namespace.
- invariant(!_hasVersion || _ns == ns);
- invariant(!ns.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
+ invariant(!_hasVersion || _ns == nss);
+ invariant(!nss.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
- _ns = std::move(ns);
+ _ns = std::move(nss);
_shardVersion = std::move(newVersion);
_hasVersion = true;
}
+bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) {
+ // Must not block while holding a lock
+ invariant(!txn->lockState()->isLocked());
+
+ if (_migrationCriticalSection) {
+ const Microseconds operationRemainingTime(Microseconds(txn->getRemainingMaxTimeMicros()));
+ _migrationCriticalSection->waitUntilOutOfCriticalSection(
+ durationCount<Microseconds>(operationRemainingTime)
+ ? operationRemainingTime
+ : kMaxWaitForMigrationCriticalSection);
+ _migrationCriticalSection = nullptr;
+ return true;
+ }
+
+ return false;
+}
+
+void OperationShardingState::setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) {
+ invariant(critSec);
+ _migrationCriticalSection = std::move(critSec);
+}
+
void OperationShardingState::_clear() {
_hasVersion = false;
_shardVersion = ChunkVersion();
@@ -104,21 +128,21 @@ void OperationShardingState::_clear() {
OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn,
const NamespaceString& ns)
: _txn(txn), _ns(ns) {
- auto& operationVersion = OperationShardingState::get(txn);
- _hadOriginalVersion = operationVersion._hasVersion;
+ auto& oss = OperationShardingState::get(txn);
+ _hadOriginalVersion = oss._hasVersion;
if (_hadOriginalVersion) {
- _originalVersion = operationVersion.getShardVersion(ns);
+ _originalVersion = oss.getShardVersion(ns);
}
- operationVersion.setShardVersion(ns, ChunkVersion::IGNORED());
+ oss.setShardVersion(ns, ChunkVersion::IGNORED());
}
OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() {
- auto& operationVersion = OperationShardingState::get(_txn);
- invariant(ChunkVersion::isIgnoredVersion(operationVersion.getShardVersion(_ns)));
+ auto& oss = OperationShardingState::get(_txn);
+ invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns)));
if (_hadOriginalVersion) {
- operationVersion.setShardVersion(_ns, _originalVersion);
+ oss.setShardVersion(_ns, _originalVersion);
} else {
- operationVersion._clear();
+ oss._clear();
}
}
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 7aa2fc919d3..f52cbd9f702 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -32,6 +32,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -69,7 +70,7 @@ public:
* This initialization may only be performed once for the lifetime of the object, which
* coincides with the lifetime of the request.
*/
- void initializeShardVersion(NamespaceString ns, const BSONElement& shardVersionElement);
+ void initializeShardVersion(NamespaceString nss, const BSONElement& shardVersionElement);
/**
* Returns whether or not there is a shard version associated with this operation.
@@ -84,12 +85,29 @@ public:
* Returns ChunkVersion::UNSHARDED() if this operation has no shard version information
* for the requested namespace.
*/
- const ChunkVersion& getShardVersion(const NamespaceString& ns) const;
+ ChunkVersion getShardVersion(const NamespaceString& nss) const;
/**
* Stores the given chunk version of a namespace into this object.
*/
- void setShardVersion(NamespaceString ns, ChunkVersion newVersion);
+ void setShardVersion(NamespaceString nss, ChunkVersion newVersion);
+
+ /**
+ * This call is a no op if there isn't a currently active migration critical section. Otherwise
+ * it will wait for the critical section to complete up to the remaining operation time.
+ *
+ * Returns true if the call actually waited because of a migration critical section (regardless of
+ * whether it timed out or not), false if there was no active migration critical section.
+ */
+ bool waitForMigrationCriticalSection(OperationContext* txn);
+
+ /**
+ * Setting this value indicates that when the version check failed, there was an active
+ * migration for the namespace and that it would be prudent to wait for the critical section to
+ * complete before retrying so the router doesn't make wasteful requests.
+ */
+ void setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec);
private:
/**
@@ -101,6 +119,10 @@ private:
bool _hasVersion = false;
ChunkVersion _shardVersion;
NamespaceString _ns;
+
+ // This value will only be non-null if the version check during the operation's execution failed
+ // due to a stale version and there was a migration for that namespace in its critical section.
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection;
};
/**
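A short sketch of how the two new OperationShardingState methods cooperate, based on the collection_sharding_state.cpp and sharding_state.cpp hunks in this patch (simplified; the real producer also goes on to throw SendStaleConfigException):

    // Producer: the shard version check fails while locks are held, so it only records the
    // active critical section on the OperationContext before throwing.
    auto critSec =
        ShardingState::get(txn)->migrationSourceManager()->getMigrationCriticalSection();
    if (critSec) {
        OperationShardingState::get(txn).setMigrationCriticalSection(critSec);
    }

    // Consumer: later, with no locks held, the stale-version handler waits for that critical
    // section (bounded by the operation's remaining time) before refreshing the metadata.
    OperationShardingState::get(txn).waitForMigrationCriticalSection(txn);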
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 04710cd8c95..b4f5f9170cb 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -37,9 +37,12 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/wire_version.h"
@@ -152,157 +155,191 @@ public:
return false;
}
+ const NamespaceString nss(ns);
+
+ // Backwards compatibility for SERVER-23119
+ if (!nss.isValid()) {
+ warning() << "Invalid namespace used for setShardVersion: " << ns;
+ return true;
+ }
// we can run on a slave up to here
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsToDatabase(ns))) {
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nss.db())) {
result.append("errmsg", "not master");
result.append("note", "from post init in setShardVersion");
return false;
}
// step 2
- ChunkVersion version =
+ const ChunkVersion requestedVersion =
uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj));
- // step 3
- const ChunkVersion oldVersion = info->getVersion(ns);
- const ChunkVersion globalVersion = shardingState->getVersion(ns);
+ // step 3 - Actual version checking
+ const ChunkVersion connectionVersion = info->getVersion(ns);
+ connectionVersion.addToBSON(result, "oldVersion");
+
+ {
+ // Use a stable collection metadata while performing the checks
+ boost::optional<AutoGetCollection> autoColl;
+ autoColl.emplace(txn, nss, MODE_IS);
+
+ auto css = CollectionShardingState::get(txn, nss);
+ const ChunkVersion collectionShardVersion =
+ (css->getMetadata() ? css->getMetadata()->getShardVersion()
+ : ChunkVersion::UNSHARDED());
+
+ if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) {
+ // mongos and mongod agree!
+ if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) {
+ if (connectionVersion < collectionShardVersion &&
+ connectionVersion.epoch() == collectionShardVersion.epoch()) {
+ info->setVersion(ns, requestedVersion);
+ } else if (authoritative) {
+ // this means there was a drop and our version is reset
+ info->setVersion(ns, requestedVersion);
+ } else {
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "verifying drop on '" + ns + "'";
+ return false;
+ }
+ }
- oldVersion.addToBSON(result, "oldVersion");
+ return true;
+ }
- if (version.isWriteCompatibleWith(globalVersion)) {
- // mongos and mongod agree!
- if (!oldVersion.isWriteCompatibleWith(version)) {
- if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) {
- info->setVersion(ns, version);
- } else if (authoritative) {
- // this means there was a drop and our version is reset
- info->setVersion(ns, version);
- } else {
- result.append("ns", ns);
+ // step 4
+ // Cases below all either return OR fall-through to remote metadata reload.
+ const bool isDropRequested =
+ !requestedVersion.isSet() && collectionShardVersion.isSet();
+
+ if (isDropRequested) {
+ if (!authoritative) {
result.appendBool("need_authoritative", true);
- errmsg = "verifying drop on '" + ns + "'";
+ result.append("ns", ns);
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ errmsg = "dropping needs to be authoritative";
return false;
}
- }
-
- return true;
- }
- // step 4
- // Cases below all either return OR fall-through to remote metadata reload.
- const bool isDropRequested = !version.isSet() && globalVersion.isSet();
-
- if (isDropRequested) {
- if (!authoritative) {
- result.appendBool("need_authoritative", true);
- result.append("ns", ns);
- globalVersion.addToBSON(result, "globalVersion");
- errmsg = "dropping needs to be authoritative";
- return false;
- }
-
- // Fall through to metadata reload below
- } else {
- // Not Dropping
+ // Fall through to metadata reload below
+ } else {
+ // Not Dropping
- // TODO: Refactor all of this
- if (version < oldVersion && version.hasEqualEpoch(oldVersion)) {
- errmsg = str::stream() << "this connection already had a newer version "
- << "of collection '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "newVersion");
- globalVersion.addToBSON(result, "globalVersion");
- return false;
- }
+ // TODO: Refactor all of this
+ if (requestedVersion < connectionVersion &&
+ requestedVersion.epoch() == connectionVersion.epoch()) {
+ errmsg = str::stream() << "this connection already had a newer version "
+ << "of collection '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "newVersion");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ return false;
+ }
- // TODO: Refactor all of this
- if (version < globalVersion && version.hasEqualEpoch(globalVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ // TODO: Refactor all of this
+ if (requestedVersion < collectionShardVersion &&
+ requestedVersion.epoch() == collectionShardVersion.epoch()) {
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ errmsg = str::stream() << "shard global version for collection is higher "
+ << "than trying to set to '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
+ return false;
}
- errmsg = str::stream() << "shard global version for collection is higher "
- << "than trying to set to '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "version");
- globalVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
- return false;
- }
- if (!globalVersion.isSet() && !authoritative) {
- // Needed b/c when the last chunk is moved off a shard,
- // the version gets reset to zero, which should require a reload.
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ if (!collectionShardVersion.isSet() && !authoritative) {
+ // Needed b/c when the last chunk is moved off a shard, the version gets reset
+ // to zero, which should require a reload.
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ // need authoritative for first look
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "first time for collection '" + ns + "'";
+ return false;
}
- // need authoritative for first look
- result.append("ns", ns);
- result.appendBool("need_authoritative", true);
- errmsg = "first time for collection '" + ns + "'";
- return false;
+ // Fall through to metadata reload below
}
-
- // Fall through to metadata reload below
}
- ChunkVersion currVersion;
- Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion);
+ Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion);
+
+ {
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- if (!status.isOK()) {
- // The reload itself was interrupted or confused here
+ ChunkVersion currVersion = ChunkVersion::UNSHARDED();
+ auto collMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
+ if (collMetadata) {
+ currVersion = collMetadata->getShardVersion();
+ }
- errmsg = str::stream() << "could not refresh metadata for " << ns
- << " with requested shard version " << version.toString()
- << ", stored shard version is " << currVersion.toString()
- << causedBy(status.reason());
+ if (!status.isOK()) {
+ // The reload itself was interrupted or confused here
- warning() << errmsg;
+ errmsg = str::stream()
+ << "could not refresh metadata for " << ns << " with requested shard version "
+ << requestedVersion.toString() << ", stored shard version is "
+ << currVersion.toString() << causedBy(status.reason());
- result.append("ns", ns);
- version.addToBSON(result, "version");
- currVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
+ warning() << errmsg;
- return false;
- } else if (!version.isWriteCompatibleWith(currVersion)) {
- // We reloaded a version that doesn't match the version mongos was trying to
- // set.
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ currVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
- errmsg = str::stream() << "requested shard version differs from"
- << " config shard version for " << ns
- << ", requested version is " << version.toString()
- << " but found version " << currVersion.toString();
+ return false;
+ } else if (!requestedVersion.isWriteCompatibleWith(currVersion)) {
+ // We reloaded a version that doesn't match the version mongos was trying to
+ // set.
+ errmsg = str::stream() << "requested shard version differs from"
+ << " config shard version for " << ns
+ << ", requested version is " << requestedVersion.toString()
+ << " but found version " << currVersion.toString();
- OCCASIONALLY warning() << errmsg;
+ OCCASIONALLY warning() << errmsg;
- // WARNING: the exact fields below are important for compatibility with mongos
- // version reload.
+ // WARNING: the exact fields below are important for compatibility with mongos
+ // version reload.
- result.append("ns", ns);
- currVersion.addToBSON(result, "globalVersion");
+ result.append("ns", ns);
+ currVersion.addToBSON(result, "globalVersion");
+
+ // If this was a reset of a collection or the last chunk moved out, inform mongos to
+ // do a full reload.
+ if (currVersion.epoch() != requestedVersion.epoch() || !currVersion.isSet()) {
+ result.appendBool("reloadConfig", true);
+ // Zero-version also needed to trigger full mongos reload, sadly
+ // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
+ ChunkVersion(0, 0, OID()).addToBSON(result, "version");
+ // For debugging
+ requestedVersion.addToBSON(result, "origVersion");
+ } else {
+ requestedVersion.addToBSON(result, "version");
+ }
- // If this was a reset of a collection or the last chunk moved out, inform mongos to
- // do a full reload.
- if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) {
- result.appendBool("reloadConfig", true);
- // Zero-version also needed to trigger full mongos reload, sadly
- // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
- ChunkVersion(0, 0, OID()).addToBSON(result, "version");
- // For debugging
- version.addToBSON(result, "origVersion");
- } else {
- version.addToBSON(result, "version");
+ return false;
}
-
- return false;
}
- info->setVersion(ns, version);
+ info->setVersion(ns, requestedVersion);
return true;
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 775104d4e6b..2baf2f513cb 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -425,14 +425,6 @@ void ShardingState::mergeChunks(OperationContext* txn,
it->second->setMetadata(std::move(cloned));
}
-bool ShardingState::inCriticalMigrateSection() {
- return _migrationSourceManager.getInCriticalSection();
-}
-
-bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
- return _migrationSourceManager.waitTillNotInCriticalSection(maxSecondsToWait);
-}
-
void ShardingState::resetMetadata(const string& ns) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -441,47 +433,45 @@ void ShardingState::resetMetadata(const string& ns) {
_collections.erase(ns);
}
-Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
- const string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion) {
- // The _configServerTickets serializes this process such that only a small number of threads
- // can try to refresh at the same time.
-
- LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion;
+Status ShardingState::onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion) {
+ invariant(!txn->lockState()->isLocked());
+ invariant(enabled());
- //
- // Queuing of refresh requests starts here when remote reload is needed. This may take time.
- // TODO: Explicitly expose the queuing discipline.
- //
+ LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
+ << expectedVersion;
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom(&_configServerTickets);
+ // Ensure any ongoing migrations have completed
+ auto& oss = OperationShardingState::get(txn);
+ oss.waitForMigrationCriticalSection(txn);
- //
- // Fast path - check if the requested version is at a higher version than the current
- // metadata version or a different epoch before verifying against config server.
- //
+ ChunkVersion collectionShardVersion;
- shared_ptr<CollectionMetadata> storedMetadata;
+ // Fast path - check if the requested version is at a higher version than the current metadata
+ // version or a different epoch before verifying against config server.
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionShardingStateMap::iterator it = _collections.find(ns);
- if (it != _collections.end())
- storedMetadata = it->second->getMetadata();
- }
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- ChunkVersion storedShardVersion;
- if (storedMetadata)
- storedShardVersion = storedMetadata->getShardVersion();
- *latestShardVersion = storedShardVersion;
+ shared_ptr<CollectionMetadata> storedMetadata =
+ CollectionShardingState::get(txn, nss)->getMetadata();
+ if (storedMetadata) {
+ collectionShardVersion = storedMetadata->getShardVersion();
+ }
- if (storedShardVersion >= reqShardVersion &&
- storedShardVersion.epoch() == reqShardVersion.epoch()) {
- // Don't need to remotely reload if we're in the same epoch with a >= version
- return Status::OK();
+ if (collectionShardVersion >= expectedVersion &&
+ collectionShardVersion.epoch() == expectedVersion.epoch()) {
+ // Don't need to remotely reload if we're in the same epoch and the requested version is
+ // smaller than the one we know about. This means that the remote side is behind.
+ return Status::OK();
+ }
}
+ // The _configServerTickets serializes this process such that only a small number of threads can
+ // try to refresh at the same time
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
+
//
// Slow path - remotely reload
//
@@ -491,19 +481,20 @@ Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
// C) Dropping a collection, notified (currently) by mongos.
// D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
- if (storedShardVersion.epoch() != reqShardVersion.epoch()) {
+ if (collectionShardVersion.epoch() != expectedVersion.epoch()) {
// Need to remotely reload if our epochs aren't the same, to verify
- LOG(1) << "metadata change requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata change requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
} else {
// Need to remotely reload since our epochs aren't the same but our version is greater
- LOG(1) << "metadata version update requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata version update requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
}
- return _refreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
+ ChunkVersion unusedLatestShardVersion;
+ return _refreshMetadata(txn, nss.ns(), expectedVersion, true, &unusedLatestShardVersion);
}
Status ShardingState::refreshMetadataNow(OperationContext* txn,
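The fast path above skips the remote reload whenever the cached metadata's epoch matches and its version is already at or above what the requester expects; only a mismatched epoch or a newer expected version falls through to the ticket-gated reload. A minimal, self-contained sketch of that decision, using stand-in types rather than MongoDB's ChunkVersion/CollectionMetadata:

    // Stand-in sketch of the fast-path check in ShardingState::onStaleShardVersion.
    // StandInChunkVersion is an illustrative substitute for the real ChunkVersion.
    #include <cstdint>
    #include <string>

    struct StandInChunkVersion {
        std::uint64_t combined = 0;  // stand-in for the packed major/minor chunk version
        std::string epoch;           // stand-in for the collection epoch (an OID in MongoDB)
    };

    // True when no remote reload is needed: the epochs match and the cached
    // version is already at or above the version the requester expects.
    bool cachedIsFreshEnough(const StandInChunkVersion& cached,
                             const StandInChunkVersion& expected) {
        return cached.epoch == expected.epoch && cached.combined >= expected.combined;
    }

When this returns false, the slow path runs, throttled by _configServerTickets as shown above.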
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 19dc1be2978..defecd2aafa 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -135,24 +135,12 @@ public:
ChunkVersion getVersion(const std::string& ns);
/**
- * If the metadata for 'ns' at this shard is at or above the requested version,
- * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard
- * version. The latter is always greater or equal than 'reqShardVersion' if in the same epoch.
- *
- * Otherwise, falls back to refreshMetadataNow.
- *
- * This call blocks if there are more than _configServerTickets threads currently refreshing
- * metadata (currently set to 3).
- *
- * Locking Note:
- * + Must NOT be called with the write lock because this call may go into the network,
- * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees
- * that 'latestShardVersion' is indeed the current one on return.
+ * Refreshes the local metadata based on whether the expected version is higher than what we
+ * have cached.
*/
- Status refreshMetadataIfNeeded(OperationContext* txn,
- const std::string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion);
+ Status onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion);
/**
* Refreshes collection metadata by asking the config server for the latest information.
@@ -303,13 +291,6 @@ public:
const BSONObj& maxKey,
ChunkVersion mergedVersion);
- bool inCriticalMigrateSection();
-
- /**
- * @return true if we are NOT in the critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
/**
* TESTING ONLY
* Uninstalls the metadata for a given collection.
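The doc comment removed above noted that at most a handful of threads (the _configServerTickets pool, set to 3) may refresh metadata concurrently; that throttle survives on the slow path in the .cpp, via waitForTicket() and an RAII TicketHolderReleaser. A simplified, self-contained stand-in for that pattern (not MongoDB's actual TicketHolder), assuming nothing beyond what the diff shows:

    // Simplified stand-in for the ticket-holder throttle on the slow refresh path:
    // at most `numTickets` threads may perform a remote reload at the same time.
    #include <condition_variable>
    #include <mutex>

    class SimpleTicketHolder {
    public:
        explicit SimpleTicketHolder(int numTickets) : _available(numTickets) {}

        void waitForTicket() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [this] { return _available > 0; });
            --_available;
        }

        void releaseTicket() {
            std::lock_guard<std::mutex> lk(_mutex);
            ++_available;
            _cv.notify_one();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        int _available;
    };

    // RAII releaser mirroring TicketHolderReleaser: the ticket is returned even if
    // the remote reload returns early or throws.
    class SimpleTicketReleaser {
    public:
        explicit SimpleTicketReleaser(SimpleTicketHolder* holder) : _holder(holder) {}
        ~SimpleTicketReleaser() { _holder->releaseTicket(); }

    private:
        SimpleTicketHolder* _holder;
    };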
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 177fd934e8c..0f64014f5e8 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -41,7 +41,6 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/s/stale_exception.h"
#include "mongo/s/version_manager.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/exit.h"
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp
index d8a690d211e..4aa5231ccc6 100644
--- a/src/mongo/s/d_split.cpp
+++ b/src/mongo/s/d_split.cpp
@@ -681,9 +681,9 @@ public:
// Mongos >= v3.2 sends the full version, v3.0 only sends the epoch.
// TODO(SERVER-20742): Stop parsing epoch separately after 3.2.
OID cmdEpoch;
- auto& operationVersion = OperationShardingState::get(txn);
- if (operationVersion.hasShardVersion()) {
- cmdVersion = operationVersion.getShardVersion(nss);
+ auto& oss = OperationShardingState::get(txn);
+ if (oss.hasShardVersion()) {
+ cmdVersion = oss.getShardVersion(nss);
cmdEpoch = cmdVersion.epoch();
} else {
BSONElement epochElem(cmdObj["epoch"]);
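The splitChunk path keeps supporting both wire formats: a 3.2+ mongos attaches the full shard version to the operation (read back through OperationShardingState), while a 3.0 mongos only sends the epoch in the command body. A self-contained sketch of that fallback, with stand-in types and an invented helper name (resolveExpectedVersion):

    // Stand-in sketch of the version/epoch fallback in the splitChunk command.
    // The types and the helper name are illustrative, not MongoDB's.
    #include <cstdint>
    #include <string>

    struct StandInVersion {
        std::uint64_t combined = 0;
        std::string epoch;
    };

    struct StandInOperationShardingState {
        bool hasShardVersion = false;
        StandInVersion shardVersion;
    };

    StandInVersion resolveExpectedVersion(const StandInOperationShardingState& oss,
                                          const std::string& epochFromCommandBody) {
        if (oss.hasShardVersion) {
            return oss.shardVersion;  // 3.2+ mongos: the full version travels with the op
        }
        StandInVersion v;
        v.epoch = epochFromCommandBody;  // 3.0 mongos: only the epoch is available
        return v;
    }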
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index d9ce669e3f7..e056e05609c 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -79,12 +79,6 @@ static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargete
}
}
-static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) {
- if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet())
- return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
- return false;
-}
-
// The number of times we'll try to continue a batch op if no progress is being made
// This only applies when no writes are occurring and metadata is not changing on reload
static const int kMaxRoundsWithoutProgress(5);
@@ -152,7 +146,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
size_t numSent = 0;
size_t numToSend = childBatches.size();
- bool remoteMetadataChanging = false;
while (numSent != numToSend) {
// Collect batches out on the network, mapped by endpoint
OwnedHostBatchMap ownedPendingBatches;
@@ -270,11 +263,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
++stats->numStaleBatches;
}
- // Remember if the shard is actively changing metadata right now
- if (isShardMetadataChanging(staleErrors)) {
- remoteMetadataChanging = true;
- }
-
// Remember that we successfully wrote to this shard
// NOTE: This will record lastOps for shards where we actually didn't update
// or delete any documents, which preserves old behavior but is conservative
@@ -327,7 +315,7 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
//
int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed);
- if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) {
+ if (currCompletedOps == numCompletedOps && !targeterChanged) {
++numRoundsWithoutProgress;
} else {
numRoundsWithoutProgress = 0;
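With the inCriticalSection escape hatch gone, a round that completes no writes and does not change targeting always counts toward the no-progress limit, so the executor presumably stops once that counter passes kMaxRoundsWithoutProgress (5); the test update below, which now expects 6 rather than 10 stale batches, is consistent with that. A minimal sketch of the termination rule under that assumption (the callback names are invented for illustration, and the real loop also exits once all child batches are sent):

    // Minimal sketch of the no-progress termination rule after this change: every
    // stale-only round counts, since the remoteMetadataChanging exemption is gone.
    #include <functional>

    static const int kMaxRoundsWithoutProgress = 5;

    int runRounds(const std::function<bool()>& roundMadeProgress,
                  const std::function<bool()>& targeterChangedThisRound) {
        int numRoundsWithoutProgress = 0;
        int roundsRun = 0;
        while (numRoundsWithoutProgress <= kMaxRoundsWithoutProgress) {
            ++roundsRun;
            if (!roundMadeProgress() && !targeterChangedThisRound()) {
                ++numRoundsWithoutProgress;
            } else {
                numRoundsWithoutProgress = 0;
            }
        }
        return roundsRun;  // with no progress at all, this returns 6 rounds
    }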
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 100fb1806a1..bf0e4c8685a 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -290,11 +290,6 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
for (int i = 0; i < 10; i++) {
- if (i % 2 == 0)
- error.setErrInfo(BSONObj());
- else
- error.setErrInfo(BSON("inCriticalSection" << true));
-
mockResults.push_back(new MockWriteResult(backend.shardHost, error));
}
@@ -306,7 +301,7 @@ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(stats.numStaleBatches, 10);
+ ASSERT_EQUALS(stats.numStaleBatches, 6);
}
} // namespace
diff --git a/src/mongo/s/write_ops/write_error_detail.cpp b/src/mongo/s/write_ops/write_error_detail.cpp
index cc2368154fe..51baaecd790 100644
--- a/src/mongo/s/write_ops/write_error_detail.cpp
+++ b/src/mongo/s/write_ops/write_error_detail.cpp
@@ -154,10 +154,6 @@ void WriteErrorDetail::setIndex(int index) {
_isIndexSet = true;
}
-void WriteErrorDetail::unsetIndex() {
- _isIndexSet = false;
-}
-
bool WriteErrorDetail::isIndexSet() const {
return _isIndexSet;
}
@@ -172,10 +168,6 @@ void WriteErrorDetail::setErrCode(int errCode) {
_isErrCodeSet = true;
}
-void WriteErrorDetail::unsetErrCode() {
- _isErrCodeSet = false;
-}
-
bool WriteErrorDetail::isErrCodeSet() const {
return _isErrCodeSet;
}
@@ -190,10 +182,6 @@ void WriteErrorDetail::setErrInfo(const BSONObj& errInfo) {
_isErrInfoSet = true;
}
-void WriteErrorDetail::unsetErrInfo() {
- _isErrInfoSet = false;
-}
-
bool WriteErrorDetail::isErrInfoSet() const {
return _isErrInfoSet;
}
diff --git a/src/mongo/s/write_ops/write_error_detail.h b/src/mongo/s/write_ops/write_error_detail.h
index 9c431f02f57..e0dec80ce52 100644
--- a/src/mongo/s/write_ops/write_error_detail.h
+++ b/src/mongo/s/write_ops/write_error_detail.h
@@ -77,17 +77,14 @@ public:
//
void setIndex(int index);
- void unsetIndex();
bool isIndexSet() const;
int getIndex() const;
void setErrCode(int errCode);
- void unsetErrCode();
bool isErrCodeSet() const;
int getErrCode() const;
void setErrInfo(const BSONObj& errInfo);
- void unsetErrInfo();
bool isErrInfoSet() const;
const BSONObj& getErrInfo() const;