summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp148
1 files changed, 84 insertions, 64 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3007e9b1ab..2727d5af0a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,9 @@
*/
#include "Cluster.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,68 +34,49 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::Connection;
namespace {
+// FIXME aconway 2008-07-01: sending every frame to cluster,
+// serializing all processing in cluster deliver thread.
+// This will not perform at all, but provides a correct starting point.
+//
+// TODO:
+// - Fake "Connection" for cluster: owns shadow sessions.
+// - Maintain shadow sessions.
+// - Apply foreign frames to shadow sessions.
+//
+
+
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ Connection& connection;
Cluster& cluster;
- bool busy;
- Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
-
- void handle(AMQFrame& f) {
- Mutex::ScopedLock l(lock);
- assert(!busy);
- // FIXME aconway 2008-01-29: refcount Sessions.
- // session.addRef(); // Keep the session till the message is self delivered.
- cluster.send(f, next); // Indirectly send to next via cluster.
-
- // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
- // But cluster needs to agree on order of side-effects on the shared model.
- // OK for wiring to block, for messages use queue tokens?
- // Both in & out transfers must be orderd per queue.
- // May need out-of-order completion.
- busy=true;
- while (busy) lock.wait();
- }
-};
+ ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-// Next in inbound chain, self delivered from cluster.
-struct ClusterDeliverHandler : public FrameHandler {
- Cluster& cluster;
- ClusterSendHandler& sender;
-
- ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
-
void handle(AMQFrame& f) {
- next->handle(f);
- // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands.
- // Mutex::ScopedLock l(lock);
- // senderBusy=false;
- // senderLock.notify();
+ // FIXME aconway 2008-01-29: Refcount Connections to ensure
+ // Connection not destroyed till message is self delivered.
+ cluster.send(f, &connection, next); // Indirectly send to next via cluster.
}
};
-struct SessionObserver : public broker::SessionManager::Observer {
+struct ConnectionObserver : public broker::ConnectionManager::Observer {
Cluster& cluster;
- SessionObserver(Cluster& c) : cluster(c) {}
+ ConnectionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void created(Connection& c) {
// FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
- ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
- s.getInChain().insert(deliverer);
- s.getOutChain().insert(sender);
+ ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
+ c.getInChain().insert(sender);
}
};
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
+ return out << cluster.name.str() << "-" << cluster.self;
}
ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
@@ -106,13 +89,16 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
+// FIXME aconway 2008-07-02: create a Connection for the cluster.
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ broker(b),
cpg(*this),
name(name_),
url(url_),
- observer(new SessionObserver(*this))
+ observer(new ConnectionObserver(*this)),
+ self(cpg.self())
{
- QPID_LOG(trace, *this << " Joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
dispatcher=Thread(*this);
@@ -136,19 +122,32 @@ Cluster::~Cluster() {
}
}
-void Cluster::send(AMQFrame& frame, FrameHandler* next) {
- QPID_LOG(trace, *this << " SEND: " << frame);
- char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(data);
+template <class T> void decodePtr(Buffer& buf, T*& ptr) {
+ uint64_t value = buf.getLongLong();
+ ptr = reinterpret_cast<T*>(value);
+}
+
+template <class T> void encodePtr(Buffer& buf, T* ptr) {
+ uint64_t value = reinterpret_cast<uint64_t>(ptr);
+ buf.putLongLong(value);
+}
+
+void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // TODO aconway 2008-07-03: More efficient buffer management.
+ // Cache coded form of decoded frames for re-encoding?
+ Buffer buf(buffer);
+ assert(frame.size() + 128 < sizeof(buffer));
frame.encode(buf);
- buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
- iovec iov = { data, frame.size()+sizeof(next) };
+ encodePtr(buf, connection);
+ encodePtr(buf, next);
+ iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0);
+ send(frame, 0, 0);
}
size_t Cluster::size() const {
@@ -164,6 +163,21 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
+boost::shared_ptr<broker::Connection>
+Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
+ // FIXME aconway 2008-07-02: locking - called by deliver in
+ // cluster thread so no locks but may need to revisit as model
+ // changes.
+ ShadowConnectionId id(member, connectionPtr);
+ boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
+ if (!ptr) {
+ std::ostringstream os;
+ os << name << ":" << member << ":" << std::hex << connectionPtr;
+ ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ }
+ return ptr;
+}
+
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -172,20 +186,28 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
+ Id from(nodeid, pid);
try {
- Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
+ void* connectionId;
+ decodePtr(buf, connectionId);
+
+ QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+
+ if (connectionId == 0) // A cluster control frame.
handleClusterFrame(from, frame);
- else if (from == self) {
- FrameHandler* next;
- buf.getRawData((uint8_t*)&next, sizeof(next));
+ else if (from == self) { // My own frame, carries a next pointer.
+ FrameHandler* next;
+ decodePtr(buf, next);
next->handle(frame);
}
- // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ else { // Foreign frame, forward to shadow connection.
+ // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
+ boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
+ shadow->received(frame);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,7 +225,7 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
return (predicate(*this));
}
-// Handle cluster control frame from the null session.
+// Handle cluster control frame .
void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
@@ -213,10 +235,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
- if (!self.id && notifyIn->getUrl() == url.str())
- self=from;
lock.notifyAll();
- QPID_LOG(trace, *this << ": members joined: " << members);
+ QPID_LOG(debug, "Cluster join: " << members);
}
}
@@ -234,7 +254,7 @@ void Cluster::configChange(
if (nLeft) {
for (int i = 0; i < nLeft; ++i)
members.erase(Id(left[i]));
- QPID_LOG(trace, *this << ": members left: " << members);
+ QPID_LOG(debug, "Cluster leave: " << members);
lock.notifyAll();
}
newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);