diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-06-11 15:12:25 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-06-11 15:12:25 -0400 |
commit | 951ac0ff465cbf482b70db6e5f5626c6e0a13711 (patch) | |
tree | abe146982c5617eb4d806883266c1d45f93436b3 /s | |
parent | 37e6e26d3e3c27517cf2e40476b5f7271cab452c (diff) | |
download | mongo-951ac0ff465cbf482b70db6e5f5626c6e0a13711.tar.gz |
starting some sharding mongod logic cleaning
Diffstat (limited to 's')
-rw-r--r-- | s/d_logic.cpp | 32 | ||||
-rw-r--r-- | s/d_logic.h | 3 | ||||
-rw-r--r-- | s/d_writeback.cpp | 79 |
3 files changed, 84 insertions, 30 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 7d40b6e7544..7c0cee6ea15 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -36,6 +36,7 @@ #include "../util/queue.h" #include "shard.h" +#include "d_logic.h" using namespace std; @@ -49,7 +50,6 @@ namespace mongo { string shardConfigServer; boost::thread_specific_ptr<OID> clientServerIds; - map< string , BlockingQueue<BSONObj>* > clientQueues; unsigned long long getVersion( BSONElement e , string& errmsg ){ if ( e.eoo() ){ @@ -80,32 +80,7 @@ namespace mongo { } }; - class WriteBackCommand : public MongodShardCommand { - public: - virtual LockType locktype() const { return NONE; } - WriteBackCommand() : MongodShardCommand( "writebacklisten" ){} - void help(stringstream& h) const { h<<"internal"; } - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - BSONElement e = cmdObj.firstElement(); - if ( e.type() != jstOID ){ - errmsg = "need oid as first value"; - return 0; - } - - const OID id = e.__oid(); - - if ( ! clientQueues[id.str()] ) - clientQueues[id.str()] = new BlockingQueue<BSONObj>(); - - BSONObj z = clientQueues[id.str()]->blockingPop(); - log(1) << "WriteBackCommand got : " << z << endl; - - result.append( "data" , z ); - - return true; - } - } writeBackCommand; // setShardVersion( ns ) @@ -158,9 +133,6 @@ namespace mongo { OID * nid = new OID(); nid->init( s ); clientServerIds.reset( nid ); - - if ( ! clientQueues[s] ) - clientQueues[s] = new BlockingQueue<BSONObj>(); } else if ( clientId != *clientServerIds.get() ){ errmsg = "server id has changed!"; @@ -550,7 +522,7 @@ namespace mongo { b.append( "ns" , ns ); b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); log() << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl; - clientQueues[clientID->str()]->push( b.obj() ); + queueWriteBack( clientID->str() , b.obj() ); return true; } diff --git a/s/d_logic.h b/s/d_logic.h index f08c31d59c4..f3174b8bd31 100644 --- a/s/d_logic.h +++ b/s/d_logic.h @@ -22,6 +22,9 @@ namespace mongo { + /* queue a write back on a remote server for a failed write */ + void queueWriteBack( const string& remote , const BSONObj& o ); + /** * @return true if we have any shard info for the ns */ diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp new file mode 100644 index 00000000000..4c0e134b23d --- /dev/null +++ b/s/d_writeback.cpp @@ -0,0 +1,79 @@ +// d_logic.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../pch.h" + +#include "../db/commands.h" +#include "../db/jsobj.h" +#include "../db/dbmessage.h" +#include "../db/query.h" + +#include "../client/connpool.h" + +#include "../util/queue.h" + +#include "shard.h" + +using namespace std; + +namespace mongo { + + map< string , BlockingQueue<BSONObj>* > writebackQueue; + mongo::mutex writebackQueueLock("sharding:writebackQueueLock"); + + BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){ + scoped_lock lk (writebackQueueLock ); + BlockingQueue<BSONObj>*& q = writebackQueue[remote]; + if ( ! q ) + q = new BlockingQueue<BSONObj>(); + return q; + } + + void queueWriteBack( const string& remote , const BSONObj& o ){ + getWritebackQueue( remote )->push( o ); + } + + class WriteBackCommand : public Command { + public: + virtual LockType locktype() const { return NONE; } + virtual bool slaveOk() const { return false; } + virtual bool adminOnly() const { return true; } + + WriteBackCommand() : Command( "writebacklisten" ){} + + void help(stringstream& h) const { h<<"internal"; } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + BSONElement e = cmdObj.firstElement(); + if ( e.type() != jstOID ){ + errmsg = "need oid as first value"; + return 0; + } + + const OID id = e.__oid(); + BSONObj z = getWritebackQueue(id.str())->blockingPop(); + log(1) << "WriteBackCommand got : " << z << endl; + + result.append( "data" , z ); + + return true; + } + } writeBackCommand; + +} |