diff options
-rw-r--r-- | db/lasterror.h | 2 | ||||
-rw-r--r-- | db/repl/heartbeat.cpp | 10 | ||||
-rw-r--r-- | db/repl/manager.cpp | 8 | ||||
-rw-r--r-- | db/repl/replset.cpp | 2 | ||||
-rw-r--r-- | db/repl/replset.h | 9 | ||||
-rw-r--r-- | db/repl/testing.js | 11 | ||||
-rw-r--r-- | pch.h | 2 | ||||
-rw-r--r-- | util/concurrency/msg.h | 16 | ||||
-rw-r--r-- | util/concurrency/task.cpp | 29 |
9 files changed, 35 insertions, 54 deletions
diff --git a/db/lasterror.h b/db/lasterror.h index 45a2d175298..f70cf32af4f 100644 --- a/db/lasterror.h +++ b/db/lasterror.h @@ -111,7 +111,7 @@ namespace mongo { LastError *le = lastError.get(); if ( le == 0 ) { /* might be intentional (non-user thread) */ - DEV log() << "warning dev: lastError==0 won't report:" << msg << '\n'; + DEV log() << "warning dev: lastError==0 won't report:" << msg << endl; } else if ( le->disabled ) { log() << "lastError disabled, can't report: " << msg << endl; } else { diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index f258b58041f..141271cf7dd 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -121,7 +121,9 @@ namespace mongo { be cfg = info["config"]; if( cfg.ok() ) { // received a new config - theReplSet->mgr->send(cfg.Obj().copy()); + boost::function<void()> f = + boost::bind(&ReplSet::Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); } } else { @@ -132,9 +134,9 @@ namespace mongo { down(mem, "connect/transport error"); } m = mem; - theReplSet->mgr->send(mem); + theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) ); if( mem.changed(old) ) { - theReplSet->mgr->send(ReplSet::Manager::CheckNewState); + theReplSet->mgr->send( boost::bind(&ReplSet::Manager::msgCheckNewState, theReplSet->mgr) ); } } @@ -163,7 +165,7 @@ namespace mongo { m = m->next(); } - mgr->send(Manager::CheckNewState); + mgr->send( boost::bind(&ReplSet::Manager::msgCheckNewState, theReplSet->mgr) ); } } diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index b8533f09694..05c822551f6 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -41,7 +41,7 @@ namespace mongo { return p; } - ReplSet::Manager::Manager(ReplSet *rs) : _rs(rs), _primary(NOPRIMARY) + ReplSet::Manager::Manager(ReplSet *rs) : task::Port("ReplSet::Manager"), _rs(rs), _primary(NOPRIMARY) { } @@ -51,7 +51,7 @@ namespace mongo { } /** called as the health threads get new results */ - void ReplSet::Manager::checkNewState() { + void ReplSet::Manager::msgCheckNewState() { { const Member *p = _rs->currentPrimary(); const Member *p2 = findOtherPrimary(); @@ -97,7 +97,7 @@ namespace mongo { _rs->elect.electSelf(); } - bool ReplSet::Manager::got(const any& msg) { +/* bool ReplSet::Manager::got(const any& msg) { if( msg.type() == typeid(Messages) ) { assert( CheckNewState == any_cast<Messages>(msg) ); checkNewState(); @@ -115,6 +115,6 @@ namespace mongo { assert(false); } return true; - } + }*/ } diff --git a/db/repl/replset.cpp b/db/repl/replset.cpp index 55514f3751d..13ff0b15577 100644 --- a/db/repl/replset.cpp +++ b/db/repl/replset.cpp @@ -25,7 +25,7 @@ namespace mongo { bool replSet = false; ReplSet *theReplSet = 0; - void ReplSet::updateHBInfo(const HeartbeatInfo& h) { + void ReplSet::msgUpdateHBInfo(HeartbeatInfo h) { for( Member *m = _members.head(); m; m=m->next() ) { if( m->id() == h.id() ) { m->_hbinfo = h; diff --git a/db/repl/replset.h b/db/repl/replset.h index dfd28292ff7..6eda7e8130d 100644 --- a/db/repl/replset.h +++ b/db/repl/replset.h @@ -113,7 +113,7 @@ namespace mongo { list<HostAndPort> memberHostnames() const; const Member* currentPrimary() const { return _currentPrimary; } const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); } - void updateHBInfo(const HeartbeatInfo& h); + void msgUpdateHBInfo(HeartbeatInfo); private: const Member *_currentPrimary; @@ -131,18 +131,15 @@ namespace mongo { public: class Manager : public task::Port { - string name() { return "ReplSet::Manager"; } bool got(const any&); ReplSet *_rs; int _primary; const Member* findOtherPrimary(); void noteARemoteIsPrimary(const Member *); - void checkNewState(); public: Manager(ReplSet *rs); - enum Messages { - CheckNewState - }; + void msgReceivedNewConfig(BSONObj) { assert(false); } + void msgCheckNewState(); }; shared_ptr<Manager> mgr; diff --git a/db/repl/testing.js b/db/repl/testing.js index 795ba84d595..d741cf3a644 100644 --- a/db/repl/testing.js +++ b/db/repl/testing.js @@ -20,8 +20,6 @@ c2 = { db = db.getSisterDB("admin");
local = db.getSisterDB("local");
-b = new Mongo("localhost:27002").getDB("admin");
-
print("\n\ndb = admin db on localhost:27017");
print("b = admin on localhost:27002");
print("rc(x) = db.runCommand(x)");
@@ -33,3 +31,12 @@ print("\n\n"); function rc(c) { return db.runCommand(c); }
function i() { return rc({ replSetInitiate: cfg }); }
function ism() { return rc("isMaster"); }
+
+b = 0;
+try {
+ b = new Mongo("localhost:27002").getDB("admin");
+}
+catch (e) {
+ print("\nCouldn't connect to b mongod instance\n");
+}
+
@@ -103,6 +103,8 @@ namespace mongo { #include <boost/program_options.hpp> #include <boost/shared_ptr.hpp> #include <boost/smart_ptr.hpp> +#include "boost/bind.hpp"
+#include "boost/function.hpp" #define BOOST_SPIRIT_THREADSAFE #include <boost/version.hpp> diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h index 57925964939..5a9567d8caf 100644 --- a/util/concurrency/msg.h +++ b/util/concurrency/msg.h @@ -25,30 +25,24 @@ namespace mongo { namespace task { - /** See task::PortUnitTest in task.cpp for an example of usage. - */ class Port : private Task { - protected: - /** implement a receiver of messages for the port. - @return false to stop the port / terminate the thread (i.e. you want to return true) - */ - virtual bool got(const any& msg) = 0; - virtual string name() = 0; // names the thread public: /** send a message to the port */ - void send(const any& msg); + void send( boost::function<void()> ); /** typical usage is: task::fork( foo.task() ); */ shared_ptr<Task> taskPtr() { return shared_ptr<Task>(static_cast<Task*>(this)); } - Port() { } + Port(string name) : _name(name) { } virtual ~Port() { } private: + virtual string name() { return _name; } void doWork(); - deque<any> d; + deque< boost::function<void()> > d; boost::mutex m; boost::condition c; + string _name; }; } diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp index a02c875839b..68507a45198 100644 --- a/util/concurrency/task.cpp +++ b/util/concurrency/task.cpp @@ -92,7 +92,7 @@ namespace mongo { namespace mongo { namespace task { - void Port::send(const any& msg) { + void Port::send( boost::function<void()> msg ) { { boost::mutex::scoped_lock lk(m); d.push_back(msg); @@ -102,42 +102,21 @@ namespace mongo { void Port::doWork() { while( 1 ) { - any a; + boost::function<void()> f; { boost::mutex::scoped_lock lk(m); while( d.empty() ) c.wait(lk); - a = d.front(); + f = d.front(); d.pop_front(); } try { - if( !got(a) ) - break; + f(); } catch(std::exception& e) { log() << "Port::doWork() exception " << e.what() << endl; } } } - class PortTest : public Port { - protected: - void got(const int& msg) { } - public: - virtual bool got(const any& msg) { - assert( any_cast<int>(msg) <= 55 ); - return any_cast<int>(msg) != 55; - } - virtual string name() { return "PortTest"; } - }; - - struct PortUnitTest : public UnitTest { - void run() { - PortTest *p = new PortTest(); - shared_ptr<Task> tp = p->taskPtr(); - fork( tp ); - p->send(3); - p->send(55); - } - } portunittest; } } |