summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
1 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3c73719ef9..ce87d23c0d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,6 +21,7 @@
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/scoped_array.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -46,13 +47,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) :
- FrameHandler(&sessions),
+Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker&) :
+ FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
- url(url_),
- self(Id::self(cpg)),
- sessions(broker, *this)
+ url(url_)
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -80,10 +79,10 @@ Cluster::~Cluster() {
void Cluster::handle(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
- Buffer buf(frame.size());
+ boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(store.get());
frame.encode(buf);
- buf.flip();
- iovec iov = { buf.start(), frame.size() };
+ iovec iov = { store.get(), frame.size() };
cpg.mcast(name, &iov, 1);
}
@@ -144,6 +143,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
+ if (!self.id && notifyIn->getUrl() == url)
+ self=from;
lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
}