diff options
author | Eliot Horowitz <eliot@10gen.com> | 2009-09-11 16:14:14 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2009-09-11 16:14:14 -0400 |
commit | 335d92c443e23df5912f31f6310b8dbb5b23fda3 (patch) | |
tree | 1fa1d9ba83ff44a44fbc0718ab30c8d9ffc30148 | |
parent | 1811d1e47f8894935b9c8f4f3bc004cf86b5f2b0 (diff) | |
download | mongo-335d92c443e23df5912f31f6310b8dbb5b23fda3.tar.gz |
client id work and make getLastError work non sharded-collections in sharding, but with connection pooling SHARDING-16
-rw-r--r-- | db/db.cpp | 12 | ||||
-rw-r--r-- | db/instance.cpp | 5 | ||||
-rw-r--r-- | jstests/sharding/error1.js | 15 | ||||
-rw-r--r-- | s/commands_admin.cpp | 16 | ||||
-rw-r--r-- | s/commands_public.cpp | 3 | ||||
-rw-r--r-- | s/server.cpp | 10 | ||||
-rw-r--r-- | util/message.cpp | 45 | ||||
-rw-r--r-- | util/message.h | 8 |
8 files changed, 79 insertions, 35 deletions
diff --git a/db/db.cpp b/db/db.cpp index 0ddac3f8c29..e3828de6287 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -145,18 +145,6 @@ namespace mongo { } } - class JniMessagingPort : public AbstractMessagingPort { - public: - JniMessagingPort(Message& _container) : container(_container) { } - void reply(Message& received, Message& response, MSGID) { - container = response; - } - void reply(Message& received, Message& response) { - container = response; - } - Message & container; - }; - } // namespace mongo #include "lasterror.h" diff --git a/db/instance.cpp b/db/instance.cpp index 9602e592d89..bd7b6dc60df 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -487,9 +487,12 @@ namespace mongo { void reply(Message& received, Message& response) { container = response; } + unsigned remotePort(){ + return 1; + } Message & container; }; - + /* a call from java/js to the database locally. m - inbound message diff --git a/jstests/sharding/error1.js b/jstests/sharding/error1.js new file mode 100644 index 00000000000..f995a2bc26c --- /dev/null +++ b/jstests/sharding/error1.js @@ -0,0 +1,15 @@ + +s = new ShardingTest( "error1" , 2 , 1 , 1 ); +s.adminCommand( { enablesharding : "test" } ); + +// ---- simple getLastError ---- + +db = s.getDB( "test" ); +db.foo.insert( { _id : 1 } ); +assert.isnull( db.getLastError() , "gle 1" ); +db.foo.insert( { _id : 1 } ); +assert( db.getLastError() , "gle21" ); + + +// ---- +s.stop(); diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp index 766b1704669..7aa5b511b53 100644 --- a/s/commands_admin.cpp +++ b/s/commands_admin.cpp @@ -582,10 +582,18 @@ namespace mongo { help << "check for an error on the last command executed"; } CmdShardingGetLastError() : Command("getlasterror") { } - virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { - errmsg += "getlasterror not working yet for sharded environments"; - result << "ok" << 0; - return false; + virtual bool run(const char *nsraw, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + string dbName = nsraw; + dbName = dbName.substr( 0 , dbName.size() - 5 ); + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + ScopedDbConnection conn( conf->getPrimary() ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + result.appendElements( res ); + conn.done(); + return ok; } } cmdGetLastError; diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 3aece13fa4f..f94b62ff30c 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -45,12 +45,13 @@ namespace mongo { string getDBName( string ns ){ return ns.substr( 0 , ns.size() - 5 ); } - + bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ ScopedDbConnection conn( conf->getPrimary() ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res ); result.appendElements( res ); + conn.done(); return ok; } }; diff --git a/s/server.cpp b/s/server.cpp index 57740885ff4..0dffe9ccb46 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -58,6 +58,13 @@ namespace mongo { // out() << " in our hostname with \"-grid\".\n"; out() << endl; } + + class ShardingConnectionHook : public DBConnectionHook { + public: + virtual void onCreate( DBClientBase * conn ){ + conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" ); + } + } shardingConnectionHook; class ShardedMessageHandler : public MessageHandler { public: @@ -65,6 +72,7 @@ namespace mongo { virtual void process( Message& m , AbstractMessagingPort* p ){ Request r( m , p ); try { + setClientId( p->remotePort() << 16 ); r.process(); } catch ( DBException& e ){ @@ -149,6 +157,8 @@ int main(int argc, char* argv[], char *envp[] ) { return 0; } + pool.addHook( &shardingConnectionHook ); + if ( argc <= 1 ) { usage( argv ); return 3; diff --git a/util/message.cpp b/util/message.cpp index 436fd59673f..b8091617f9a 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -135,14 +135,6 @@ namespace mongo { char * _cur; }; - MSGID NextMsgId; - struct MsgStart { - MsgStart() { - NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); - assert(MsgDataHeaderSize == 16); - } - } msgstart; - class Ports { set<MessagingPort*>& ports; boost::mutex& m; @@ -380,9 +372,7 @@ again: void MessagingPort::say(Message& toSend, int responseTo) { mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) - MSGID msgid = NextMsgId; - ++NextMsgId; - toSend.data->id = msgid; + toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; int x = -100; @@ -418,9 +408,7 @@ again: } // we're going to be storing this, so need to set it up - MSGID msgid = NextMsgId; - ++NextMsgId; - toSend.data->id = msgid; + toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; if ( ! piggyBackData ) @@ -429,14 +417,41 @@ again: piggyBackData->append( toSend ); } + unsigned MessagingPort::remotePort(){ + return farEnd.getPort(); + } + + MSGID NextMsgId; + bool usingClientIds = 0; + ThreadLocalInt clientId; + + struct MsgStart { + MsgStart() { + NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); + assert(MsgDataHeaderSize == 16); + } + } msgstart; + MSGID nextMessageId(){ MSGID msgid = NextMsgId; ++NextMsgId; + + if ( usingClientIds ){ + msgid = msgid & 0xFFFF; + msgid = msgid | clientId.get(); + } + return msgid; } bool doesOpGetAResponse( int op ){ return op == dbQuery || op == dbGetMore; } - + + void setClientId( int id ){ + usingClientIds = true; + id = id & 0xFFFF0000; + massert( "invalid id" , id ); + clientId.reset( id ); + } } // namespace mongo diff --git a/util/message.h b/util/message.h index 276e14e9d49..fef93ea1852 100644 --- a/util/message.h +++ b/util/message.h @@ -48,16 +48,18 @@ namespace mongo { virtual ~AbstractMessagingPort() { } virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available virtual void reply(Message& received, Message& response) = 0; + + virtual unsigned remotePort() = 0 ; }; class MessagingPort : public AbstractMessagingPort { public: MessagingPort(int sock, SockAddr& farEnd); MessagingPort(); - ~MessagingPort(); + virtual ~MessagingPort(); void shutdown(); - + bool connect(SockAddr& farEnd); /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. @@ -71,6 +73,7 @@ namespace mongo { void piggyBack( Message& toSend , int responseTo = -1 ); + virtual unsigned remotePort(); private: int sock; PiggyBackData * piggyBackData; @@ -199,4 +202,5 @@ namespace mongo { MSGID nextMessageId(); + void setClientId( int id ); } // namespace mongo |