diff options
author | Alan Conway <aconway@apache.org> | 2008-09-03 03:21:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-03 03:21:00 +0000 |
commit | e8f6b7cd234088e7c33e42eb10e29719ea8e8aa9 (patch) | |
tree | 2d7df29ebba337fbbe28aa7716f0e32ff9e24c70 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 05b6583dc0d080d6bc5a0cca09218bb045090daf (diff) | |
download | qpid-python-e8f6b7cd234088e7c33e42eb10e29719ea8e8aa9.tar.gz |
Cluster multicasts buffers rather than frames.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@691489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 215 |
1 files changed, 91 insertions, 124 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index aea10949e4..f93203acbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -4,7 +4,7 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. -n * You may obtain a copy of the License at + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -20,14 +20,17 @@ n * You may obtain a copy of the License at #include "Connection.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/SessionState.h" +#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ClusterJoinedBody.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/ClusterUrlNoticeBody.h" +#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/Invoker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -38,36 +41,17 @@ n * You may obtain a copy of the License at namespace qpid { namespace cluster { - using namespace qpid::framing; using namespace qpid::sys; using namespace std; -// Handle cluster controls from a given member. -struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler { +struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; - - ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {} - - void joined(const std::string& url) { - cluster.joined(member, url); - } + ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} + void urlNotice(const std::string& u) { cluster.urlNotice (member, u); } + bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } }; - -ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << cluster.name.str() << "-" << cluster.self; -} - -ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) { - return out << m.first << " at " << m.second; -} - -ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) { - ostream_iterator<Cluster::UrlMap::value_type> o(out, " "); - copy(urls.begin(), urls.end(), o); - return out; -} Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(&b), @@ -80,30 +64,39 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect - ), - deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)), - mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) + ) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Node " << self << " joining cluster: " << name_); + QPID_LOG(trace, "Joining cluster: " << name << " as " << self); cpg.join(name); - send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); + mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), + ConnectionId(self,0)); // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); - deliverQueue.start(poller); - mcastQueue.start(poller); } Cluster::~Cluster() {} +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c)); +} + +void Cluster::erase(ConnectionId id) { + Mutex::ScopedLock l(lock); + connections.erase(id); +} + void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. + + QPID_LOG(debug, "Leaving cluster " << name.str()); cpg.leave(name); - // broker is set to 0 when the final config-change is delivered. + // broker= is set to 0 when the final config-change is delivered. while(broker) { Mutex::ScopedUnlock u(lock); cpg.dispatchAll(); @@ -121,30 +114,30 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(const AMQFrame& frame, const ConnectionId& id) { - QPID_LOG(trace, "MCAST [" << id << "] " << frame); - mcastQueue.push(Message(frame, id)); -} - -void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end) -{ - // Static is OK because there is only one cluster allowed per - // process and only one thread in mcastQueueCb at a time. - static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. +void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { + QPID_LOG(trace, "MCAST [" << connection << "] " << frame); + // FIXME aconway 2008-09-02: restore queueing. + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. + static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder. Buffer buf(buffer, sizeof(buffer)); - for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame =i->first; - ConnectionId id =i->second; - if (buf.available() < frame.size() + sizeof(uint64_t)) - break; - frame.encode(buf); - encodePtr(buf, id.second); - } + buf.putOctet(CONTROL); + encodePtr(buf, connection.getConnectionPtr()); + frame.encode(buf); iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { + // FIXME aconway 2008-09-02: does this need locking? + Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking. + char hdrbuf[1+sizeof(uint64_t)]; + Buffer buf(hdrbuf, sizeof(hdrbuf)); + buf.putOctet(DATA); + encodePtr(buf, id.getConnectionPtr()); + iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } }; + cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +} + size_t Cluster::size() const { Mutex::ScopedLock l(lock); return urls.size(); @@ -153,19 +146,23 @@ size_t Cluster::size() const { std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); std::vector<Url> result(urls.size()); - std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1)); + std::transform(urls.begin(), urls.end(), result.begin(), + boost::bind(&UrlMap::value_type::second, _1)); return result; } boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { - boost::intrusive_ptr<Connection> c = connections[id]; - if (!c && id.first != self) { // Shadow connection - std::ostringstream os; - os << id; - c = connections[id] = new Connection(*this, shadowOut, os.str(), id); + if (id.getMember() == self) + return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); + ConnectionMap::iterator i = connections.find(id); + if (i == connections.end()) { // New shadow connection. + assert(id.getMember() != self); + std::ostringstream mgmtId; + mgmtId << name << ":" << id; + ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); + i = connections.insert(value).first; } - assert(c); - return c; + return i->second; } void Cluster::deliver( @@ -176,17 +173,28 @@ void Cluster::deliver( void* msg, int msg_len) { - MemberId from(nodeid, pid); try { + MemberId from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - while (buf.available() > 0) { + Connection* connection; + uint8_t type = buf.getOctet(); + decodePtr(buf, connection); + if (connection == 0) { // Cluster controls AMQFrame frame; - if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); - Connection* cp; - decodePtr(buf, cp); - QPID_LOG(critical, "deliverQ.push " << frame); - deliverQueue.push(Message(frame, ConnectionId(from, cp))); + while (frame.decode(buf)) + if (!ClusterOperations(*this, from).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Connection data or control + boost::intrusive_ptr<Connection> c = + getConnection(ConnectionId(from, connection)); + if (type == DATA) + c->deliverBuffer(buf); + else { + AMQFrame frame; + while (frame.decode(buf)) + c->deliver(frame); + } } } catch (const std::exception& e) { @@ -197,59 +205,24 @@ void Cluster::deliver( } } -void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, - const MessageQueue::iterator& end) -{ - for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame(i->first); - ConnectionId connectionId(i->second); - try { - QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame); - if (!broker) { - QPID_LOG(error, "Unexpected DLVR after leaving the cluster."); - return; - } - if (connectionId.getConnectionPtr()) // Connection control - getConnection(connectionId)->deliver(frame); - else { // Cluster control - ClusterOperations cops(*this, connectionId.getMember()); - bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled(); - assert(invoked); - } - } - catch (const std::exception& e) { - // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what()); - assert(0); - throw; - } - } -} - -void Cluster::joined(const MemberId& member, const string& url) { - Mutex::ScopedLock l(lock); - QPID_LOG(debug, member << " has URL " << url); - urls[member] = url; - lock.notifyAll(); -} - void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address */*current*/, int /*nCurrent*/, + cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int /*nJoined*/) { - QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft)); + QPID_LOG(debug, "Cluster change: " + << std::make_pair(current, nCurrent) + << std::make_pair(left, nLeft)); + Mutex::ScopedLock l(lock); - // We add URLs to the map in joined() we don't keep track of pre-URL members yet. - for (int l = 0; l < nLeft; ++l) urls.erase(left[l]); + for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); + // Add new members when their URL notice arraives. - if (std::find(left, left+nLeft, self) != left+nLeft) { + if (std::find(left, left+nLeft, self) != left+nLeft) broker = 0; // We have left the group, this is the final config change. - QPID_LOG(debug, "Leaving cluster " << *this); - } - lock.notifyAll(); // Threads waiting for url changes. + lock.notifyAll(); // Threads waiting for membership changes. } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -263,14 +236,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); - connections[c->getId()] = c; -} - -void Cluster::erase(ConnectionId id) { - Mutex::ScopedLock l(lock); - connections.erase(id); +void Cluster::urlNotice(const MemberId& m, const std::string& url) { + urls.insert(UrlMap::value_type(m,Url(url))); } }} // namespace qpid::cluster |