summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp215
1 files changed, 91 insertions, 124 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index aea10949e4..f93203acbf 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -4,7 +4,7 @@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
-n * You may obtain a copy of the License at
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -20,14 +20,17 @@ n * You may obtain a copy of the License at
#include "Connection.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterJoinedBody.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/Invoker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -38,36 +41,17 @@ n * You may obtain a copy of the License at
namespace qpid {
namespace cluster {
-
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-// Handle cluster controls from a given member.
-struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler {
+struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
-
- ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
-
- void joined(const std::string& url) {
- cluster.joined(member, url);
- }
+ ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
+ void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+ bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
};
-
-ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << cluster.name.str() << "-" << cluster.self;
-}
-
-ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
- return out << m.first << " at " << m.second;
-}
-
-ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
- ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
- copy(urls.begin(), urls.end(), o);
- return out;
-}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(&b),
@@ -80,30 +64,39 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
- ),
- deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
- mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
+ )
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
cpg.join(name);
- send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
+ mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+ ConnectionId(self,0));
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
- deliverQueue.start(poller);
- mcastQueue.start(poller);
}
Cluster::~Cluster() {}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c));
+}
+
+void Cluster::erase(ConnectionId id) {
+ Mutex::ScopedLock l(lock);
+ connections.erase(id);
+}
+
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
+
+ QPID_LOG(debug, "Leaving cluster " << name.str());
cpg.leave(name);
- // broker is set to 0 when the final config-change is delivered.
+ // broker= is set to 0 when the final config-change is delivered.
while(broker) {
Mutex::ScopedUnlock u(lock);
cpg.dispatchAll();
@@ -121,30 +114,30 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
buf.putLongLong(value);
}
-void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST [" << id << "] " << frame);
- mcastQueue.push(Message(frame, id));
-}
-
-void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end)
-{
- // Static is OK because there is only one cluster allowed per
- // process and only one thread in mcastQueueCb at a time.
- static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // FIXME aconway 2008-09-02: restore queueing.
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+ static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
Buffer buf(buffer, sizeof(buffer));
- for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame =i->first;
- ConnectionId id =i->second;
- if (buf.available() < frame.size() + sizeof(uint64_t))
- break;
- frame.encode(buf);
- encodePtr(buf, id.second);
- }
+ buf.putOctet(CONTROL);
+ encodePtr(buf, connection.getConnectionPtr());
+ frame.encode(buf);
iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
+ // FIXME aconway 2008-09-02: does this need locking?
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+ char hdrbuf[1+sizeof(uint64_t)];
+ Buffer buf(hdrbuf, sizeof(hdrbuf));
+ buf.putOctet(DATA);
+ encodePtr(buf, id.getConnectionPtr());
+ iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
+ cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+}
+
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
return urls.size();
@@ -153,19 +146,23 @@ size_t Cluster::size() const {
std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
std::vector<Url> result(urls.size());
- std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
+ std::transform(urls.begin(), urls.end(), result.begin(),
+ boost::bind(&UrlMap::value_type::second, _1));
return result;
}
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
- boost::intrusive_ptr<Connection> c = connections[id];
- if (!c && id.first != self) { // Shadow connection
- std::ostringstream os;
- os << id;
- c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
+ if (id.getMember() == self)
+ return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
+ ConnectionMap::iterator i = connections.find(id);
+ if (i == connections.end()) { // New shadow connection.
+ assert(id.getMember() != self);
+ std::ostringstream mgmtId;
+ mgmtId << name << ":" << id;
+ ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
+ i = connections.insert(value).first;
}
- assert(c);
- return c;
+ return i->second;
}
void Cluster::deliver(
@@ -176,17 +173,28 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- MemberId from(nodeid, pid);
try {
+ MemberId from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- while (buf.available() > 0) {
+ Connection* connection;
+ uint8_t type = buf.getOctet();
+ decodePtr(buf, connection);
+ if (connection == 0) { // Cluster controls
AMQFrame frame;
- if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event.");
- Connection* cp;
- decodePtr(buf, cp);
- QPID_LOG(critical, "deliverQ.push " << frame);
- deliverQueue.push(Message(frame, ConnectionId(from, cp)));
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, from).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Connection data or control
+ boost::intrusive_ptr<Connection> c =
+ getConnection(ConnectionId(from, connection));
+ if (type == DATA)
+ c->deliverBuffer(buf);
+ else {
+ AMQFrame frame;
+ while (frame.decode(buf))
+ c->deliver(frame);
+ }
}
}
catch (const std::exception& e) {
@@ -197,59 +205,24 @@ void Cluster::deliver(
}
}
-void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end)
-{
- for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame(i->first);
- ConnectionId connectionId(i->second);
- try {
- QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
- if (!broker) {
- QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
- return;
- }
- if (connectionId.getConnectionPtr()) // Connection control
- getConnection(connectionId)->deliver(frame);
- else { // Cluster control
- ClusterOperations cops(*this, connectionId.getMember());
- bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
- assert(invoked);
- }
- }
- catch (const std::exception& e) {
- // FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
- assert(0);
- throw;
- }
- }
-}
-
-void Cluster::joined(const MemberId& member, const string& url) {
- Mutex::ScopedLock l(lock);
- QPID_LOG(debug, member << " has URL " << url);
- urls[member] = url;
- lock.notifyAll();
-}
-
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int /*nJoined*/)
{
- QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
+ QPID_LOG(debug, "Cluster change: "
+ << std::make_pair(current, nCurrent)
+ << std::make_pair(left, nLeft));
+
Mutex::ScopedLock l(lock);
- // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
- for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+ for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
+ // Add new members when their URL notice arraives.
- if (std::find(left, left+nLeft, self) != left+nLeft) {
+ if (std::find(left, left+nLeft, self) != left+nLeft)
broker = 0; // We have left the group, this is the final config change.
- QPID_LOG(debug, "Leaving cluster " << *this);
- }
- lock.notifyAll(); // Threads waiting for url changes.
+ lock.notifyAll(); // Threads waiting for membership changes.
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -263,14 +236,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- connections[c->getId()] = c;
-}
-
-void Cluster::erase(ConnectionId id) {
- Mutex::ScopedLock l(lock);
- connections.erase(id);
+void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+ urls.insert(UrlMap::value_type(m,Url(url)));
}
}} // namespace qpid::cluster