summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp107
1 files changed, 91 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 49270bcfef..bca6c49c13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,6 +17,7 @@
*/
#include "Cluster.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -31,7 +32,70 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+using broker::SessionState;
+namespace {
+
+// Beginning of inbound chain: send to cluster.
+struct ClusterSendHandler : public FrameHandler {
+ SessionState& session;
+ Cluster& cluster;
+ bool busy;
+ Monitor lock;
+
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+ void handle(AMQFrame& f) {
+ Mutex::ScopedLock l(lock);
+ assert(!busy);
+ // FIXME aconway 2008-01-29: refcount Sessions.
+ // session.addRef(); // Keep the session till the message is self delivered.
+ cluster.send(f, next); // Indirectly send to next via cluster.
+
+ // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+ // But cluster needs to agree on order of side-effects on the shared model.
+ // OK for wiring to block, for messages use queue tokens?
+ // Both in & out transfers must be orderd per queue.
+ // May need out-of-order completion.
+ busy=true;
+ while (busy) lock.wait();
+ }
+};
+
+// Next in inbound chain, self delivered from cluster.
+struct ClusterDeliverHandler : public FrameHandler {
+ Cluster& cluster;
+ ClusterSendHandler& sender;
+
+ ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+
+ void handle(AMQFrame& f) {
+ next->handle(f);
+ Mutex::ScopedLock l(sender.lock);
+ sender.busy=false;
+ sender.lock.notify();
+ }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) {
+ h->next = c.next;
+ c.next = h;
+}
+
+struct SessionObserver : public broker::SessionManager::Observer {
+ Cluster& cluster;
+ SessionObserver(Cluster& c) : cluster(c) {}
+
+ void opened(SessionState& s) {
+ // FIXME aconway 2008-01-29: IList for memory management.
+ ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
+ ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+ insert(s.in, deliverer);
+ insert(s.in, sender);
+ }
+};
+}
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -48,10 +112,10 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
- FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
- url(url_)
+ url(url_),
+ observer(new SessionObserver(*this))
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -77,18 +141,19 @@ Cluster::~Cluster() {
}
}
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
QPID_LOG(trace, *this << " SEND: " << frame);
- boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(store.get());
+ char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(data);
frame.encode(buf);
- iovec iov = { store.get(), frame.size() };
+ buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+ iovec iov = { data, frame.size()+sizeof(next) };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- handle(frame);
+ send(frame, 0);
}
size_t Cluster::size() const {
@@ -112,15 +177,25 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
- handleClusterFrame(from, frame);
- else
- next->handle(frame);
+ try {
+ Id from(nodeid, pid);
+ Buffer buf(static_cast<char*>(msg), msg_len);
+ AMQFrame frame;
+ frame.decode(buf);
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
+ else if (from == self) {
+ FrameHandler* next;
+ buf.getRawData((uint8_t*)&next, sizeof(next));
+ next->handle(frame);
+ }
+ // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ }
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,