summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--db/lasterror.h2
-rw-r--r--db/repl/heartbeat.cpp10
-rw-r--r--db/repl/manager.cpp8
-rw-r--r--db/repl/replset.cpp2
-rw-r--r--db/repl/replset.h9
-rw-r--r--db/repl/testing.js11
-rw-r--r--pch.h2
-rw-r--r--util/concurrency/msg.h16
-rw-r--r--util/concurrency/task.cpp29
9 files changed, 35 insertions, 54 deletions
diff --git a/db/lasterror.h b/db/lasterror.h
index 45a2d175298..f70cf32af4f 100644
--- a/db/lasterror.h
+++ b/db/lasterror.h
@@ -111,7 +111,7 @@ namespace mongo {
LastError *le = lastError.get();
if ( le == 0 ) {
/* might be intentional (non-user thread) */
- DEV log() << "warning dev: lastError==0 won't report:" << msg << '\n';
+ DEV log() << "warning dev: lastError==0 won't report:" << msg << endl;
} else if ( le->disabled ) {
log() << "lastError disabled, can't report: " << msg << endl;
} else {
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index f258b58041f..141271cf7dd 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -121,7 +121,9 @@ namespace mongo {
be cfg = info["config"];
if( cfg.ok() ) {
// received a new config
- theReplSet->mgr->send(cfg.Obj().copy());
+ boost::function<void()> f =
+ boost::bind(&ReplSet::Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
}
}
else {
@@ -132,9 +134,9 @@ namespace mongo {
down(mem, "connect/transport error");
}
m = mem;
- theReplSet->mgr->send(mem);
+ theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
if( mem.changed(old) ) {
- theReplSet->mgr->send(ReplSet::Manager::CheckNewState);
+ theReplSet->mgr->send( boost::bind(&ReplSet::Manager::msgCheckNewState, theReplSet->mgr) );
}
}
@@ -163,7 +165,7 @@ namespace mongo {
m = m->next();
}
- mgr->send(Manager::CheckNewState);
+ mgr->send( boost::bind(&ReplSet::Manager::msgCheckNewState, theReplSet->mgr) );
}
}
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index b8533f09694..05c822551f6 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -41,7 +41,7 @@ namespace mongo {
return p;
}
- ReplSet::Manager::Manager(ReplSet *rs) : _rs(rs), _primary(NOPRIMARY)
+ ReplSet::Manager::Manager(ReplSet *rs) : task::Port("ReplSet::Manager"), _rs(rs), _primary(NOPRIMARY)
{
}
@@ -51,7 +51,7 @@ namespace mongo {
}
/** called as the health threads get new results */
- void ReplSet::Manager::checkNewState() {
+ void ReplSet::Manager::msgCheckNewState() {
{
const Member *p = _rs->currentPrimary();
const Member *p2 = findOtherPrimary();
@@ -97,7 +97,7 @@ namespace mongo {
_rs->elect.electSelf();
}
- bool ReplSet::Manager::got(const any& msg) {
+/* bool ReplSet::Manager::got(const any& msg) {
if( msg.type() == typeid(Messages) ) {
assert( CheckNewState == any_cast<Messages>(msg) );
checkNewState();
@@ -115,6 +115,6 @@ namespace mongo {
assert(false);
}
return true;
- }
+ }*/
}
diff --git a/db/repl/replset.cpp b/db/repl/replset.cpp
index 55514f3751d..13ff0b15577 100644
--- a/db/repl/replset.cpp
+++ b/db/repl/replset.cpp
@@ -25,7 +25,7 @@ namespace mongo {
bool replSet = false;
ReplSet *theReplSet = 0;
- void ReplSet::updateHBInfo(const HeartbeatInfo& h) {
+ void ReplSet::msgUpdateHBInfo(HeartbeatInfo h) {
for( Member *m = _members.head(); m; m=m->next() ) {
if( m->id() == h.id() ) {
m->_hbinfo = h;
diff --git a/db/repl/replset.h b/db/repl/replset.h
index dfd28292ff7..6eda7e8130d 100644
--- a/db/repl/replset.h
+++ b/db/repl/replset.h
@@ -113,7 +113,7 @@ namespace mongo {
list<HostAndPort> memberHostnames() const;
const Member* currentPrimary() const { return _currentPrimary; }
const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
- void updateHBInfo(const HeartbeatInfo& h);
+ void msgUpdateHBInfo(HeartbeatInfo);
private:
const Member *_currentPrimary;
@@ -131,18 +131,15 @@ namespace mongo {
public:
class Manager : public task::Port {
- string name() { return "ReplSet::Manager"; }
bool got(const any&);
ReplSet *_rs;
int _primary;
const Member* findOtherPrimary();
void noteARemoteIsPrimary(const Member *);
- void checkNewState();
public:
Manager(ReplSet *rs);
- enum Messages {
- CheckNewState
- };
+ void msgReceivedNewConfig(BSONObj) { assert(false); }
+ void msgCheckNewState();
};
shared_ptr<Manager> mgr;
diff --git a/db/repl/testing.js b/db/repl/testing.js
index 795ba84d595..d741cf3a644 100644
--- a/db/repl/testing.js
+++ b/db/repl/testing.js
@@ -20,8 +20,6 @@ c2 = {
db = db.getSisterDB("admin");
local = db.getSisterDB("local");
-b = new Mongo("localhost:27002").getDB("admin");
-
print("\n\ndb = admin db on localhost:27017");
print("b = admin on localhost:27002");
print("rc(x) = db.runCommand(x)");
@@ -33,3 +31,12 @@ print("\n\n");
function rc(c) { return db.runCommand(c); }
function i() { return rc({ replSetInitiate: cfg }); }
function ism() { return rc("isMaster"); }
+
+b = 0;
+try {
+ b = new Mongo("localhost:27002").getDB("admin");
+}
+catch (e) {
+ print("\nCouldn't connect to b mongod instance\n");
+}
+
diff --git a/pch.h b/pch.h
index ed3e5cbe756..db8b1e8cda8 100644
--- a/pch.h
+++ b/pch.h
@@ -103,6 +103,8 @@ namespace mongo {
#include <boost/program_options.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/smart_ptr.hpp>
+#include "boost/bind.hpp"
+#include "boost/function.hpp"
#define BOOST_SPIRIT_THREADSAFE
#include <boost/version.hpp>
diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h
index 57925964939..5a9567d8caf 100644
--- a/util/concurrency/msg.h
+++ b/util/concurrency/msg.h
@@ -25,30 +25,24 @@ namespace mongo {
namespace task {
- /** See task::PortUnitTest in task.cpp for an example of usage.
- */
class Port : private Task {
- protected:
- /** implement a receiver of messages for the port.
- @return false to stop the port / terminate the thread (i.e. you want to return true)
- */
- virtual bool got(const any& msg) = 0;
- virtual string name() = 0; // names the thread
public:
/** send a message to the port */
- void send(const any& msg);
+ void send( boost::function<void()> );
/** typical usage is: task::fork( foo.task() ); */
shared_ptr<Task> taskPtr() { return shared_ptr<Task>(static_cast<Task*>(this)); }
- Port() { }
+ Port(string name) : _name(name) { }
virtual ~Port() { }
private:
+ virtual string name() { return _name; }
void doWork();
- deque<any> d;
+ deque< boost::function<void()> > d;
boost::mutex m;
boost::condition c;
+ string _name;
};
}
diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp
index a02c875839b..68507a45198 100644
--- a/util/concurrency/task.cpp
+++ b/util/concurrency/task.cpp
@@ -92,7 +92,7 @@ namespace mongo {
namespace mongo {
namespace task {
- void Port::send(const any& msg) {
+ void Port::send( boost::function<void()> msg ) {
{
boost::mutex::scoped_lock lk(m);
d.push_back(msg);
@@ -102,42 +102,21 @@ namespace mongo {
void Port::doWork() {
while( 1 ) {
- any a;
+ boost::function<void()> f;
{
boost::mutex::scoped_lock lk(m);
while( d.empty() )
c.wait(lk);
- a = d.front();
+ f = d.front();
d.pop_front();
}
try {
- if( !got(a) )
- break;
+ f();
} catch(std::exception& e) {
log() << "Port::doWork() exception " << e.what() << endl;
}
}
}
- class PortTest : public Port {
- protected:
- void got(const int& msg) { }
- public:
- virtual bool got(const any& msg) {
- assert( any_cast<int>(msg) <= 55 );
- return any_cast<int>(msg) != 55;
- }
- virtual string name() { return "PortTest"; }
- };
-
- struct PortUnitTest : public UnitTest {
- void run() {
- PortTest *p = new PortTest();
- shared_ptr<Task> tp = p->taskPtr();
- fork( tp );
- p->send(3);
- p->send(55);
- }
- } portunittest;
}
}