diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3c73719ef9..ce87d23c0d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,6 +21,7 @@ #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/scoped_array.hpp> #include <algorithm> #include <iterator> #include <map> @@ -46,13 +47,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) : - FrameHandler(&sessions), +Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) : + FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), - url(url_), - self(Id::self(cpg)), - sessions(broker, *this) + url(url_) { QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg.join(name); @@ -80,10 +79,10 @@ Cluster::~Cluster() { void Cluster::handle(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); - Buffer buf(frame.size()); + boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling. + Buffer buf(store.get()); frame.encode(buf); - buf.flip(); - iovec iov = { buf.start(), frame.size() }; + iovec iov = { store.get(), frame.size() }; cpg.mcast(name, &iov, 1); } @@ -144,6 +143,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { { Mutex::ScopedLock l(lock); members[from].url=notifyIn->getUrl(); + if (!self.id && notifyIn->getUrl() == url) + self=from; lock.notifyAll(); QPID_LOG(trace, *this << ": members joined: " << members); } |