summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp214
1 files changed, 93 insertions, 121 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f36d606af8..aea10949e4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -4,7 +4,7 @@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+n * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,18 +17,17 @@
*/
#include "Cluster.h"
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/ClusterConnectionCloseBody.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/ClusterJoinedBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Invoker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -39,22 +38,34 @@
namespace qpid {
namespace cluster {
+
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::Connection;
+// Handle cluster controls from a given member.
+struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler {
+ Cluster& cluster;
+ MemberId member;
+
+ ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
+
+ void joined(const std::string& url) {
+ cluster.joined(member, url);
+ }
+};
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "-" << cluster.self;
}
-ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
- return out << m.first << "=" << m.second.url;
+ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
+ return out << m.first << " at " << m.second;
}
-ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
- ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
- copy(members.begin(), members.end(), o);
+ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
+ ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
+ copy(urls.begin(), urls.end(), o);
return out;
}
@@ -74,9 +85,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Joining cluster: " << name_);
+ QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
cpg.join(name);
- notify();
+ send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
@@ -84,31 +95,15 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
mcastQueue.start(poller);
}
-Cluster::~Cluster() {
- for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
- i != shadowConnectionMap.end();
- ++i)
- {
- i->second->dirtyClose();
- }
- std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
-}
-
-void Cluster::initialize(broker::Connection& c) {
- bool isLocal = c.getOutput().get() != &shadowOut;
- if (isLocal)
- localConnectionSet.insert(new ConnectionInterceptor(c, *this));
-}
+Cluster::~Cluster() {}
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
-
- QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
+ // broker is set to 0 when the final config-change is delivered.
while(broker) {
Mutex::ScopedUnlock u(lock);
cpg.dispatchAll();
@@ -126,9 +121,9 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
buf.putLongLong(value);
}
-void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- mcastQueue.push(Message(frame, self, connection));
+void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST [" << id << "] " << frame);
+ mcastQueue.push(Message(frame, id));
}
void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
@@ -137,48 +132,40 @@ void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
// Static is OK because there is only one cluster allowed per
// process and only one thread in mcastQueueCb at a time.
static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
- MessageQueue::iterator i = begin;
- while (i != end) {
- Buffer buf(buffer, sizeof(buffer));
- while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
- i->frame.encode(buf);
- encodePtr(buf, i->connection);
- ++i;
- }
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ Buffer buf(buffer, sizeof(buffer));
+ for (MessageQueue::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame =i->first;
+ ConnectionId id =i->second;
+ if (buf.available() < frame.size() + sizeof(uint64_t))
+ break;
+ frame.encode(buf);
+ encodePtr(buf, id.second);
}
-}
-
-void Cluster::notify() {
- send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
+ iovec iov = { buffer, buf.getPosition() };
+ cpg.mcast(name, &iov, 1);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return members.size();
+ return urls.size();
}
-Cluster::MemberList Cluster::getMembers() const {
+std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- MemberList result(members.size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&MemberMap::value_type::second, _1));
+ std::vector<Url> result(urls.size());
+ std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
return result;
}
-ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
- ShadowConnectionId id(member, remotePtr);
- ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
- if (i == shadowConnectionMap.end()) { // A new shadow connection.
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ boost::intrusive_ptr<Connection> c = connections[id];
+ if (!c && id.first != self) { // Shadow connection
std::ostringstream os;
- os << name << ":" << member << ":" << remotePtr;
- assert(broker);
- broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
- ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
- i = shadowConnectionMap.insert(value).first;
+ os << id;
+ c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
}
- return i->second;
+ assert(c);
+ return c;
}
void Cluster::deliver(
@@ -189,16 +176,17 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
+ MemberId from(nodeid, pid);
try {
Buffer buf(static_cast<char*>(msg), msg_len);
while (buf.available() > 0) {
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
throw Exception("Received incomplete cluster event.");
- void* connection;
- decodePtr(buf, connection);
- deliverQueue.push(Message(frame, from, connection));
+ Connection* cp;
+ decodePtr(buf, cp);
+ QPID_LOG(critical, "deliverQ.push " << frame);
+ deliverQueue.push(Message(frame, ConnectionId(from, cp)));
}
}
catch (const std::exception& e) {
@@ -213,23 +201,21 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
const MessageQueue::iterator& end)
{
for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame(i->frame);
- Id from(i->from);
- ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ AMQFrame& frame(i->first);
+ ConnectionId connectionId(i->second);
try {
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
+ QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
return;
}
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ if (connectionId.getConnectionPtr()) // Connection control
+ getConnection(connectionId)->deliver(frame);
+ else { // Cluster control
+ ClusterOperations cops(*this, connectionId.getMember());
+ bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
+ assert(invoked);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -240,54 +226,30 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
}
}
-// Handle cluster methods
-// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
-void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
- assert(method.amqpClassId() == CLUSTER_CLASS_ID);
- switch (method.amqpMethodId()) {
- case CLUSTER_NOTIFY_METHOD_ID: {
- ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
- Mutex::ScopedLock l(lock);
- members[from].url=notify.getUrl();
- lock.notifyAll();
- break;
- }
- case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
- if (!connection->isLocal())
- shadowConnectionMap.erase(connection->getShadowId());
- else
- localConnectionSet.erase(connection);
- connection->deliverClosed();
- break;
- }
- case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
- connection->deliverDoOutput(doOutput.getBytes());
- break;
- }
- default:
- assert(0);
- }
+void Cluster::joined(const MemberId& member, const string& url) {
+ Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, member << " has URL " << url);
+ urls[member] = url;
+ lock.notifyAll();
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address *current, int nCurrent,
+ cpg_address */*current*/, int /*nCurrent*/,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
+ QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i)
- members.erase(left[i]);
- for(int j = 0; j < nCurrent; ++j)
- members[current[j]].id = current[j];
- QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
- << members);
- assert(members.size() == size_t(nCurrent));
- if (members.find(self) == members.end())
+ // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
+ for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+
+ if (std::find(left, left+nLeft, self) != left+nLeft) {
broker = 0; // We have left the group, this is the final config change.
- lock.notifyAll(); // Threads waiting for membership changes.
+ QPID_LOG(debug, "Leaving cluster " << *this);
+ }
+ lock.notifyAll(); // Threads waiting for url changes.
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -301,6 +263,16 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections[c->getId()] = c;
+}
+
+void Cluster::erase(ConnectionId id) {
+ Mutex::ScopedLock l(lock);
+ connections.erase(id);
+}
+
}} // namespace qpid::cluster