summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-29 18:18:45 +0000
committerAlan Conway <aconway@apache.org>2008-08-29 18:18:45 +0000
commitac1660c072fc1fc783731b7cd216861fc6999ac0 (patch)
tree5e401b8cd8e4c491cefa4e74b8185e2cfc032736
parent605ec8d6c0cfab3683fd962e42fbcd39b4b53db9 (diff)
downloadqpid-python-ac1660c072fc1fc783731b7cd216861fc6999ac0.tar.gz
Refactored cluster to intercept at ConnectionCode, using sys:: interfaces rather than boost functions.
Use framing::Operations and Invoker to dispatch cluster methods. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@690358 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk14
-rw-r--r--qpid/cpp/src/qpid/Url.h4
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp14
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h15
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h11
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionFactory.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp51
-rw-r--r--qpid/cpp/src/qpid/cluster/ClassifierHandler.h50
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp214
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h74
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp94
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h (renamed from qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h)75
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp64
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.h77
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp102
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp44
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.h27
-rw-r--r--qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (renamed from qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h)10
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp19
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ProxyInputHandler.h57
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h58
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionCodec.h6
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionInputHandler.h1
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h3
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.h6
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp12
-rw-r--r--qpid/cpp/xml/cluster.xml12
34 files changed, 670 insertions, 538 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 934ec0174b..d5be6dfc57 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -6,24 +6,26 @@ lib_LTLIBRARIES += libqpidcluster.la
if CPG
libqpidcluster_la_SOURCES = \
+ qpid/cluster/types.h \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/ClusterPlugin.cpp \
- qpid/cluster/ConnectionInterceptor.h \
- qpid/cluster/ConnectionInterceptor.cpp \
- qpid/cluster/ClassifierHandler.h \
- qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/ShadowConnectionOutputHandler.h \
+ qpid/cluster/ConnectionCodec.h \
+ qpid/cluster/ConnectionCodec.cpp \
+ qpid/cluster/Connection.h \
+ qpid/cluster/Connection.cpp \
+ qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/PollableCondition.h \
qpid/cluster/PollableCondition.cpp \
qpid/cluster/PollableQueue.h \
qpid/cluster/WriteEstimate.h \
qpid/cluster/WriteEstimate.cpp \
qpid/cluster/OutputInterceptor.h \
- qpid/cluster/OutputInterceptor.cpp
+ qpid/cluster/OutputInterceptor.cpp \
+ qpid/cluster/ProxyInputHandler.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h
index 20f42db0ad..97b72ea993 100644
--- a/qpid/cpp/src/qpid/Url.h
+++ b/qpid/cpp/src/qpid/Url.h
@@ -76,7 +76,9 @@ struct Url : public std::vector<Address> {
/** Parse url, throw InvalidUrl if invalid. */
explicit Url(const char* url) { parse(url); }
- template<class T> Url& operator=(T s) { parse(s); return *this; }
+ Url& operator=(const Url& u) { this->std::vector<Address>::operator=(u); cache=u.cache; return *this; }
+ Url& operator=(const char* s) { parse(s); return *this; }
+ Url& operator=(const std::string& s) { parse(s); return *this; }
/** Throw InvalidUrl if the URL does not contain any addresses. */
void throwIfEmpty() const;
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index 0b996dedd2..15a8e9663d 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -21,16 +21,22 @@
#include "Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolInitiation.h"
namespace qpid {
namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o),
- connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
+Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+{}
+
+void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
+ connection = c;
+}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index f6fb87f928..409ac0b9e3 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -22,15 +22,19 @@
*
*/
#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/broker/Connection.h"
#include <boost/intrusive_ptr.hpp>
-#include <deque>
#include <memory>
+#include <deque>
namespace qpid {
-namespace broker { class Broker; }
+
+namespace sys {
+class ConnectionInputHandlerFactory;
+}
+
namespace amqp_0_10 {
class Connection : public sys::ConnectionCodec,
@@ -42,14 +46,15 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- boost::intrusive_ptr<broker::Connection> connection;
+ std::auto_ptr<sys::ConnectionInputHandler> connection;
std::string identifier;
bool initialized;
bool isClient;
size_t buffered;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+ Connection(sys::OutputControl&, const std::string& id, bool isClient);
+ void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 4d7c07649b..e983aee5c9 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -133,7 +133,7 @@ Broker::Broker(const Broker::Options& conf) :
acl(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
- factory(*this),
+ factory(new ConnectionFactory(*this)),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
@@ -372,7 +372,7 @@ uint16_t Broker::getPort() const {
// TODO: This should iterate over all protocolFactories
void Broker::accept() {
for (unsigned int i = 0; i < protocolFactories.size(); ++i)
- protocolFactories[i]->accept(poller, &factory);
+ protocolFactories[i]->accept(poller, factory.get());
}
@@ -382,7 +382,7 @@ void Broker::connect(
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* f)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
+ getProtocolFactory()->connect(poller, host, port, f ? f : factory.get(), failed);
}
void Broker::connect(
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index f7399c375f..bd17d1bb00 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -103,7 +103,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
QueueRegistry queues;
ExchangeRegistry exchanges;
LinkRegistry links;
- ConnectionFactory factory;
+ boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -178,7 +178,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
/** Expose poller so plugins can register their descriptors. */
- boost::shared_ptr<sys::Poller> getPoller();
+ boost::shared_ptr<sys::Poller> getPoller();
+
+ boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
+ void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index d65dbaeec7..8ed3ce84c8 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -49,9 +49,6 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
- receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
- closedFn(boost::bind(&Connection::closedImpl, this)),
- doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
@@ -72,8 +69,6 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
agent->addObject(mgmtObject);
}
-
- Plugin::initializeAll(*this); // Let plug-ins update extension points.
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -90,9 +85,7 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
-
-void Connection::receivedImpl(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) {
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -172,9 +165,7 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed() { closedFn(); }
-
-void Connection::closedImpl(){ // Physically closed, suspend open sessions.
+void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -194,9 +185,7 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() { return doOutputFn(); }
-
-bool Connection::doOutputImpl() {
+bool Connection::doOutput() {
try{
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 1367f3b9ca..8c23e64edf 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -44,7 +44,6 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Connection.h"
-#include "qpid/Plugin.h"
#include "qpid/RefCounted.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -56,7 +55,6 @@ class LinkRegistry;
class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
- public Plugin::Target,
public RefCounted
{
public:
@@ -95,19 +93,10 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
- // Extension points: allow plugins to insert additional functionality.
- boost::function<void(framing::AMQFrame&)> receivedFn;
- boost::function<void ()> closedFn;
- boost::function<bool ()> doOutputFn;
-
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- void receivedImpl(framing::AMQFrame& frame);
- void closedImpl();
- bool doOutputImpl();
-
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
index 5de5a0230a..e6d8c49055 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -21,11 +21,14 @@
#include "ConnectionFactory.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/Connection.h"
namespace qpid {
namespace broker {
using framing::ProtocolVersion;
+typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr;
+typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
@@ -33,15 +36,21 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == ProtocolVersion(0, 10))
- return new amqp_0_10::Connection(out, broker, id);
+ if (v == ProtocolVersion(0, 10)) {
+ ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
+ return c.release();
+ }
return 0;
}
sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true);
+ ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true)));
+ return c.release();
}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
index 5797495054..c61da81024 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h
@@ -27,7 +27,8 @@ namespace qpid {
namespace broker {
class Broker;
-class ConnectionFactory : public sys::ConnectionCodec::Factory {
+class ConnectionFactory : public sys::ConnectionCodec::Factory
+{
public:
ConnectionFactory(Broker& b);
diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
deleted file mode 100644
index b78f795d20..0000000000
--- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ClassifierHandler.h"
-
-#include "qpid/framing/FrameDefaultVisitor.h"
-#include "qpid/framing/AMQFrame.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-struct ClassifierHandler::Visitor : public FrameDefaultVisitor {
- Visitor(AMQFrame& f, ClassifierHandler& c)
- : chosen(0), frame(f), classifier(c) { f.getBody()->accept(*this); }
-
- void visit(const ExchangeDeclareBody&) { chosen=&classifier.wiring; }
- void visit(const ExchangeDeleteBody&) { chosen=&classifier.wiring; }
- void visit(const ExchangeBindBody&) { chosen=&classifier.wiring; }
- void visit(const ExchangeUnbindBody&) { chosen=&classifier.wiring; }
- void visit(const QueueDeclareBody&) { chosen=&classifier.wiring; }
- void visit(const QueueDeleteBody&) { chosen=&classifier.wiring; }
- void defaultVisit(const AMQBody&) { chosen=&classifier.other; }
-
- using framing::FrameDefaultVisitor::visit;
- using framing::FrameDefaultVisitor::defaultVisit;
-
- FrameHandler* chosen;
- AMQFrame& frame;
- ClassifierHandler& classifier;
-};
-
-void ClassifierHandler::handle(AMQFrame& f) { Visitor(f, *this).chosen->handle(f); }
-
-}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.h b/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
deleted file mode 100644
index 696e457c04..0000000000
--- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef QPID_CLUSTER_CLASSIFIERHANDLER_H
-#define QPID_CLUSTER_CLASSIFIERHANDLER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/FrameHandler.h"
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Classify frames and forward to the appropriate handler.
- */
-class ClassifierHandler : public framing::FrameHandler
-{
- public:
- ClassifierHandler(framing::FrameHandler& wiring_,
- framing::FrameHandler& other_)
- : wiring(wiring_), other(other_) {}
-
- void handle(framing::AMQFrame&);
-
- private:
- struct Visitor;
- friend struct Visitor;
- framing::FrameHandler& wiring;
- framing::FrameHandler& other;
-};
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_CLASSIFIERHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index f36d606af8..aea10949e4 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -4,7 +4,7 @@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+n * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,18 +17,17 @@
*/
#include "Cluster.h"
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/ClusterConnectionCloseBody.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/ClusterJoinedBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Invoker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -39,22 +38,34 @@
namespace qpid {
namespace cluster {
+
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::Connection;
+// Handle cluster controls from a given member.
+struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler {
+ Cluster& cluster;
+ MemberId member;
+
+ ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
+
+ void joined(const std::string& url) {
+ cluster.joined(member, url);
+ }
+};
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "-" << cluster.self;
}
-ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
- return out << m.first << "=" << m.second.url;
+ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
+ return out << m.first << " at " << m.second;
}
-ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
- ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
- copy(members.begin(), members.end(), o);
+ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
+ ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
+ copy(urls.begin(), urls.end(), o);
return out;
}
@@ -74,9 +85,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Joining cluster: " << name_);
+ QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
cpg.join(name);
- notify();
+ send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
@@ -84,31 +95,15 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
mcastQueue.start(poller);
}
-Cluster::~Cluster() {
- for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
- i != shadowConnectionMap.end();
- ++i)
- {
- i->second->dirtyClose();
- }
- std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
-}
-
-void Cluster::initialize(broker::Connection& c) {
- bool isLocal = c.getOutput().get() != &shadowOut;
- if (isLocal)
- localConnectionSet.insert(new ConnectionInterceptor(c, *this));
-}
+Cluster::~Cluster() {}
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
-
- QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
+ // broker is set to 0 when the final config-change is delivered.
while(broker) {
Mutex::ScopedUnlock u(lock);
cpg.dispatchAll();
@@ -126,9 +121,9 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
buf.putLongLong(value);
}
-void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- mcastQueue.push(Message(frame, self, connection));
+void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST [" << id << "] " << frame);
+ mcastQueue.push(Message(frame, id));
}
void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
@@ -137,48 +132,40 @@ void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
// Static is OK because there is only one cluster allowed per
// process and only one thread in mcastQueueCb at a time.
static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
- MessageQueue::iterator i = begin;
- while (i != end) {
- Buffer buf(buffer, sizeof(buffer));
- while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
- i->frame.encode(buf);
- encodePtr(buf, i->connection);
- ++i;
- }
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ Buffer buf(buffer, sizeof(buffer));
+ for (MessageQueue::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame =i->first;
+ ConnectionId id =i->second;
+ if (buf.available() < frame.size() + sizeof(uint64_t))
+ break;
+ frame.encode(buf);
+ encodePtr(buf, id.second);
}
-}
-
-void Cluster::notify() {
- send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
+ iovec iov = { buffer, buf.getPosition() };
+ cpg.mcast(name, &iov, 1);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return members.size();
+ return urls.size();
}
-Cluster::MemberList Cluster::getMembers() const {
+std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- MemberList result(members.size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&MemberMap::value_type::second, _1));
+ std::vector<Url> result(urls.size());
+ std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
return result;
}
-ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
- ShadowConnectionId id(member, remotePtr);
- ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
- if (i == shadowConnectionMap.end()) { // A new shadow connection.
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ boost::intrusive_ptr<Connection> c = connections[id];
+ if (!c && id.first != self) { // Shadow connection
std::ostringstream os;
- os << name << ":" << member << ":" << remotePtr;
- assert(broker);
- broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
- ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
- i = shadowConnectionMap.insert(value).first;
+ os << id;
+ c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
}
- return i->second;
+ assert(c);
+ return c;
}
void Cluster::deliver(
@@ -189,16 +176,17 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
+ MemberId from(nodeid, pid);
try {
Buffer buf(static_cast<char*>(msg), msg_len);
while (buf.available() > 0) {
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
throw Exception("Received incomplete cluster event.");
- void* connection;
- decodePtr(buf, connection);
- deliverQueue.push(Message(frame, from, connection));
+ Connection* cp;
+ decodePtr(buf, cp);
+ QPID_LOG(critical, "deliverQ.push " << frame);
+ deliverQueue.push(Message(frame, ConnectionId(from, cp)));
}
}
catch (const std::exception& e) {
@@ -213,23 +201,21 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
const MessageQueue::iterator& end)
{
for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame(i->frame);
- Id from(i->from);
- ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ AMQFrame& frame(i->first);
+ ConnectionId connectionId(i->second);
try {
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
+ QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
return;
}
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ if (connectionId.getConnectionPtr()) // Connection control
+ getConnection(connectionId)->deliver(frame);
+ else { // Cluster control
+ ClusterOperations cops(*this, connectionId.getMember());
+ bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
+ assert(invoked);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -240,54 +226,30 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
}
}
-// Handle cluster methods
-// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
-void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
- assert(method.amqpClassId() == CLUSTER_CLASS_ID);
- switch (method.amqpMethodId()) {
- case CLUSTER_NOTIFY_METHOD_ID: {
- ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
- Mutex::ScopedLock l(lock);
- members[from].url=notify.getUrl();
- lock.notifyAll();
- break;
- }
- case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
- if (!connection->isLocal())
- shadowConnectionMap.erase(connection->getShadowId());
- else
- localConnectionSet.erase(connection);
- connection->deliverClosed();
- break;
- }
- case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
- connection->deliverDoOutput(doOutput.getBytes());
- break;
- }
- default:
- assert(0);
- }
+void Cluster::joined(const MemberId& member, const string& url) {
+ Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, member << " has URL " << url);
+ urls[member] = url;
+ lock.notifyAll();
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address *current, int nCurrent,
+ cpg_address */*current*/, int /*nCurrent*/,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
+ QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i)
- members.erase(left[i]);
- for(int j = 0; j < nCurrent; ++j)
- members[current[j]].id = current[j];
- QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
- << members);
- assert(members.size() == size_t(nCurrent));
- if (members.find(self) == members.end())
+ // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
+ for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+
+ if (std::find(left, left+nLeft, self) != left+nLeft) {
broker = 0; // We have left the group, this is the final config change.
- lock.notifyAll(); // Threads waiting for membership changes.
+ QPID_LOG(debug, "Leaving cluster " << *this);
+ }
+ lock.notifyAll(); // Threads waiting for url changes.
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -301,6 +263,16 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections[c->getId()] = c;
+}
+
+void Cluster::erase(ConnectionId id) {
+ Mutex::ScopedLock l(lock);
+ connections.erase(id);
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 2b40193dd3..45bb3ed3c4 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -19,22 +19,16 @@
*
*/
+#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/ShadowConnectionOutputHandler.h"
#include "qpid/cluster/PollableQueue.h"
+#include "qpid/cluster/NoOpConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/log/Logger.h"
+#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
-#include "qpid/RefCounted.h"
-#include <boost/optional.hpp>
-#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -43,24 +37,15 @@
namespace qpid {
namespace cluster {
-class ConnectionInterceptor;
+class Connection;
/**
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : private Cpg::Handler, public RefCounted
+class Cluster : public RefCounted, private Cpg::Handler
{
public:
- typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
-
- /** Details of a cluster member */
- struct Member {
- Cpg::Id id;
- Url url;
- };
-
- typedef std::vector<Member> MemberList;
/**
* Join a cluster.
@@ -71,11 +56,11 @@ class Cluster : private Cpg::Handler, public RefCounted
virtual ~Cluster();
- /** Initialize interceptors for a new connection */
- void initialize(broker::Connection&);
+ void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+ void erase(ConnectionId); // Erase a connection.
- /** Get the current cluster membership. */
- MemberList getMembers() const;
+ /** Get the URLs of current cluster members. */
+ std::vector<Url> getUrls() const;
/** Number of members in the cluster. */
size_t size() const;
@@ -83,33 +68,27 @@ class Cluster : private Cpg::Handler, public RefCounted
bool empty() const { return size() == 0; }
/** Send frame to the cluster */
- void send(const framing::AMQFrame&, ConnectionInterceptor*);
+ void send(const framing::AMQFrame&, const ConnectionId&);
/** Leave the cluster */
void leave();
- // Cluster frame handing functions
- void notify(const std::string& url);
- void connectionClose();
+ void joined(const MemberId&, const std::string& url);
+
+ broker::Broker& getBroker() { assert(broker); return *broker; }
+ MemberId getSelf() const { return self; }
+
private:
- typedef Cpg::Id Id;
- typedef std::map<Id, Member> MemberMap;
- typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
- typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
+ typedef std::map<MemberId, Url> UrlMap;
+ typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
/** Message sent over the cluster. */
- struct Message {
- framing::AMQFrame frame; Id from; void* connection;
- Message(const framing::AMQFrame& f, const Id i, void* c)
- : frame(f), from(i), connection(c) {}
- };
+ typedef std::pair<framing::AMQFrame, ConnectionId> Message;
typedef PollableQueue<Message> MessageQueue;
boost::function<void()> shutdownNext;
- void notify(); ///< Notify cluster of my details.
-
/** CPG deliver callback. */
void deliver(
cpg_handle_t /*handle*/,
@@ -142,9 +121,9 @@ class Cluster : private Cpg::Handler, public RefCounted
/** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
- void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
+ void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
- ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
+ boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
mutable sys::Monitor lock; // Protect access to members.
broker::Broker* broker;
@@ -152,18 +131,17 @@ class Cluster : private Cpg::Handler, public RefCounted
Cpg cpg;
Cpg::Name name;
Url url;
- MemberMap members;
- Id self;
- ShadowConnectionMap shadowConnectionMap;
- LocalConnectionSet localConnectionSet;
- ShadowConnectionOutputHandler shadowOut;
+ UrlMap urls;
+ MemberId self;
+ ConnectionMap connections;
+ NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
MessageQueue deliverQueue;
MessageQueue mcastQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
- friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
- friend std::ostream& operator <<(std::ostream&, const MemberMap&);
+ friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
+ friend std::ostream& operator <<(std::ostream&, const UrlMap&);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 1d07660455..d829683000 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -16,10 +16,13 @@
*
*/
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
+#include "ConnectionCodec.h"
-#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/ConnectionCodec.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
@@ -63,36 +66,25 @@ struct ClusterPlugin : public Plugin {
ClusterValues values;
ClusterOptions options;
boost::intrusive_ptr<Cluster> cluster;
+ boost::scoped_ptr<ConnectionCodec::Factory> factory;
ClusterPlugin() : options(values) {}
Options* getOptions() { return &options; }
- void init(broker::Broker& b) {
- if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
- cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
- b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
- }
-
- template <class T> void init(T& t) {
- if (cluster) cluster->initialize(t);
- }
-
- template <class T> bool init(Plugin::Target& target) {
- T* t = dynamic_cast<T*>(&target);
- if (t) init(*t);
- return t;
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
+ broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+ broker->setConnectionFactory(
+ boost::shared_ptr<sys::ConnectionCodec::Factory>(
+ new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
}
void earlyInitialize(Plugin::Target&) {}
- void initialize(Plugin::Target& target) {
- if (init<broker::Broker>(target)) return;
- if (!cluster) return; // Remaining plugins only valid if cluster initialized.
- if (init<broker::Connection>(target)) return;
- }
-
void shutdown() { cluster = 0; }
};
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
new file mode 100644
index 0000000000..b82b4565c3
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Invoker.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& wrappedId, ConnectionId myId)
+ : cluster(c), self(myId), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& wrappedId, MemberId myId)
+ : cluster(c), self(myId, this), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::~Connection() {}
+
+// Forward all received frames to the cluster, continue handling on delivery.
+void Connection::received(framing::AMQFrame& f) {
+ cluster.send(f, self);
+}
+
+// Don't doOutput in the
+bool Connection::doOutput() { return output.doOutput(); }
+
+// Handle frames delivered from cluster.
+void Connection::deliver(framing::AMQFrame& f) {
+ // Handle connection controls, deliver other frames to connection.
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+}
+
+void Connection::closed() {
+ try {
+ // Called when the local network connection is closed. We still
+ // need to process any outstanding cluster frames for this
+ // connection to ensure our sessions are up-to-date. We defer
+ // closing the Connection object till deliverClosed(), but replace
+ // its output handler with a null handler since the network output
+ // handler will be deleted.
+ //
+ connection.setOutputHandler(&discardHandler);
+ cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+ }
+}
+
+void Connection::deliverClose () {
+ connection.closed();
+ cluster.erase(self);
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void Connection::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
+}
+
+}} // namespace qpid::cluster
+
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 9216921067..648ec1a1d0 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
-#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
+#ifndef QPID_CLUSTER_CONNECTION_H
+#define QPID_CLUSTER_CONNECTION_H
/*
*
@@ -22,67 +22,80 @@
*
*/
+#include "types.h"
#include "Cluster.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
+
#include "qpid/broker/Connection.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
namespace qpid {
+
namespace framing { class AMQFrame; }
+
namespace cluster {
/**
* Plug-in associated with broker::Connections, both local and shadow.
*/
-class ConnectionInterceptor {
+class Connection :
+ public RefCounted,
+ public sys::ConnectionInputHandler,
+ public sys::ConnectionOutputHandler,
+ public framing::AMQP_AllOperations::ClusterConnectionHandler
+
+{
public:
- ConnectionInterceptor(broker::Connection&, Cluster&,
- Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0));
- ~ConnectionInterceptor();
+ /** Local connection, use this in ConnectionId */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId);
+ /** Shadow connection */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+ ~Connection();
- Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
+ ConnectionId getId() const { return self; }
+ bool isLocal() const { return self.second == this; }
- bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); }
- bool isLocal() const { return !isShadow(); }
- bool getClosed() const { return isClosed; }
-
// self-delivery of intercepted extension points.
void deliver(framing::AMQFrame& f);
- void deliverClosed();
+ void deliverClose();
void deliverDoOutput(size_t requested);
- void dirtyClose();
-
- Cluster& getCluster() { return cluster; }
+ void codecDeleted();
- private:
- struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
- void close() {}
- void send(framing::AMQFrame&) {}
- void activateOutput() {}
- };
+ Cluster& getCluster() { return cluster; }
+
+ // ConnectionOutputHandler methods
+ void close() {}
+ void send(framing::AMQFrame&) {}
+ void activateOutput() {}
+ virtual size_t getBuffered() const { assert(0); return 0; }
- // Functions to intercept to Connection extension points.
+ // ConnectionInputHandler methods
void received(framing::AMQFrame&);
void closed();
bool doOutput();
- void activateOutput();
+ bool hasOutput() { return connection.hasOutput(); }
+ void idleOut() { idleOut(); }
+ void idleIn() { idleIn(); }
- void sendDoOutput();
+ // ConnectionInputHandlerFactory
+ sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
- boost::function<void (framing::AMQFrame&)> receivedNext;
- boost::function<void ()> closedNext;
+ broker::Connection& getBrokerConnection() { return connection; }
+ private:
+ void sendDoOutput();
- boost::intrusive_ptr<broker::Connection> connection;
Cluster& cluster;
- NullConnectionHandler discardHandler;
- bool isClosed;
- Cluster::ShadowConnectionId shadowId;
+ ConnectionId self;
+ NoOpConnectionOutputHandler discardHandler;
WriteEstimate writeEstimate;
OutputInterceptor output;
+ broker::Connection connection;
};
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/
+#endif /*!QPID_CLUSTER_CONNECTION_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
new file mode 100644
index 0000000000..cb396cd10c
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionCodec.h"
+#include "Connection.h"
+#include "ProxyInputHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/memory.h"
+
+namespace qpid {
+namespace cluster {
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == framing::ProtocolVersion(0, 10))
+ return new ConnectionCodec(out, id, cluster);
+ return 0;
+}
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
+ // FIXME aconway 2008-08-27: outbound connections need to be made
+ // with proper qpid::client code for failover, get rid of this
+ // broker-side hack.
+ return next->create(out, id);
+}
+
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster)
+ : codec(out, id, false),
+ interceptor(new Connection(cluster, codec, id, cluster.getSelf()))
+{
+ std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
+ codec.setInputHandler(ih);
+ cluster.insert(interceptor);
+}
+
+ConnectionCodec::~ConnectionCodec() {}
+
+// ConnectionCodec functions delegate to the codecOutput
+size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); }
+size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
+bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+void ConnectionCodec::closed() { codec.closed(); }
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
new file mode 100644
index 0000000000..cbc3dcdfe6
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_CONNCTIONCODEC_H
+#define QPID_CLUSTER_CONNCTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/cluster/Connection.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace cluster {
+class Cluster;
+
+/**
+ * Encapsulates the standard amqp_0_10::ConnectionCodec and sets up
+ * a cluster::Connection for the connection.
+ *
+ * The ConnectionCodec is deleted by the network layer when the
+ * connection closes. The cluster::Connection needs to be kept
+ * around until all cluster business on the connection is complete.
+ *
+ */
+class ConnectionCodec : public sys::ConnectionCodec {
+ public:
+ struct Factory : public sys::ConnectionCodec::Factory {
+ boost::shared_ptr<sys::ConnectionCodec::Factory> next;
+ Cluster& cluster;
+ Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {}
+ sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+ sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
+ };
+
+ ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
+ ~ConnectionCodec();
+
+ // ConnectionCodec functions delegate to the codecOutput
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+ void closed();
+ bool isClosed() const;
+ framing::ProtocolVersion getVersion() const;
+
+
+ private:
+ amqp_0_10::Connection codec;
+ boost::intrusive_ptr<cluster::Connection> interceptor;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNCTIONCODEC_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
deleted file mode 100644
index efcab1b731..0000000000
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ConnectionInterceptor.h"
-#include "qpid/framing/ClusterConnectionCloseBody.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
-#include "qpid/framing/AMQFrame.h"
-#include <boost/current_function.hpp>
-
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; }
-
-ConnectionInterceptor::ConnectionInterceptor(
- broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
- : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
-{
- connection->addFinalizer(boost::bind(operator delete, this));
- connection->setOutputHandler(&output),
- // Attach my functions to Connection extension points.
- shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
- shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
- shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
-}
-
-ConnectionInterceptor::~ConnectionInterceptor() {
- assert(connection == 0);
-}
-
-// Forward all received frames to the cluster, continue handling on delivery.
-void ConnectionInterceptor::received(framing::AMQFrame& f) {
- if (isClosed) return;
- cluster.send(f, this);
-}
-
-// Continue normal handling of delivered frames.
-void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
- receivedNext(f);
-}
-
-void ConnectionInterceptor::closed() {
- if (isClosed) return;
- try {
- // Called when the local network connection is closed. We still
- // need to process any outstanding cluster frames for this
- // connection to ensure our sessions are up-to-date. We defer
- // closing the Connection object till deliverClosed(), but replace
- // its output handler with a null handler since the network output
- // handler will be deleted.
- //
- connection->setOutputHandler(&discardHandler);
- cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this);
- isClosed = true;
- }
- catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
- }
-}
-
-void ConnectionInterceptor::deliverClosed() {
- closedNext();
- // Drop reference so connection will be deleted, which in turn
- // will delete this via finalizer added in ctor.
- connection = 0;
-}
-
-void ConnectionInterceptor::dirtyClose() {
- // Not closed via cluster self-delivery but closed locally. Used
- // when local broker is shut down without a clean cluster shutdown.
- // Release the connection, it will delete this.
- connection = 0;
-}
-
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void ConnectionInterceptor::deliverDoOutput(size_t requested) {
- output.deliverDoOutput(requested);
-}
-
-}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 2ffd3509bf..ce678015a2 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -18,7 +18,6 @@
#include "Cpg.h"
#include "qpid/sys/Mutex.h"
-// Note cpg is currently unix-specific. Refactor if availble on other platforms.
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
@@ -170,27 +169,50 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
-Cpg::Id Cpg::self() const {
+MemberId Cpg::self() const {
unsigned int nodeid;
check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
- return Id(nodeid, getpid());
+ return MemberId(nodeid, getpid());
}
-ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
- ostream_iterator<Cpg::Id> i(o, " ");
- std::copy(a.first, a.first+a.second, i);
- return o;
+ostream& operator <<(ostream& out, const MemberId& id) {
+ return out << std::hex << id.first << ":" << std::dec << id.second;
}
-ostream& operator <<(ostream& out, const Cpg::Id& id) {
- return out << id.getNodeId() << "-" << id.getPid();
+ostream& operator<<(ostream& o, const ConnectionId& c) {
+ return o << c.first << "-" << c.second;
}
-ostream& operator <<(ostream& out, const cpg_name& name) {
- return out << string(name.value, name.length);
+ostream& operator<<(ostream& o, const cpg_name& name) {
+ return o << string(name.value, name.length);
}
}} // namespace qpid::cluster
+// In proper namespace for ADL.
+
+std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
+ const char* reasonString;
+ switch (a.reason) {
+ case CPG_REASON_JOIN: reasonString = "joined"; break;
+ case CPG_REASON_LEAVE: reasonString = "left";break;
+ case CPG_REASON_NODEDOWN: reasonString = "node-down";break;
+ case CPG_REASON_NODEUP: reasonString = "node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = "process-down";break;
+ default:
+ assert(0);
+ reasonString = "";
+ }
+ return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString;
+}
+
+namespace std {
+ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
+ for (cpg_address* p = a.first; p < a.first+a.second; ++p)
+ o << *p << " ";
+ return o;
+}
+}
+
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h
index 96fd692a77..fdc451fbbc 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.h
+++ b/qpid/cpp/src/qpid/cluster/Cpg.h
@@ -19,12 +19,12 @@
*
*/
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Dispatchable.h"
+
#include "qpid/Exception.h"
#include "qpid/sys/IOHandle.h"
-#include "qpid/cluster/Dispatchable.h"
-#include <boost/tuple/tuple.hpp>
-#include <boost/tuple/tuple_comparison.hpp>
#include <boost/scoped_ptr.hpp>
#include <cassert>
@@ -65,14 +65,6 @@ class Cpg : public sys::IOHandle {
std::string str() const { return std::string(value, length); }
};
- // boost::tuple gives us == and < for free.
- struct Id : public boost::tuple<uint32_t, uint32_t> {
- Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
- Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
- uint32_t getNodeId() const { return boost::get<0>(*this); }
- uint32_t getPid() const { return boost::get<1>(*this); }
- };
-
static std::string str(const cpg_name& n) {
return std::string(n.value, n.length);
}
@@ -127,7 +119,7 @@ class Cpg : public sys::IOHandle {
cpg_handle_t getHandle() const { return handle; }
- Id self() const;
+ MemberId self() const;
int getFd();
@@ -166,9 +158,7 @@ class Cpg : public sys::IOHandle {
bool isShutdown;
};
-std::ostream& operator <<(std::ostream& out, const cpg_name& name);
-std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
-std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+std::ostream& operator <<(std::ostream& out, const MemberId& id);
inline bool operator==(const cpg_name& a, const cpg_name& b) {
return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
@@ -177,5 +167,12 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b);
}} // namespace qpid::cluster
+// In proper namespaces for ADL
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
+std::ostream& operator<<(std::ostream& o, const cpg_address& a);
+namespace std {
+std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses);
+}
+
#endif /*!CPG_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 6d429535e6..3c24dd71f2 100644
--- a/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
-#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+#ifndef QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
/*
*
@@ -30,10 +30,10 @@ namespace framing { class AMQFrame; }
namespace cluster {
/**
- * Output handler for frames sent to shadow connections.
+ * Output handler for frames sent to noop connections.
* Simply discards frames.
*/
-class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
+class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
{
public:
virtual void send(framing::AMQFrame&) {}
@@ -43,4 +43,4 @@ class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/
+#endif /*!QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 84d3a6ad69..6c77d2747a 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -19,9 +19,10 @@
*
*/
#include "OutputInterceptor.h"
-#include "ConnectionInterceptor.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "Connection.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
#include <boost/current_function.hpp>
@@ -30,7 +31,7 @@ namespace cluster {
using namespace framing;
-OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), next(h), sent(), moreOutput(), doingOutput()
{}
@@ -57,8 +58,6 @@ bool OutputInterceptor::doOutput() {
// which stocks up the write buffers with data.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- if (parent.getClosed()) return;
-
Locker l(lock);
size_t buf = next.getBuffered();
if (parent.isLocal())
@@ -68,7 +67,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
sent = 0;
do {
sys::Mutex::ScopedUnlock u(lock);
- moreOutput = doOutputNext(); // Calls send()
+ moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
sent += buf; // Include buffered data in the sent total.
@@ -88,8 +87,8 @@ void OutputInterceptor::startDoOutput() {
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
// Call with lock held.
- if (parent.isShadow() || parent.getClosed())
- return;
+ // FIXME aconway 2008-08-28: used to have || parent.getClosed())
+ if (!parent.isLocal()) return;
doingOutput = true;
size_t request = writeEstimate.sending(getBuffered());
@@ -98,8 +97,8 @@ void OutputInterceptor::sendDoOutput() {
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
- framing::ProtocolVersion(), request)), &parent);
+ parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
+ framing::ProtocolVersion(), request)), parent.getId());
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index b39f2a2be9..548ec32b5b 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -31,14 +31,14 @@ namespace qpid {
namespace framing { class AMQFrame; }
namespace cluster {
-class ConnectionInterceptor;
+class Connection;
/**
* Interceptor for connection OutputHandler, manages outgoing message replication.
*/
class OutputInterceptor : public sys::ConnectionOutputHandler {
public:
- OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+ OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
// sys::ConnectionOutputHandler functions
void send(framing::AMQFrame& f);
@@ -51,9 +51,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
// Intercept doOutput requests on Connection.
bool doOutput();
- boost::function<bool ()> doOutputNext;
-
- ConnectionInterceptor& parent;
+ cluster::Connection& parent;
private:
typedef sys::Mutex::ScopedLock Locker;
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 0bba2ba790..29891da344 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -90,7 +90,7 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
batch.swap(queue);
condition.clear();
ScopedUnlock u(lock);
- callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+ callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push.
h.rewatch();
}
diff --git a/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h b/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
new file mode 100644
index 0000000000..228f8d092d
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
@@ -0,0 +1,57 @@
+#ifndef QPID_CLUSTER_PROXYINPUTHANDLER_H
+#define QPID_CLUSTER_PROXYINPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ConnectionInputHandler.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Proxies ConnectionInputHandler functions and ensures target.closed()
+ * is called, on deletion if not before.
+ */
+class ProxyInputHandler : public sys::ConnectionInputHandler
+{
+ public:
+ ProxyInputHandler(boost::intrusive_ptr<cluster::Connection> t) : target(t) {}
+ ~ProxyInputHandler() { closed(); }
+
+ void received(framing::AMQFrame& f) { target->received(f); }
+ void closed() { if (target) target->closed(); target = 0; }
+ void idleOut() { target->idleOut(); }
+ void idleIn() { target->idleIn(); }
+ bool doOutput() { return target->doOutput(); }
+ bool hasOutput() { return target->hasOutput(); }
+
+ private:
+ boost::intrusive_ptr<cluster::Connection> target;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_PROXYINPUTHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
new file mode 100644
index 0000000000..4646cd9174
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_TYPES_H
+#define QPID_CLUSTER_TYPES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <utility>
+#include <iosfwd>
+#include <stdint.h>
+
+extern "C" {
+#include <openais/cpg.h>
+}
+
+namespace qpid {
+namespace cluster {
+
+class Connection;
+
+/** first=node-id, second=pid */
+struct MemberId : std::pair<uint32_t, uint32_t> {
+ MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+ MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
+ uint32_t getNode() const { return first; }
+ uint32_t getPid() const { return second; }
+};
+
+inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); }
+
+std::ostream& operator<<(std::ostream&, const MemberId&);
+
+struct ConnectionId : public std::pair<MemberId, Connection*> {
+ ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
+ MemberId getMember() const { return first; }
+ Connection* getConnectionPtr() const { return second; }
+};
+std::ostream& operator<<(std::ostream&, const ConnectionId&);
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_TYPES_H*/
diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
index efc6839b60..b1b047d2cc 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
@@ -22,14 +22,14 @@
*
*/
#include "qpid/framing/ProtocolVersion.h"
-#include "OutputControl.h"
-#include <memory>
-#include <map>
namespace qpid {
namespace sys {
+class InputHandlerFactory;
+class OutputControl;
+
/**
* Interface of coder/decoder for a connection of a specific protocol
* version.
diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
index a2c18d6d9a..9a5b9f75a5 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -33,6 +33,7 @@ namespace sys {
public TimeoutHandler, public OutputTask
{
public:
+
virtual void closed() = 0;
};
diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
index 2b309b5758..9bb7e13686 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
@@ -42,7 +42,8 @@ class ConnectionInputHandlerFactory : private boost::noncopyable
*@param id identify the connection for management purposes.
*/
virtual ConnectionInputHandler* create(ConnectionOutputHandler* out,
- const std::string& id) = 0;
+ const std::string& id,
+ bool isClient) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};
diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h
index a7869ff602..07e69a0735 100644
--- a/qpid/cpp/src/tests/ForkedBroker.h
+++ b/qpid/cpp/src/tests/ForkedBroker.h
@@ -53,12 +53,12 @@ class ForkedBroker {
}
~ForkedBroker() {
- try { stop(); } catch(const std::exception& e) {
- QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what()));
+ try { kill(); } catch(const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
}
}
- void stop() {
+ void kill() {
using qpid::ErrnoException;
if (pid == 0) return;
if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 7140cc73bd..3f09143fff 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -69,8 +69,8 @@ struct ClusterFixture : public vector<uint16_t> {
void add();
void setup();
void kill(size_t n) {
- if (n) forkedBrokers[n-1]->stop();
- else broker0.shutdown();
+ if (n) forkedBrokers[n-1].kill();
+ else broker0->broker->shutdown();
}
};
@@ -139,6 +139,14 @@ QPID_AUTO_TEST_CASE(testForkedBroker) {
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
}
+QPID_AUTO_TEST_CASE(testSingletonCluster) {
+ // Test against a singleton cluster, verify basic operation.
+ ClusterFixture cluster(1);
+ Client c(cluster[0]);
+ BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
+}
+
QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterFixture cluster(3);
Client c0(cluster[0]);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 8d6b5a241e..f5afe34e4a 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -25,19 +25,17 @@
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name = "notify" code="0x1">
- <role name="server" implement="MUST" />
+ <control name = "joined" code="0x1">
<field name="url" type="str16" />
</control>
+ </class>
- <control name="connection-close" code="0x2">
- <role name="server" implement="MUST" />
+ <class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
+ <control name="deliver-close" code="0x2">
</control>
- <control name="connection-do-output" code="0x3">
- <role name="server" implement="MUST" />
+ <control name="deliver-do-output" code="0x3">
<field name="bytes" type="uint32"/>
</control>
-
</class>
</amqp>