summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp195
1 files changed, 93 insertions, 102 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 4ea77e7fbf..3b7f32e822 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,15 +17,19 @@
*/
#include "Cluster.h"
+#include "ConnectionInterceptor.h"
+
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
+#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
-#include <boost/scoped_array.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -37,24 +41,6 @@ using namespace qpid::sys;
using namespace std;
using broker::Connection;
-// Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
- Cluster::ConnectionChain& connection;
- Cluster& cluster;
-
- ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-
- void handle(AMQFrame& f) {
- // FIXME aconway 2008-01-29: Refcount Connections to ensure
- // Connection not destroyed till message is self delivered.
- cluster.send(f, &connection, next); // Indirectly send to next via cluster.
- }
-};
-
-void Cluster::initialize(Cluster::ConnectionChain& cc) {
- cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
-}
-
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "-" << cluster.self;
}
@@ -69,14 +55,14 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-// FIXME aconway 2008-07-02: create a Connection for the cluster.
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- broker(b),
cpg(*this),
+ broker(&b),
name(name_),
url(url_),
self(cpg.self())
{
+ broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
@@ -90,15 +76,27 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
}
Cluster::~Cluster() {
- QPID_LOG(trace, *this << " Leaving cluster.");
- try {
- cpg.leave(name);
- cpg.shutdown();
- dispatcher.join();
- }
- catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << *this << ": "
- << e.what());
+ cpg.shutdown();
+ dispatcher.join();
+}
+
+// local connection initializes plugins
+void Cluster::initialize(broker::Connection& c) {
+ bool isLocal = &c.getOutput() != &shadowOut;
+ if (isLocal)
+ new ConnectionInterceptor(c, *this);
+}
+
+void Cluster::leave() {
+ if (!broker.get()) return; // Already left
+ QPID_LOG(info, QPID_MSG("Leaving cluster " << *this));
+ // Must not be called in the dispatch thread.
+ assert(Thread::current().id() != dispatcher.id());
+ cpg.leave(name);
+ // Wait till final config-change is delivered and broker is released.
+ {
+ Mutex::ScopedLock l(lock);
+ while(broker.get()) lock.wait();
}
}
@@ -112,22 +110,20 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
buf.putLongLong(value);
}
-void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- // TODO aconway 2008-07-03: More efficient buffer management.
+ // FIXME aconway 2008-07-03: More efficient buffer management.
// Cache coded form of decoded frames for re-encoding?
Buffer buf(buffer);
- assert(frame.size() + 128 < sizeof(buffer));
+ assert(frame.size() + 64 < sizeof(buffer));
frame.encode(buf);
encodePtr(buf, connection);
- encodePtr(buf, next);
iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
- AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0, 0);
+ send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
}
size_t Cluster::size() const {
@@ -143,19 +139,17 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-boost::shared_ptr<broker::Connection>
-Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
- // FIXME aconway 2008-07-02: locking - called by deliver in
- // cluster thread so no locks but may need to revisit as model
- // changes.
- ShadowConnectionId id(member, connectionPtr);
- boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
- if (!ptr) {
+ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
+ ShadowConnectionId id(member, remotePtr);
+ ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
+ if (i == shadowConnectionMap.end()) { // A new shadow connection.
std::ostringstream os;
- os << name << ":" << member << ":" << std::hex << connectionPtr;
- ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ os << name << ":" << member << ":" << remotePtr;
+ broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
+ ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
+ i = shadowConnectionMap.insert(value).first;
}
- return ptr;
+ return i->second;
}
void Cluster::deliver(
@@ -171,78 +165,75 @@ void Cluster::deliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- void* connectionId;
- decodePtr(buf, connectionId);
+ ConnectionInterceptor* connection;
+ decodePtr(buf, connection);
+ QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
- QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
-
- if (connectionId == 0) // A cluster control frame.
- handleClusterFrame(from, frame);
- else if (from == self) { // My own frame, carries a next pointer.
- FrameHandler* next;
- decodePtr(buf, next);
- next->handle(frame);
- }
- else { // Foreign frame, forward to shadow connection.
- // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
- boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
- shadow->received(frame);
+ if (!broker.get()) {
+ QPID_LOG(warning, "Ignoring late DLVR, already left the cluster.");
+ return;
}
+
+ if (connection && from != self) // Look up shadow for remote connections
+ connection = getShadowConnection(from, connection);
+
+ if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+ handleMethod(from, connection, *frame.getMethod());
+ else
+ connection->deliver(frame);
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+ assert(0);
+ throw;
}
}
-bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
- Duration timeout) const
-{
- AbsTime deadline(now(), timeout);
- Mutex::ScopedLock l(lock);
- while (!predicate(*this) && lock.wait(deadline))
- ;
- return (predicate(*this));
-}
-
-// Handle cluster control frame .
-void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
- // TODO aconway 2007-06-20: use visitor pattern here.
- ClusterNotifyBody* notifyIn=
- dynamic_cast<ClusterNotifyBody*>(frame.getBody());
- assert(notifyIn);
- MemberList list;
- {
- Mutex::ScopedLock l(lock);
- members[from].url=notifyIn->getUrl();
- lock.notifyAll();
- QPID_LOG(debug, "Cluster join: " << members);
+// Handle cluster methods
+// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
+void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
+ assert(method.amqpClassId() == CLUSTER_CLASS_ID);
+ switch (method.amqpMethodId()) {
+ case CLUSTER_NOTIFY_METHOD_ID: {
+ ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
+ Mutex::ScopedLock l(lock);
+ members[from].url=notify.getUrl();
+ lock.notifyAll();
+ break;
+ }
+ case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
+ if (!connection->isLocal())
+ shadowConnectionMap.erase(connection->getShadowId());
+ connection->deliverClosed();
+ break;
+ }
+ default:
+ assert(0);
}
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int nJoined)
{
- bool newMembers=false;
- MemberList updated;
- {
- Mutex::ScopedLock l(lock);
- if (nLeft) {
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(left[i]));
- QPID_LOG(debug, "Cluster leave: " << members);
- lock.notifyAll();
- }
- newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
- // We don't record members joining here, we record them when
- // we get their ClusterNotify message.
+ Mutex::ScopedLock l(lock);
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(left[i]);
+ for(int j = 0; j < nCurrent; ++j)
+ members[current[j]].id = current[j];
+ QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
+ << members);
+ assert(members.size() == size_t(nCurrent));
+ if (members.find(self) == members.end()) {
+ QPID_LOG(debug, "Left cluster " << *this);
+ broker = 0; // Release broker reference.
}
- if (newMembers) // Notify new members of my presence.
- notify();
+
+ lock.notifyAll(); // Threads waiting for membership changes.
}
void Cluster::run() {