summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-09-11 16:14:14 -0400
committerEliot Horowitz <eliot@10gen.com>2009-09-11 16:14:14 -0400
commit335d92c443e23df5912f31f6310b8dbb5b23fda3 (patch)
tree1fa1d9ba83ff44a44fbc0718ab30c8d9ffc30148
parent1811d1e47f8894935b9c8f4f3bc004cf86b5f2b0 (diff)
downloadmongo-335d92c443e23df5912f31f6310b8dbb5b23fda3.tar.gz
client id work and make getLastError work non sharded-collections in sharding, but with connection pooling SHARDING-16
-rw-r--r--db/db.cpp12
-rw-r--r--db/instance.cpp5
-rw-r--r--jstests/sharding/error1.js15
-rw-r--r--s/commands_admin.cpp16
-rw-r--r--s/commands_public.cpp3
-rw-r--r--s/server.cpp10
-rw-r--r--util/message.cpp45
-rw-r--r--util/message.h8
8 files changed, 79 insertions, 35 deletions
diff --git a/db/db.cpp b/db/db.cpp
index 0ddac3f8c29..e3828de6287 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -145,18 +145,6 @@ namespace mongo {
}
}
- class JniMessagingPort : public AbstractMessagingPort {
- public:
- JniMessagingPort(Message& _container) : container(_container) { }
- void reply(Message& received, Message& response, MSGID) {
- container = response;
- }
- void reply(Message& received, Message& response) {
- container = response;
- }
- Message & container;
- };
-
} // namespace mongo
#include "lasterror.h"
diff --git a/db/instance.cpp b/db/instance.cpp
index 9602e592d89..bd7b6dc60df 100644
--- a/db/instance.cpp
+++ b/db/instance.cpp
@@ -487,9 +487,12 @@ namespace mongo {
void reply(Message& received, Message& response) {
container = response;
}
+ unsigned remotePort(){
+ return 1;
+ }
Message & container;
};
-
+
/* a call from java/js to the database locally.
m - inbound message
diff --git a/jstests/sharding/error1.js b/jstests/sharding/error1.js
new file mode 100644
index 00000000000..f995a2bc26c
--- /dev/null
+++ b/jstests/sharding/error1.js
@@ -0,0 +1,15 @@
+
+s = new ShardingTest( "error1" , 2 , 1 , 1 );
+s.adminCommand( { enablesharding : "test" } );
+
+// ---- simple getLastError ----
+
+db = s.getDB( "test" );
+db.foo.insert( { _id : 1 } );
+assert.isnull( db.getLastError() , "gle 1" );
+db.foo.insert( { _id : 1 } );
+assert( db.getLastError() , "gle21" );
+
+
+// ----
+s.stop();
diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp
index 766b1704669..7aa5b511b53 100644
--- a/s/commands_admin.cpp
+++ b/s/commands_admin.cpp
@@ -582,10 +582,18 @@ namespace mongo {
help << "check for an error on the last command executed";
}
CmdShardingGetLastError() : Command("getlasterror") { }
- virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
- errmsg += "getlasterror not working yet for sharded environments";
- result << "ok" << 0;
- return false;
+ virtual bool run(const char *nsraw, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ string dbName = nsraw;
+ dbName = dbName.substr( 0 , dbName.size() - 5 );
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ ScopedDbConnection conn( conf->getPrimary() );
+ BSONObj res;
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ result.appendElements( res );
+ conn.done();
+ return ok;
}
} cmdGetLastError;
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 3aece13fa4f..f94b62ff30c 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -45,12 +45,13 @@ namespace mongo {
string getDBName( string ns ){
return ns.substr( 0 , ns.size() - 5 );
}
-
+
bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
ScopedDbConnection conn( conf->getPrimary() );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
result.appendElements( res );
+ conn.done();
return ok;
}
};
diff --git a/s/server.cpp b/s/server.cpp
index 57740885ff4..0dffe9ccb46 100644
--- a/s/server.cpp
+++ b/s/server.cpp
@@ -58,6 +58,13 @@ namespace mongo {
// out() << " in our hostname with \"-grid\".\n";
out() << endl;
}
+
+ class ShardingConnectionHook : public DBConnectionHook {
+ public:
+ virtual void onCreate( DBClientBase * conn ){
+ conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" );
+ }
+ } shardingConnectionHook;
class ShardedMessageHandler : public MessageHandler {
public:
@@ -65,6 +72,7 @@ namespace mongo {
virtual void process( Message& m , AbstractMessagingPort* p ){
Request r( m , p );
try {
+ setClientId( p->remotePort() << 16 );
r.process();
}
catch ( DBException& e ){
@@ -149,6 +157,8 @@ int main(int argc, char* argv[], char *envp[] ) {
return 0;
}
+ pool.addHook( &shardingConnectionHook );
+
if ( argc <= 1 ) {
usage( argv );
return 3;
diff --git a/util/message.cpp b/util/message.cpp
index 436fd59673f..b8091617f9a 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -135,14 +135,6 @@ namespace mongo {
char * _cur;
};
- MSGID NextMsgId;
- struct MsgStart {
- MsgStart() {
- NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
- assert(MsgDataHeaderSize == 16);
- }
- } msgstart;
-
class Ports {
set<MessagingPort*>& ports;
boost::mutex& m;
@@ -380,9 +372,7 @@ again:
void MessagingPort::say(Message& toSend, int responseTo) {
mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
- MSGID msgid = NextMsgId;
- ++NextMsgId;
- toSend.data->id = msgid;
+ toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
int x = -100;
@@ -418,9 +408,7 @@ again:
}
// we're going to be storing this, so need to set it up
- MSGID msgid = NextMsgId;
- ++NextMsgId;
- toSend.data->id = msgid;
+ toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
if ( ! piggyBackData )
@@ -429,14 +417,41 @@ again:
piggyBackData->append( toSend );
}
+ unsigned MessagingPort::remotePort(){
+ return farEnd.getPort();
+ }
+
+ MSGID NextMsgId;
+ bool usingClientIds = 0;
+ ThreadLocalInt clientId;
+
+ struct MsgStart {
+ MsgStart() {
+ NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
+ assert(MsgDataHeaderSize == 16);
+ }
+ } msgstart;
+
MSGID nextMessageId(){
MSGID msgid = NextMsgId;
++NextMsgId;
+
+ if ( usingClientIds ){
+ msgid = msgid & 0xFFFF;
+ msgid = msgid | clientId.get();
+ }
+
return msgid;
}
bool doesOpGetAResponse( int op ){
return op == dbQuery || op == dbGetMore;
}
-
+
+ void setClientId( int id ){
+ usingClientIds = true;
+ id = id & 0xFFFF0000;
+ massert( "invalid id" , id );
+ clientId.reset( id );
+ }
} // namespace mongo
diff --git a/util/message.h b/util/message.h
index 276e14e9d49..fef93ea1852 100644
--- a/util/message.h
+++ b/util/message.h
@@ -48,16 +48,18 @@ namespace mongo {
virtual ~AbstractMessagingPort() { }
virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
virtual void reply(Message& received, Message& response) = 0;
+
+ virtual unsigned remotePort() = 0 ;
};
class MessagingPort : public AbstractMessagingPort {
public:
MessagingPort(int sock, SockAddr& farEnd);
MessagingPort();
- ~MessagingPort();
+ virtual ~MessagingPort();
void shutdown();
-
+
bool connect(SockAddr& farEnd);
/* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
@@ -71,6 +73,7 @@ namespace mongo {
void piggyBack( Message& toSend , int responseTo = -1 );
+ virtual unsigned remotePort();
private:
int sock;
PiggyBackData * piggyBackData;
@@ -199,4 +202,5 @@ namespace mongo {
MSGID nextMessageId();
+ void setClientId( int id );
} // namespace mongo