summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/client.cpp31
-rw-r--r--src/mongo/db/client.h6
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp76
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp2
-rw-r--r--src/mongo/db/instance.cpp2
-rw-r--r--src/mongo/s/d_logic.h2
-rw-r--r--src/mongo/s/d_state.cpp17
-rw-r--r--src/mongo/s/s_only.cpp3
8 files changed, 118 insertions, 21 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 483f872ce83..a523505433d 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -167,7 +167,8 @@ namespace mongo {
_shutdown(false),
_desc(desc),
_god(0),
- _lastOp(0)
+ _lastOp(0),
+ _isWriteCmd(false)
{
_hasWrittenThisOperation = false;
_hasWrittenSinceCheckpoint = false;
@@ -312,22 +313,20 @@ namespace mongo {
}
- void Client::Context::checkNotStale() const {
+ void Client::Context::checkNotStale() const {
+ // Write commands do not rely on the writeback mechanism (handled after the request
+ // has been processed) to reroute writes, so the version check needs to be done here.
+ if (_client->isWriteCmd()) {
+ ensureShardVersionOKOrThrow(_ns);
+ }
+
switch ( _client->_curOp->getOp() ) {
case dbGetMore: // getMore's are special and should be handled else where
case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well
case dbDelete:
break;
- default: {
- string errmsg;
- ChunkVersion received;
- ChunkVersion wanted;
- if ( ! shardVersionOk( _ns , errmsg, received, wanted ) ) {
- ostringstream os;
- os << "[" << _ns << "] shard version not ok in Client::Context: " << errmsg;
- throw SendStaleConfigException( _ns, os.str(), received, wanted );
- }
- }
+ default:
+ ensureShardVersionOKOrThrow(_ns);
}
}
@@ -540,6 +539,14 @@ namespace mongo {
return true;
}
+ void Client::setIsWriteCmd(bool newSetting) {
+ _isWriteCmd = newSetting;
+ }
+
+ bool Client::isWriteCmd() const {
+ return _isWriteCmd;
+ }
+
void OpDebug::reset() {
extra.reset();
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index 9c7f594f8aa..2017c51bf43 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -136,6 +136,9 @@ namespace mongo {
LockState& lockState() { return _ls; }
+ void setIsWriteCmd(bool newSetting);
+ bool isWriteCmd() const;
+
private:
Client(const std::string& desc, AbstractMessagingPort *p = 0);
friend class CurOp;
@@ -158,6 +161,9 @@ namespace mongo {
friend class PageFaultRetryableSection; // TEMP
friend class NoPageFaultsAllowed; // TEMP
+
+ bool _isWriteCmd;
+
public:
/** "read lock, and set my context, all in one operation"
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 76cda120674..eb66b396d17 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -54,6 +54,7 @@
#include "mongo/s/collection_metadata.h"
#include "mongo/s/d_logic.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/stale_exception.h"
#include "mongo/s/write_ops/batched_upsert_detail.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/elapsed_tracker.h"
@@ -813,6 +814,17 @@ namespace mongo {
ElapsedTracker elapsedTracker(128, 10); // 128 hits or 10 ms, matching RunnerYieldPolicy's
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
+ if (info) {
+ if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) {
+ info->setVersion(request.getTargetingNS(),
+ request.getMetadata()->getShardVersion());
+ }
+ else {
+ info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED());
+ }
+ }
+
for (state.currIndex = 0;
state.currIndex < state.request->sizeWriteOps();
++state.currIndex) {
@@ -851,6 +863,20 @@ namespace mongo {
scoped_ptr<CurOp> currentOp( beginCurrentOp( _client, updateItem ) );
incOpStats( updateItem );
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
+ if (info) {
+ const BatchedCommandRequest* rootRequest = updateItem.getRequest();
+ if (!updateItem.getUpdate()->getMulti() &&
+ rootRequest->isMetadataSet() &&
+ rootRequest->getMetadata()->isShardVersionSet()) {
+ info->setVersion(rootRequest->getTargetingNS(),
+ rootRequest->getMetadata()->getShardVersion());
+ }
+ else {
+ info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
+ }
+ }
+
WriteOpResult result;
multiUpdate( updateItem, &result );
@@ -877,6 +903,20 @@ namespace mongo {
scoped_ptr<CurOp> currentOp( beginCurrentOp( _client, removeItem ) );
incOpStats( removeItem );
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
+ if (info) {
+ const BatchedCommandRequest* rootRequest = removeItem.getRequest();
+ if (removeItem.getDelete()->getLimit() == 1 &&
+ rootRequest->isMetadataSet() &&
+ rootRequest->getMetadata()->isShardVersionSet()) {
+ info->setVersion(rootRequest->getTargetingNS(),
+ rootRequest->getMetadata()->getShardVersion());
+ }
+ else {
+ info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
+ }
+ }
+
WriteOpResult result;
// NOTE: Deletes will not fault outside the lock once any data has been written
@@ -934,8 +974,7 @@ namespace mongo {
return false;
}
_context.reset(new Client::Context(request->getNS(),
- storageGlobalParams.dbpath,
- false /* don't check version */));
+ storageGlobalParams.dbpath));
Database* database = _context->db();
dassert(database);
_collection = database->getCollection(request->getTargetingNS());
@@ -999,6 +1038,14 @@ namespace mongo {
}
break;
}
+ catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(staleExcep.getVersionReceived(),
+ staleExcep.getVersionWanted(),
+ result->getError());
+ break;
+ }
catch (const DBException& ex) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code()))
@@ -1131,9 +1178,8 @@ namespace mongo {
if ( !checkShardVersion( &shardingState, *updateItem.getRequest(), result ) )
return;
- Client::Context ctx( nsString.ns(),
- storageGlobalParams.dbpath,
- false /* don't check version */ );
+ Client::Context ctx(nsString.ns(),
+ storageGlobalParams.dbpath);
try {
UpdateResult res = executor.execute();
@@ -1149,6 +1195,13 @@ namespace mongo {
result->getStats().n = didInsert ? 1 : numMatched;
result->getStats().upsertedID = resUpsertedID;
}
+ catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(staleExcep.getVersionReceived(),
+ staleExcep.getVersionWanted(),
+ result->getError());
+ }
catch (const DBException& ex) {
status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
@@ -1193,13 +1246,20 @@ namespace mongo {
// Context once we're locked, to set more details in currentOp()
// TODO: better constructor?
- Client::Context writeContext( nss.ns(),
- storageGlobalParams.dbpath,
- false /* don't check version */);
+ Client::Context writeContext(nss.ns(),
+ storageGlobalParams.dbpath);
try {
result->getStats().n = executor.execute();
}
+ catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(staleExcep.getVersionReceived(),
+ staleExcep.getVersionWanted(),
+ result->getError());
+ return;
+ }
catch ( const DBException& ex ) {
status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index a9717b57426..501dbd43dd9 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -136,6 +136,8 @@ namespace mongo {
// TODO: fix this for sane behavior where we query repl set object
if ( getLastErrorDefault ) defaultWriteConcern = *getLastErrorDefault;
+ cc().setIsWriteCmd(true);
+
WriteBatchExecutor writeBatchExecutor(defaultWriteConcern,
&cc(),
&globalOpCounters,
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index d99894a6cdf..40fe6d07c33 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -354,6 +354,8 @@ namespace mongo {
if (!c.isGod())
c.getAuthorizationSession()->startRequest();
+ c.setIsWriteCmd(false);
+
if ( op == dbQuery ) {
const char *ns = dbmsg.getns();
diff --git a/src/mongo/s/d_logic.h b/src/mongo/s/d_logic.h
index d13e15d67bc..d453a8b1b31 100644
--- a/src/mongo/s/d_logic.h
+++ b/src/mongo/s/d_logic.h
@@ -377,4 +377,6 @@ namespace mongo {
const BSONObj* fullObj,
bool forMigrateCleanup );
+ void ensureShardVersionOKOrThrow(const std::string& ns);
+
}
diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp
index cd275c36cb6..8e46206eb28 100644
--- a/src/mongo/s/d_state.cpp
+++ b/src/mongo/s/d_state.cpp
@@ -56,6 +56,7 @@
#include "mongo/s/d_logic.h"
#include "mongo/s/metadata_loader.h"
#include "mongo/s/shard.h"
+#include "mongo/s/stale_exception.h"
#include "mongo/util/queue.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/concurrency/ticketholder.h"
@@ -1281,6 +1282,11 @@ namespace mongo {
// TODO : all collections at some point, be sharded or not, will have a version
// (and a CollectionMetadata)
received = info->getVersion( ns );
+
+ if (ChunkVersion::isIgnoredVersion(received)) {
+ return true;
+ }
+
wanted = shardingState.getVersion( ns );
if( received.isWriteCompatibleWith( wanted ) ) return true;
@@ -1333,4 +1339,15 @@ namespace mongo {
void usingAShardConnection( const string& addr ) {
}
+ void ensureShardVersionOKOrThrow(const std::string& ns) {
+ string errmsg;
+ ChunkVersion received;
+ ChunkVersion wanted;
+ if (!shardVersionOk(ns, errmsg, received, wanted)) {
+ StringBuilder sb;
+ sb << "[" << ns << "] shard version not ok: " << errmsg;
+ throw SendStaleConfigException(ns, sb.str(), received, wanted);
+ }
+ }
+
}
diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp
index 31adb31a711..efecbb04a18 100644
--- a/src/mongo/s/s_only.cpp
+++ b/src/mongo/s/s_only.cpp
@@ -69,7 +69,8 @@ namespace mongo {
_shutdown(false),
_desc(desc),
_god(0),
- _lastOp(0) {
+ _lastOp(0),
+ _isWriteCmd(false) {
}
Client::~Client() {}
bool Client::shutdown() { return true; }