diff options
-rw-r--r-- | jstests/gle/create_index_gle.js | 43 | ||||
-rw-r--r-- | src/mongo/client/parallel.cpp | 37 | ||||
-rw-r--r-- | src/mongo/client/parallel.h | 18 | ||||
-rw-r--r-- | src/mongo/db/client.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/client.h | 12 | ||||
-rw-r--r-- | src/mongo/db/commands/create_indexes.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/commands_public.cpp | 31 |
7 files changed, 143 insertions, 28 deletions
diff --git a/jstests/gle/create_index_gle.js b/jstests/gle/create_index_gle.js new file mode 100644 index 00000000000..0dfab3fa2fa --- /dev/null +++ b/jstests/gle/create_index_gle.js @@ -0,0 +1,43 @@ +var st = new ShardingTest({ shards: { rs0: { nodes: 2, oplogSize: 10, verbose: 1 }}}); +var replTest = st.rs0; + +var config = replTest.getReplSetConfig(); +config.members[1].priority = 0; +// Add a delay long enough so getLastError would actually 'wait' for write concern. +config.members[1].slaveDelay = 3; +config.version = 2; + +var priConn = replTest.getPrimary(); + +try { + priConn.getDB('admin').runCommand({ replSetReconfig: config }); +} catch (x) { + print('reconfig closed conn'); +} + +assert.soon(function() { + var secConn = replTest.getSecondary(); + var config = secConn.getDB('local').system.replset.findOne(); + return config.members[1].slaveDelay == 3; +}); + +replTest.awaitSecondaryNodes(); + +var testDB = st.s.getDB('test'); +testDB.adminCommand({ connPoolSync: 1 }); + +var secConn = replTest.getSecondary(); +var testDB2 = secConn.getDB('test'); + +testDB.user.insert({ x: 1 }); + +testDB.user.ensureIndex({ x: 1 }); +assert.gleOK(testDB.runCommand({ getLastError: 1, w: 2 })); + +var priIdx = testDB.user.getIndexes(); +var secIdx = testDB2.user.getIndexes(); + +assert.eq(priIdx.length, secIdx.length, 'pri: ' + tojson(priIdx) + ', sec: ' + tojson(secIdx)); + +st.stop(); + diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp index a5c94a67f76..99fea5b59f5 100644 --- a/src/mongo/client/parallel.cpp +++ b/src/mongo/client/parallel.cpp @@ -1538,8 +1538,19 @@ namespace mongo { // ---- Future ----- // ----------------- - Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) - :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false) + Future::CommandResult::CommandResult( const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase * conn, + bool useShardedConn ): + _server(server), + _db(db), + _options(options), + _cmd(cmd), + _conn(conn), + _useShardConn(useShardedConn), + _done(false) { init(); } @@ -1547,7 +1558,12 @@ namespace mongo { void Future::CommandResult::init(){ try { if ( ! _conn ){ - _connHolder.reset( new ScopedDbConnection( _server ) ); + if ( _useShardConn) { + _connHolder.reset( new ShardConnection( _server, "" )); + } + else { + _connHolder.reset( new ScopedDbConnection( _server ) ); + } _conn = _connHolder->get(); } @@ -1641,8 +1657,19 @@ namespace mongo { return _ok; } - shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) { - shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn )); + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase * conn, + bool useShardConn ) { + shared_ptr<Future::CommandResult> res ( + new Future::CommandResult( server, + db, + cmd, + options, + conn, + useShardConn)); return res; } diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index df1e1577980..b75ef115dc9 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -326,7 +326,12 @@ namespace mongo { private: - CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ); + CommandResult( const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase * conn, + bool useShardedConn ); void init(); string _server; @@ -334,7 +339,8 @@ namespace mongo { int _options; BSONObj _cmd; DBClientBase * _conn; - scoped_ptr<ScopedDbConnection> _connHolder; // used if not provided a connection + scoped_ptr<AScopedConnection> _connHolder; // used if not provided a connection + bool _useShardConn; scoped_ptr<DBClientCursor> _cursor; @@ -351,8 +357,14 @@ namespace mongo { * @param db db name * @param cmd cmd to exec * @param conn optional connection to use. will use standard pooled if non-specified + * @param useShardConn use ShardConnection */ - static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn = 0 ); + static shared_ptr<CommandResult> spawnCommand( const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase * conn = 0, + bool useShardConn = false ); }; diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 98120cbcd7b..c35150cad04 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -268,12 +268,14 @@ namespace mongo { /** "read lock, and set my context, all in one operation" * This handles (if not recursively locked) opening an unopened database. */ - Client::ReadContext::ReadContext(const string& ns, const std::string& path) { + Client::ReadContext::ReadContext(const string& ns, + const std::string& path, + bool doVersion) { { lk.reset( new Lock::DBRead(ns) ); Database *db = dbHolder().get(ns, path); if( db ) { - c.reset( new Context(path, ns, db) ); + c.reset( new Context(path, ns, db, doVersion) ); return; } } @@ -284,17 +286,17 @@ namespace mongo { if( Lock::isW() ) { // write locked already DEV RARELY log() << "write locked on ReadContext construction " << ns << endl; - c.reset(new Context(ns, path)); + c.reset(new Context(ns, path, doVersion)); } else if( !Lock::nested() ) { lk.reset(0); { Lock::GlobalWrite w; - Context c(ns, path); + Context c(ns, path, doVersion); } // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing. lk.reset( new Lock::DBRead(ns) ); - c.reset(new Context(ns, path)); + c.reset(new Context(ns, path, doVersion)); } else { uasserted(15928, str::stream() << "can't open a database from a nested read lock " << ns); @@ -306,9 +308,9 @@ namespace mongo { // it would be easy to first check that there is at least a .ns file, or something similar. } - Client::WriteContext::WriteContext(const string& ns, const std::string& path) + Client::WriteContext::WriteContext(const string& ns, const std::string& path, bool doVersion) : _lk( ns ) , - _c(ns, path) { + _c(ns, path, doVersion) { } @@ -332,12 +334,12 @@ namespace mongo { } // invoked from ReadContext - Client::Context::Context(const string& path, const string& ns, Database *db) : + Client::Context::Context(const string& path, const string& ns, Database *db, bool doVersion) : _client( currentClient.get() ), _oldContext( _client->_context ), _path( path ), _justCreated(false), - _doVersion( true ), + _doVersion( doVersion ), _ns( ns ), _db(db) { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 4586ca640be..f90d7edd230 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -161,7 +161,9 @@ namespace mongo { */ class ReadContext : boost::noncopyable { public: - ReadContext(const std::string& ns, const std::string& path=storageGlobalParams.dbpath); + ReadContext(const std::string& ns, + const std::string& path=storageGlobalParams.dbpath, + bool doVersion = true); Context& ctx() { return *c.get(); } private: scoped_ptr<Lock::DBRead> lk; @@ -175,7 +177,7 @@ namespace mongo { public: /** this is probably what you want */ Context(const string& ns, const std::string& path=storageGlobalParams.dbpath, - bool doVersion=true); + bool doVersion = true); /** note: this does not call finishInit -- i.e., does not call shardVersionOk() for example. @@ -184,7 +186,7 @@ namespace mongo { Context(const std::string& ns , Database * db); // used by ReadContext - Context(const string& path, const string& ns, Database *db); + Context(const string& path, const string& ns, Database *db, bool doVersion = true); ~Context(); Client* getClient() const { return _client; } @@ -232,7 +234,9 @@ namespace mongo { class WriteContext : boost::noncopyable { public: - WriteContext(const string& ns, const std::string& path=storageGlobalParams.dbpath); + WriteContext(const string& ns, + const std::string& path=storageGlobalParams.dbpath, + bool doVersion = true); Context& ctx() { return _c; } private: Lock::DBWrite _lk; diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 48a2546e276..0a783d6e3e6 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -131,7 +131,10 @@ namespace mongo { // We first take a read lock to see if we need to do anything // as many calls are ensureIndex (and hence no-ops), this is good so its a shared // lock for common calls. We only take write lock if needed. - Client::ReadContext readContext( ns ); + // Note: createIndexes command does not currently respect shard versioning. + Client::ReadContext readContext( ns, + storageGlobalParams.dbpath, + false /* doVersion */ ); const Collection* collection = readContext.ctx().db()->getCollection( ns.ns() ); if ( collection ) { for ( size_t i = 0; i < specs.size(); i++ ) { @@ -160,7 +163,10 @@ namespace mongo { } // now we know we have to create index(es) - Client::WriteContext writeContext( ns.ns() ); + // Note: createIndexes command does not currently respect shard versioning. + Client::WriteContext writeContext( ns.ns(), + storageGlobalParams.dbpath, + false /* doVersion */ ); Database* db = writeContext.ctx().db(); Collection* collection = db->getCollection( ns.ns() ); diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp index bf9b205ead0..089dca8c38b 100644 --- a/src/mongo/s/commands_public.cpp +++ b/src/mongo/s/commands_public.cpp @@ -156,7 +156,12 @@ namespace mongo { class RunOnAllShardsCommand : public Command { public: - RunOnAllShardsCommand(const char* n, const char* oldname=NULL) : Command(n, false, oldname) {} + RunOnAllShardsCommand(const char* n, + const char* oldname=NULL, + bool useShardConn = false): + Command(n, false, oldname), + _useShardConn(useShardConn) { + } virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return false; } @@ -164,7 +169,6 @@ namespace mongo { // all grid commands are designed not to lock virtual LockType locktype() const { return NONE; } - // default impl uses all shards for DB virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) { DBConfigPtr conf = grid.getDBConfig( dbName , false ); @@ -190,7 +194,12 @@ namespace mongo { list< shared_ptr<Future::CommandResult> > futures; for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, 0 ) ); + futures.push_back( Future::spawnCommand( i->getConnString(), + dbName, + cmdObj, + 0, + NULL, + _useShardConn )); } vector<BSONObj> results; @@ -266,11 +275,17 @@ namespace mongo { return true; } + private: + bool _useShardConn; // use ShardConnection as opposed to ScopedDbConnection }; class AllShardsCollectionCommand : public RunOnAllShardsCommand { public: - AllShardsCollectionCommand(const char* n, const char* oldname=NULL) : RunOnAllShardsCommand(n, oldname) {} + AllShardsCollectionCommand(const char* n, + const char* oldname = NULL, + bool useShardConn = false): + RunOnAllShardsCommand(n, oldname, useShardConn) { + } virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) { string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe(); @@ -323,7 +338,13 @@ namespace mongo { class CreateIndexesCmd : public AllShardsCollectionCommand { public: - CreateIndexesCmd() : AllShardsCollectionCommand("createIndexes") {} + CreateIndexesCmd(): + AllShardsCollectionCommand("createIndexes", + NULL, /* oldName */ + true /* use ShardConnection */) { + // createIndexes command should use ShardConnection so the getLastError would + // be able to properly enforce the write concern (via the saveGLEStats callback). + } /** * the createIndexes command doesn't require the 'ns' field to be populated |