diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 107 |
1 files changed, 91 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 49270bcfef..bca6c49c13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,6 +17,7 @@ */ #include "Cluster.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -31,7 +32,70 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; +using broker::SessionState; +namespace { + +// Beginning of inbound chain: send to cluster. +struct ClusterSendHandler : public FrameHandler { + SessionState& session; + Cluster& cluster; + bool busy; + Monitor lock; + + ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + + void handle(AMQFrame& f) { + Mutex::ScopedLock l(lock); + assert(!busy); + // FIXME aconway 2008-01-29: refcount Sessions. + // session.addRef(); // Keep the session till the message is self delivered. + cluster.send(f, next); // Indirectly send to next via cluster. + + // FIXME aconway 2008-01-29: need to get this blocking out of the loop. + // But cluster needs to agree on order of side-effects on the shared model. + // OK for wiring to block, for messages use queue tokens? + // Both in & out transfers must be orderd per queue. + // May need out-of-order completion. + busy=true; + while (busy) lock.wait(); + } +}; + +// Next in inbound chain, self delivered from cluster. +struct ClusterDeliverHandler : public FrameHandler { + Cluster& cluster; + ClusterSendHandler& sender; + + ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {} + + void handle(AMQFrame& f) { + next->handle(f); + Mutex::ScopedLock l(sender.lock); + sender.busy=false; + sender.lock.notify(); + } +}; + +// FIXME aconway 2008-01-29: IList +void insert(FrameHandler::Chain& c, FrameHandler* h) { + h->next = c.next; + c.next = h; +} + +struct SessionObserver : public broker::SessionManager::Observer { + Cluster& cluster; + SessionObserver(Cluster& c) : cluster(c) {} + + void opened(SessionState& s) { + // FIXME aconway 2008-01-29: IList for memory management. + ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); + ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); + insert(s.in, deliverer); + insert(s.in, sender); + } +}; +} ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; @@ -48,10 +112,10 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { } Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : - FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer cpg(*this), name(name_), - url(url_) + url(url_), + observer(new SessionObserver(*this)) { QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg.join(name); @@ -77,18 +141,19 @@ Cluster::~Cluster() { } } -void Cluster::handle(AMQFrame& frame) { +void Cluster::send(AMQFrame& frame, FrameHandler* next) { QPID_LOG(trace, *this << " SEND: " << frame); - boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling. - Buffer buf(store.get()); + char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling. + Buffer buf(data); frame.encode(buf); - iovec iov = { store.get(), frame.size() }; + buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer. + iovec iov = { data, frame.size()+sizeof(next) }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - handle(frame); + send(frame, 0); } size_t Cluster::size() const { @@ -112,15 +177,25 @@ void Cluster::deliver( void* msg, int msg_len) { - Id from(nodeid, pid); - Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; - frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.getChannel() == 0) - handleClusterFrame(from, frame); - else - next->handle(frame); + try { + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); + if (frame.getChannel() == 0) + handleClusterFrame(from, frame); + else if (from == self) { + FrameHandler* next; + buf.getRawData((uint8_t*)&next, sizeof(next)); + next->handle(frame); + } + // FIXME aconway 2008-01-30: apply frames from foreign sessions. + } + catch (const std::exception& e) { + // FIXME aconway 2008-01-30: exception handling. + QPID_LOG(error, "Error handling frame from cluster " << e.what()); + } } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, |