summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/write_commands/batch_executor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/write_commands/batch_executor.cpp')
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp114
1 files changed, 24 insertions, 90 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index d3fb03727e7..1fa931d239b 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
@@ -150,14 +151,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
dassert(response->isValid(NULL));
}
-void noteInCriticalSection(WriteErrorDetail* staleError) {
- BSONObjBuilder builder;
- if (staleError->isErrInfoSet())
- builder.appendElements(staleError->getErrInfo());
- builder.append("inCriticalSection", true);
- staleError->setErrInfo(builder.obj());
-}
-
/**
* Translates write item type to wire protocol op code. Helper for
* WriteBatchExecutor::applyWriteItem().
@@ -194,38 +187,20 @@ void buildStaleError(const ChunkVersion& shardVersionRecvd,
bool checkShardVersion(OperationContext* txn,
const BatchedCommandRequest& request,
WriteOpResult* result) {
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- auto& oss = OperationShardingState::get(txn);
+ const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS());
- if (!oss.hasShardVersion()) {
- return true;
- }
-
- ChunkVersion operationShardVersion = oss.getShardVersion(nss);
- if (ChunkVersion::isIgnoredVersion(operationShardVersion)) {
+ try {
+ css->checkShardVersionOrThrow(txn);
return true;
- }
-
- ShardingState* shardingState = ShardingState::get(txn);
- CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns());
- ChunkVersion shardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
-
- if (!operationShardVersion.isWriteCompatibleWith(shardVersion)) {
- result->setError(new WriteErrorDetail);
- buildStaleError(operationShardVersion, shardVersion, result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return false;
}
-
- return true;
}
} // namespace
-// TODO: Determine queueing behavior we want here
-MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true);
-
WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le)
: _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {}
@@ -326,51 +301,15 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request,
// Refresh metadata if needed
//
- bool staleBatch =
+ const bool staleBatch =
!writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
if (staleBatch) {
- ShardingState* shardingState = ShardingState::get(_txn);
-
- auto& oss = OperationShardingState::get(_txn);
- ChunkVersion requestShardVersion = oss.getShardVersion(request.getTargetingNSS());
-
- //
- // First, we refresh metadata if we need to based on the requested version.
- //
- ChunkVersion latestShardVersion;
- shardingState->refreshMetadataIfNeeded(
- _txn, request.getTargetingNS(), requestShardVersion, &latestShardVersion);
-
- // Report if we're still changing our metadata
- // TODO: Better reporting per-collection
- if (shardingState->inCriticalMigrateSection()) {
- noteInCriticalSection(writeErrors.back());
- }
+ const auto& oss = OperationShardingState::get(_txn);
- if (queueForMigrationCommit) {
- //
- // Queue up for migration to end - this allows us to be sure that clients will
- // not repeatedly try to refresh metadata that is not yet written to the config
- // server. Not necessary for correctness.
- // Exposed as optional parameter to allow testing of queuing behavior with
- // different network timings.
- //
- // Only wait if we're an older version (in the current collection epoch) and
- // we're not write compatible, implying that the current migration is affecting
- // writes.
- //
-
- if (requestShardVersion.isOlderThan(latestShardVersion) &&
- !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "write request to old shard version " << requestShardVersion
- << " waiting for migration commit";
-
- shardingState->waitTillNotInCriticalSection(10 /* secs */);
- }
- }
- }
+ ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS());
+ ShardingState::get(_txn)
+ ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion);
}
//
@@ -1035,11 +974,9 @@ static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index");
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code()))
@@ -1238,7 +1175,7 @@ static void multiUpdate(OperationContext* txn,
// fire in that case.
lastOpSetterGuard.Dismiss();
}
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
debug->writeConflicts++;
if (isMulti) {
log() << "Had WriteConflict during multi update, aborting";
@@ -1246,16 +1183,15 @@ static void multiUpdate(OperationContext* txn,
}
createCollection = false;
+
// RESTART LOOP
fakeLoop = -1;
txn->recoveryUnit()->abandonSnapshot();
WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
} catch (const DBException& ex) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
@@ -1345,14 +1281,12 @@ static void multiRemove(OperationContext* txn,
lastOpSetterGuard.Dismiss();
}
break;
- } catch (const WriteConflictException& dle) {
+ } catch (const WriteConflictException&) {
CurOp::get(txn)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns());
- } catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(
- staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const StaleConfigException& e) {
+ result->setError(new WriteErrorDetail());
+ buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
return;
} catch (const DBException& ex) {
Status status = ex.toStatus();