diff options
-rw-r--r-- | src/mongo/db/client.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/client.h | 6 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/instance.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/d_logic.h | 2 | ||||
-rw-r--r-- | src/mongo/s/d_state.cpp | 17 | ||||
-rw-r--r-- | src/mongo/s/s_only.cpp | 3 |
8 files changed, 118 insertions, 21 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 483f872ce83..a523505433d 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -167,7 +167,8 @@ namespace mongo { _shutdown(false), _desc(desc), _god(0), - _lastOp(0) + _lastOp(0), + _isWriteCmd(false) { _hasWrittenThisOperation = false; _hasWrittenSinceCheckpoint = false; @@ -312,22 +313,20 @@ namespace mongo { } - void Client::Context::checkNotStale() const { + void Client::Context::checkNotStale() const { + // Write commands do not rely on the writeback mechanism (handled after the request + // has been processed) to reroute writes, so the version check needs to be done here. + if (_client->isWriteCmd()) { + ensureShardVersionOKOrThrow(_ns); + } + switch ( _client->_curOp->getOp() ) { case dbGetMore: // getMore's are special and should be handled else where case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well case dbDelete: break; - default: { - string errmsg; - ChunkVersion received; - ChunkVersion wanted; - if ( ! shardVersionOk( _ns , errmsg, received, wanted ) ) { - ostringstream os; - os << "[" << _ns << "] shard version not ok in Client::Context: " << errmsg; - throw SendStaleConfigException( _ns, os.str(), received, wanted ); - } - } + default: + ensureShardVersionOKOrThrow(_ns); } } @@ -540,6 +539,14 @@ namespace mongo { return true; } + void Client::setIsWriteCmd(bool newSetting) { + _isWriteCmd = newSetting; + } + + bool Client::isWriteCmd() const { + return _isWriteCmd; + } + void OpDebug::reset() { extra.reset(); diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 9c7f594f8aa..2017c51bf43 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -136,6 +136,9 @@ namespace mongo { LockState& lockState() { return _ls; } + void setIsWriteCmd(bool newSetting); + bool isWriteCmd() const; + private: Client(const std::string& desc, AbstractMessagingPort *p = 0); friend class CurOp; @@ -158,6 +161,9 @@ namespace mongo { friend class PageFaultRetryableSection; // TEMP friend class NoPageFaultsAllowed; // TEMP + + bool _isWriteCmd; + public: /** "read lock, and set my context, all in one operation" diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 76cda120674..eb66b396d17 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -54,6 +54,7 @@ #include "mongo/s/collection_metadata.h" #include "mongo/s/d_logic.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_upsert_detail.h" #include "mongo/s/write_ops/write_error_detail.h" #include "mongo/util/elapsed_tracker.h" @@ -813,6 +814,17 @@ namespace mongo { ElapsedTracker elapsedTracker(128, 10); // 128 hits or 10 ms, matching RunnerYieldPolicy's + ShardedConnectionInfo* info = ShardedConnectionInfo::get(false); + if (info) { + if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) { + info->setVersion(request.getTargetingNS(), + request.getMetadata()->getShardVersion()); + } + else { + info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED()); + } + } + for (state.currIndex = 0; state.currIndex < state.request->sizeWriteOps(); ++state.currIndex) { @@ -851,6 +863,20 @@ namespace mongo { scoped_ptr<CurOp> currentOp( beginCurrentOp( _client, updateItem ) ); incOpStats( updateItem ); + ShardedConnectionInfo* info = ShardedConnectionInfo::get(false); + if (info) { + const BatchedCommandRequest* rootRequest = updateItem.getRequest(); + if (!updateItem.getUpdate()->getMulti() && + rootRequest->isMetadataSet() && + rootRequest->getMetadata()->isShardVersionSet()) { + info->setVersion(rootRequest->getTargetingNS(), + rootRequest->getMetadata()->getShardVersion()); + } + else { + info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); + } + } + WriteOpResult result; multiUpdate( updateItem, &result ); @@ -877,6 +903,20 @@ namespace mongo { scoped_ptr<CurOp> currentOp( beginCurrentOp( _client, removeItem ) ); incOpStats( removeItem ); + ShardedConnectionInfo* info = ShardedConnectionInfo::get(false); + if (info) { + const BatchedCommandRequest* rootRequest = removeItem.getRequest(); + if (removeItem.getDelete()->getLimit() == 1 && + rootRequest->isMetadataSet() && + rootRequest->getMetadata()->isShardVersionSet()) { + info->setVersion(rootRequest->getTargetingNS(), + rootRequest->getMetadata()->getShardVersion()); + } + else { + info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); + } + } + WriteOpResult result; // NOTE: Deletes will not fault outside the lock once any data has been written @@ -934,8 +974,7 @@ namespace mongo { return false; } _context.reset(new Client::Context(request->getNS(), - storageGlobalParams.dbpath, - false /* don't check version */)); + storageGlobalParams.dbpath)); Database* database = _context->db(); dassert(database); _collection = database->getCollection(request->getTargetingNS()); @@ -999,6 +1038,14 @@ namespace mongo { } break; } + catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError(staleExcep.getVersionReceived(), + staleExcep.getVersionWanted(), + result->getError()); + break; + } catch (const DBException& ex) { Status status(ex.toStatus()); if (ErrorCodes::isInterruption(status.code())) @@ -1131,9 +1178,8 @@ namespace mongo { if ( !checkShardVersion( &shardingState, *updateItem.getRequest(), result ) ) return; - Client::Context ctx( nsString.ns(), - storageGlobalParams.dbpath, - false /* don't check version */ ); + Client::Context ctx(nsString.ns(), + storageGlobalParams.dbpath); try { UpdateResult res = executor.execute(); @@ -1149,6 +1195,13 @@ namespace mongo { result->getStats().n = didInsert ? 1 : numMatched; result->getStats().upsertedID = resUpsertedID; } + catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError(staleExcep.getVersionReceived(), + staleExcep.getVersionWanted(), + result->getError()); + } catch (const DBException& ex) { status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { @@ -1193,13 +1246,20 @@ namespace mongo { // Context once we're locked, to set more details in currentOp() // TODO: better constructor? - Client::Context writeContext( nss.ns(), - storageGlobalParams.dbpath, - false /* don't check version */); + Client::Context writeContext(nss.ns(), + storageGlobalParams.dbpath); try { result->getStats().n = executor.execute(); } + catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError(staleExcep.getVersionReceived(), + staleExcep.getVersionWanted(), + result->getError()); + return; + } catch ( const DBException& ex ) { status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index a9717b57426..501dbd43dd9 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -136,6 +136,8 @@ namespace mongo { // TODO: fix this for sane behavior where we query repl set object if ( getLastErrorDefault ) defaultWriteConcern = *getLastErrorDefault; + cc().setIsWriteCmd(true); + WriteBatchExecutor writeBatchExecutor(defaultWriteConcern, &cc(), &globalOpCounters, diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index d99894a6cdf..40fe6d07c33 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -354,6 +354,8 @@ namespace mongo { if (!c.isGod()) c.getAuthorizationSession()->startRequest(); + c.setIsWriteCmd(false); + if ( op == dbQuery ) { const char *ns = dbmsg.getns(); diff --git a/src/mongo/s/d_logic.h b/src/mongo/s/d_logic.h index d13e15d67bc..d453a8b1b31 100644 --- a/src/mongo/s/d_logic.h +++ b/src/mongo/s/d_logic.h @@ -377,4 +377,6 @@ namespace mongo { const BSONObj* fullObj, bool forMigrateCleanup ); + void ensureShardVersionOKOrThrow(const std::string& ns); + } diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index cd275c36cb6..8e46206eb28 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -56,6 +56,7 @@ #include "mongo/s/d_logic.h" #include "mongo/s/metadata_loader.h" #include "mongo/s/shard.h" +#include "mongo/s/stale_exception.h" #include "mongo/util/queue.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/concurrency/ticketholder.h" @@ -1281,6 +1282,11 @@ namespace mongo { // TODO : all collections at some point, be sharded or not, will have a version // (and a CollectionMetadata) received = info->getVersion( ns ); + + if (ChunkVersion::isIgnoredVersion(received)) { + return true; + } + wanted = shardingState.getVersion( ns ); if( received.isWriteCompatibleWith( wanted ) ) return true; @@ -1333,4 +1339,15 @@ namespace mongo { void usingAShardConnection( const string& addr ) { } + void ensureShardVersionOKOrThrow(const std::string& ns) { + string errmsg; + ChunkVersion received; + ChunkVersion wanted; + if (!shardVersionOk(ns, errmsg, received, wanted)) { + StringBuilder sb; + sb << "[" << ns << "] shard version not ok: " << errmsg; + throw SendStaleConfigException(ns, sb.str(), received, wanted); + } + } + } diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp index 31adb31a711..efecbb04a18 100644 --- a/src/mongo/s/s_only.cpp +++ b/src/mongo/s/s_only.cpp @@ -69,7 +69,8 @@ namespace mongo { _shutdown(false), _desc(desc), _god(0), - _lastOp(0) { + _lastOp(0), + _isWriteCmd(false) { } Client::~Client() {} bool Client::shutdown() { return true; } |