summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-06-11 15:12:25 -0400
committerEliot Horowitz <eliot@10gen.com>2010-06-11 15:12:25 -0400
commit951ac0ff465cbf482b70db6e5f5626c6e0a13711 (patch)
treeabe146982c5617eb4d806883266c1d45f93436b3 /s
parent37e6e26d3e3c27517cf2e40476b5f7271cab452c (diff)
downloadmongo-951ac0ff465cbf482b70db6e5f5626c6e0a13711.tar.gz
starting some sharding mongod logic cleaning
Diffstat (limited to 's')
-rw-r--r--s/d_logic.cpp32
-rw-r--r--s/d_logic.h3
-rw-r--r--s/d_writeback.cpp79
3 files changed, 84 insertions, 30 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index 7d40b6e7544..7c0cee6ea15 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -36,6 +36,7 @@
#include "../util/queue.h"
#include "shard.h"
+#include "d_logic.h"
using namespace std;
@@ -49,7 +50,6 @@ namespace mongo {
string shardConfigServer;
boost::thread_specific_ptr<OID> clientServerIds;
- map< string , BlockingQueue<BSONObj>* > clientQueues;
unsigned long long getVersion( BSONElement e , string& errmsg ){
if ( e.eoo() ){
@@ -80,32 +80,7 @@ namespace mongo {
}
};
- class WriteBackCommand : public MongodShardCommand {
- public:
- virtual LockType locktype() const { return NONE; }
- WriteBackCommand() : MongodShardCommand( "writebacklisten" ){}
- void help(stringstream& h) const { h<<"internal"; }
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- BSONElement e = cmdObj.firstElement();
- if ( e.type() != jstOID ){
- errmsg = "need oid as first value";
- return 0;
- }
-
- const OID id = e.__oid();
-
- if ( ! clientQueues[id.str()] )
- clientQueues[id.str()] = new BlockingQueue<BSONObj>();
-
- BSONObj z = clientQueues[id.str()]->blockingPop();
- log(1) << "WriteBackCommand got : " << z << endl;
-
- result.append( "data" , z );
-
- return true;
- }
- } writeBackCommand;
// setShardVersion( ns )
@@ -158,9 +133,6 @@ namespace mongo {
OID * nid = new OID();
nid->init( s );
clientServerIds.reset( nid );
-
- if ( ! clientQueues[s] )
- clientQueues[s] = new BlockingQueue<BSONObj>();
}
else if ( clientId != *clientServerIds.get() ){
errmsg = "server id has changed!";
@@ -550,7 +522,7 @@ namespace mongo {
b.append( "ns" , ns );
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
log() << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl;
- clientQueues[clientID->str()]->push( b.obj() );
+ queueWriteBack( clientID->str() , b.obj() );
return true;
}
diff --git a/s/d_logic.h b/s/d_logic.h
index f08c31d59c4..f3174b8bd31 100644
--- a/s/d_logic.h
+++ b/s/d_logic.h
@@ -22,6 +22,9 @@
namespace mongo {
+ /* queue a write back on a remote server for a failed write */
+ void queueWriteBack( const string& remote , const BSONObj& o );
+
/**
* @return true if we have any shard info for the ns
*/
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp
new file mode 100644
index 00000000000..4c0e134b23d
--- /dev/null
+++ b/s/d_writeback.cpp
@@ -0,0 +1,79 @@
+// d_logic.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../pch.h"
+
+#include "../db/commands.h"
+#include "../db/jsobj.h"
+#include "../db/dbmessage.h"
+#include "../db/query.h"
+
+#include "../client/connpool.h"
+
+#include "../util/queue.h"
+
+#include "shard.h"
+
+using namespace std;
+
+namespace mongo {
+
+ map< string , BlockingQueue<BSONObj>* > writebackQueue;
+ mongo::mutex writebackQueueLock("sharding:writebackQueueLock");
+
+ BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){
+ scoped_lock lk (writebackQueueLock );
+ BlockingQueue<BSONObj>*& q = writebackQueue[remote];
+ if ( ! q )
+ q = new BlockingQueue<BSONObj>();
+ return q;
+ }
+
+ void queueWriteBack( const string& remote , const BSONObj& o ){
+ getWritebackQueue( remote )->push( o );
+ }
+
+ class WriteBackCommand : public Command {
+ public:
+ virtual LockType locktype() const { return NONE; }
+ virtual bool slaveOk() const { return false; }
+ virtual bool adminOnly() const { return true; }
+
+ WriteBackCommand() : Command( "writebacklisten" ){}
+
+ void help(stringstream& h) const { h<<"internal"; }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ BSONElement e = cmdObj.firstElement();
+ if ( e.type() != jstOID ){
+ errmsg = "need oid as first value";
+ return 0;
+ }
+
+ const OID id = e.__oid();
+ BSONObj z = getWritebackQueue(id.str())->blockingPop();
+ log(1) << "WriteBackCommand got : " << z << endl;
+
+ result.append( "data" , z );
+
+ return true;
+ }
+ } writeBackCommand;
+
+}