diff options
author | Alan Conway <aconway@apache.org> | 2008-08-29 18:18:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-29 18:18:45 +0000 |
commit | 9749e6774159c43750f04907574d371235e36c0a (patch) | |
tree | a04aa3d5171ad59a82e82cec4a18e691dce56378 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 7a7273f69fdd328de06db16347914a20ae758b2b (diff) | |
download | qpid-python-9749e6774159c43750f04907574d371235e36c0a.tar.gz |
Refactored cluster to intercept at ConnectionCode, using sys:: interfaces rather than boost functions.
Use framing::Operations and Invoker to dispatch cluster methods.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@690358 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 214 |
1 files changed, 93 insertions, 121 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f36d606af8..aea10949e4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -4,7 +4,7 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +n * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,18 +17,17 @@ */ #include "Cluster.h" -#include "ConnectionInterceptor.h" +#include "Connection.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ClusterNotifyBody.h" -#include "qpid/framing/ClusterConnectionCloseBody.h" -#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "qpid/framing/ClusterJoinedBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Invoker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -39,22 +38,34 @@ namespace qpid { namespace cluster { + using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::Connection; +// Handle cluster controls from a given member. +struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler { + Cluster& cluster; + MemberId member; + + ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {} + + void joined(const std::string& url) { + cluster.joined(member, url); + } +}; + ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "-" << cluster.self; } -ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { - return out << m.first << "=" << m.second.url; +ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) { + return out << m.first << " at " << m.second; } -ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { - ostream_iterator<Cluster::MemberMap::value_type> o(out, " "); - copy(members.begin(), members.end(), o); +ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) { + ostream_iterator<Cluster::UrlMap::value_type> o(out, " "); + copy(urls.begin(), urls.end(), o); return out; } @@ -74,9 +85,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Joining cluster: " << name_); + QPID_LOG(trace, "Node " << self << " joining cluster: " << name_); cpg.join(name); - notify(); + send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); @@ -84,31 +95,15 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mcastQueue.start(poller); } -Cluster::~Cluster() { - for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin(); - i != shadowConnectionMap.end(); - ++i) - { - i->second->dirtyClose(); - } - std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); -} - -void Cluster::initialize(broker::Connection& c) { - bool isLocal = c.getOutput().get() != &shadowOut; - if (isLocal) - localConnectionSet.insert(new ConnectionInterceptor(c, *this)); -} +Cluster::~Cluster() {} void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. - - QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. + // broker is set to 0 when the final config-change is delivered. while(broker) { Mutex::ScopedUnlock u(lock); cpg.dispatchAll(); @@ -126,9 +121,9 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - mcastQueue.push(Message(frame, self, connection)); +void Cluster::send(const AMQFrame& frame, const ConnectionId& id) { + QPID_LOG(trace, "MCAST [" << id << "] " << frame); + mcastQueue.push(Message(frame, id)); } void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, @@ -137,48 +132,40 @@ void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, // Static is OK because there is only one cluster allowed per // process and only one thread in mcastQueueCb at a time. static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. - MessageQueue::iterator i = begin; - while (i != end) { - Buffer buf(buffer, sizeof(buffer)); - while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) { - i->frame.encode(buf); - encodePtr(buf, i->connection); - ++i; - } - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + Buffer buf(buffer, sizeof(buffer)); + for (MessageQueue::iterator i = begin; i != end; ++i) { + AMQFrame& frame =i->first; + ConnectionId id =i->second; + if (buf.available() < frame.size() + sizeof(uint64_t)) + break; + frame.encode(buf); + encodePtr(buf, id.second); } -} - -void Cluster::notify() { - send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0); + iovec iov = { buffer, buf.getPosition() }; + cpg.mcast(name, &iov, 1); } size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return members.size(); + return urls.size(); } -Cluster::MemberList Cluster::getMembers() const { +std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); - MemberList result(members.size()); - std::transform(members.begin(), members.end(), result.begin(), - boost::bind(&MemberMap::value_type::second, _1)); + std::vector<Url> result(urls.size()); + std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1)); return result; } -ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { - ShadowConnectionId id(member, remotePtr); - ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); - if (i == shadowConnectionMap.end()) { // A new shadow connection. +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { + boost::intrusive_ptr<Connection> c = connections[id]; + if (!c && id.first != self) { // Shadow connection std::ostringstream os; - os << name << ":" << member << ":" << remotePtr; - assert(broker); - broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str()); - ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id)); - i = shadowConnectionMap.insert(value).first; + os << id; + c = connections[id] = new Connection(*this, shadowOut, os.str(), id); } - return i->second; + assert(c); + return c; } void Cluster::deliver( @@ -189,16 +176,17 @@ void Cluster::deliver( void* msg, int msg_len) { - Id from(nodeid, pid); + MemberId from(nodeid, pid); try { Buffer buf(static_cast<char*>(msg), msg_len); while (buf.available() > 0) { AMQFrame frame; if (!frame.decode(buf)) // Not enough data. throw Exception("Received incomplete cluster event."); - void* connection; - decodePtr(buf, connection); - deliverQueue.push(Message(frame, from, connection)); + Connection* cp; + decodePtr(buf, cp); + QPID_LOG(critical, "deliverQ.push " << frame); + deliverQueue.push(Message(frame, ConnectionId(from, cp))); } } catch (const std::exception& e) { @@ -213,23 +201,21 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, const MessageQueue::iterator& end) { for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame(i->frame); - Id from(i->from); - ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection); + AMQFrame& frame(i->first); + ConnectionId connectionId(i->second); try { - QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - + QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame); if (!broker) { - QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); + QPID_LOG(error, "Unexpected DLVR after leaving the cluster."); return; } - if (connection && from != self) // Look up shadow for remote connections - connection = getShadowConnection(from, connection); - - if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) - handleMethod(from, connection, *frame.getMethod()); - else - connection->deliver(frame); + if (connectionId.getConnectionPtr()) // Connection control + getConnection(connectionId)->deliver(frame); + else { // Cluster control + ClusterOperations cops(*this, connectionId.getMember()); + bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled(); + assert(invoked); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -240,54 +226,30 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, } } -// Handle cluster methods -// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. -void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { - assert(method.amqpClassId() == CLUSTER_CLASS_ID); - switch (method.amqpMethodId()) { - case CLUSTER_NOTIFY_METHOD_ID: { - ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method); - Mutex::ScopedLock l(lock); - members[from].url=notify.getUrl(); - lock.notifyAll(); - break; - } - case CLUSTER_CONNECTION_CLOSE_METHOD_ID: { - if (!connection->isLocal()) - shadowConnectionMap.erase(connection->getShadowId()); - else - localConnectionSet.erase(connection); - connection->deliverClosed(); - break; - } - case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); - connection->deliverDoOutput(doOutput.getBytes()); - break; - } - default: - assert(0); - } +void Cluster::joined(const MemberId& member, const string& url) { + Mutex::ScopedLock l(lock); + QPID_LOG(debug, member << " has URL " << url); + urls[member] = url; + lock.notifyAll(); } void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address *current, int nCurrent, + cpg_address */*current*/, int /*nCurrent*/, cpg_address *left, int nLeft, - cpg_address */*joined*/, int nJoined) + cpg_address *joined, int nJoined) { + QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft)); Mutex::ScopedLock l(lock); - for (int i = 0; i < nLeft; ++i) - members.erase(left[i]); - for(int j = 0; j < nCurrent; ++j) - members[current[j]].id = current[j]; - QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):" - << members); - assert(members.size() == size_t(nCurrent)); - if (members.find(self) == members.end()) + // We add URLs to the map in joined() we don't keep track of pre-URL members yet. + for (int l = 0; l < nLeft; ++l) urls.erase(left[l]); + + if (std::find(left, left+nLeft, self) != left+nLeft) { broker = 0; // We have left the group, this is the final config change. - lock.notifyAll(); // Threads waiting for membership changes. + QPID_LOG(debug, "Leaving cluster " << *this); + } + lock.notifyAll(); // Threads waiting for url changes. } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -301,6 +263,16 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + connections[c->getId()] = c; +} + +void Cluster::erase(ConnectionId id) { + Mutex::ScopedLock l(lock); + connections.erase(id); +} + }} // namespace qpid::cluster |