diff options
Diffstat (limited to 'src/mongo/db/commands/write_commands/batch_executor.cpp')
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 114 |
1 files changed, 24 insertions, 90 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index d3fb03727e7..1fa931d239b 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -65,6 +65,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" @@ -150,14 +151,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { dassert(response->isValid(NULL)); } -void noteInCriticalSection(WriteErrorDetail* staleError) { - BSONObjBuilder builder; - if (staleError->isErrInfoSet()) - builder.appendElements(staleError->getErrInfo()); - builder.append("inCriticalSection", true); - staleError->setErrInfo(builder.obj()); -} - /** * Translates write item type to wire protocol op code. Helper for * WriteBatchExecutor::applyWriteItem(). @@ -194,38 +187,20 @@ void buildStaleError(const ChunkVersion& shardVersionRecvd, bool checkShardVersion(OperationContext* txn, const BatchedCommandRequest& request, WriteOpResult* result) { - const NamespaceString& nss = request.getTargetingNSS(); - dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - - auto& oss = OperationShardingState::get(txn); + const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS()); - if (!oss.hasShardVersion()) { - return true; - } - - ChunkVersion operationShardVersion = oss.getShardVersion(nss); - if (ChunkVersion::isIgnoredVersion(operationShardVersion)) { + try { + css->checkShardVersionOrThrow(txn); return true; - } - - ShardingState* shardingState = ShardingState::get(txn); - CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); - ChunkVersion shardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); - - if (!operationShardVersion.isWriteCompatibleWith(shardVersion)) { - result->setError(new WriteErrorDetail); - buildStaleError(operationShardVersion, shardVersion, result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); return false; } - - return true; } } // namespace -// TODO: Determine queueing behavior we want here -MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true); - WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le) : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {} @@ -326,51 +301,15 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, // Refresh metadata if needed // - bool staleBatch = + const bool staleBatch = !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; if (staleBatch) { - ShardingState* shardingState = ShardingState::get(_txn); - - auto& oss = OperationShardingState::get(_txn); - ChunkVersion requestShardVersion = oss.getShardVersion(request.getTargetingNSS()); - - // - // First, we refresh metadata if we need to based on the requested version. - // - ChunkVersion latestShardVersion; - shardingState->refreshMetadataIfNeeded( - _txn, request.getTargetingNS(), requestShardVersion, &latestShardVersion); - - // Report if we're still changing our metadata - // TODO: Better reporting per-collection - if (shardingState->inCriticalMigrateSection()) { - noteInCriticalSection(writeErrors.back()); - } + const auto& oss = OperationShardingState::get(_txn); - if (queueForMigrationCommit) { - // - // Queue up for migration to end - this allows us to be sure that clients will - // not repeatedly try to refresh metadata that is not yet written to the config - // server. Not necessary for correctness. - // Exposed as optional parameter to allow testing of queuing behavior with - // different network timings. - // - // Only wait if we're an older version (in the current collection epoch) and - // we're not write compatible, implying that the current migration is affecting - // writes. - // - - if (requestShardVersion.isOlderThan(latestShardVersion) && - !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) { - while (shardingState->inCriticalMigrateSection()) { - log() << "write request to old shard version " << requestShardVersion - << " waiting for migration commit"; - - shardingState->waitTillNotInCriticalSection(10 /* secs */); - } - } - } + ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS()); + ShardingState::get(_txn) + ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion); } // @@ -1035,11 +974,9 @@ static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index"); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); } catch (const DBException& ex) { Status status(ex.toStatus()); if (ErrorCodes::isInterruption(status.code())) @@ -1238,7 +1175,7 @@ static void multiUpdate(OperationContext* txn, // fire in that case. lastOpSetterGuard.Dismiss(); } - } catch (const WriteConflictException& dle) { + } catch (const WriteConflictException&) { debug->writeConflicts++; if (isMulti) { log() << "Had WriteConflict during multi update, aborting"; @@ -1246,16 +1183,15 @@ static void multiUpdate(OperationContext* txn, } createCollection = false; + // RESTART LOOP fakeLoop = -1; txn->recoveryUnit()->abandonSnapshot(); WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns()); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); } catch (const DBException& ex) { Status status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { @@ -1345,14 +1281,12 @@ static void multiRemove(OperationContext* txn, lastOpSetterGuard.Dismiss(); } break; - } catch (const WriteConflictException& dle) { + } catch (const WriteConflictException&) { CurOp::get(txn)->debug().writeConflicts++; WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns()); - } catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError( - staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const StaleConfigException& e) { + result->setError(new WriteErrorDetail()); + buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); return; } catch (const DBException& ex) { Status status = ex.toStatus(); |