diff options
author | Alan Conway <aconway@apache.org> | 2008-08-29 18:18:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-29 18:18:45 +0000 |
commit | ac1660c072fc1fc783731b7cd216861fc6999ac0 (patch) | |
tree | 5e401b8cd8e4c491cefa4e74b8185e2cfc032736 | |
parent | 605ec8d6c0cfab3683fd962e42fbcd39b4b53db9 (diff) | |
download | qpid-python-ac1660c072fc1fc783731b7cd216861fc6999ac0.tar.gz |
Refactored cluster to intercept at ConnectionCode, using sys:: interfaces rather than boost functions.
Use framing::Operations and Invoker to dispatch cluster methods.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@690358 13f79535-47bb-0310-9956-ffa450edef68
34 files changed, 670 insertions, 538 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 934ec0174b..d5be6dfc57 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -6,24 +6,26 @@ lib_LTLIBRARIES += libqpidcluster.la if CPG libqpidcluster_la_SOURCES = \ + qpid/cluster/types.h \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/ClusterPlugin.cpp \ - qpid/cluster/ConnectionInterceptor.h \ - qpid/cluster/ConnectionInterceptor.cpp \ - qpid/cluster/ClassifierHandler.h \ - qpid/cluster/ClassifierHandler.cpp \ - qpid/cluster/ShadowConnectionOutputHandler.h \ + qpid/cluster/ConnectionCodec.h \ + qpid/cluster/ConnectionCodec.cpp \ + qpid/cluster/Connection.h \ + qpid/cluster/Connection.cpp \ + qpid/cluster/NoOpConnectionOutputHandler.h \ qpid/cluster/PollableCondition.h \ qpid/cluster/PollableCondition.cpp \ qpid/cluster/PollableQueue.h \ qpid/cluster/WriteEstimate.h \ qpid/cluster/WriteEstimate.cpp \ qpid/cluster/OutputInterceptor.h \ - qpid/cluster/OutputInterceptor.cpp + qpid/cluster/OutputInterceptor.cpp \ + qpid/cluster/ProxyInputHandler.h libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/qpid/cpp/src/qpid/Url.h b/qpid/cpp/src/qpid/Url.h index 20f42db0ad..97b72ea993 100644 --- a/qpid/cpp/src/qpid/Url.h +++ b/qpid/cpp/src/qpid/Url.h @@ -76,7 +76,9 @@ struct Url : public std::vector<Address> { /** Parse url, throw InvalidUrl if invalid. */ explicit Url(const char* url) { parse(url); } - template<class T> Url& operator=(T s) { parse(s); return *this; } + Url& operator=(const Url& u) { this->std::vector<Address>::operator=(u); cache=u.cache; return *this; } + Url& operator=(const char* s) { parse(s); return *this; } + Url& operator=(const std::string& s) { parse(s); return *this; } /** Throw InvalidUrl if the URL does not contain any addresses. */ void throwIfEmpty() const; diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 0b996dedd2..15a8e9663d 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -21,16 +21,22 @@ #include "Connection.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/exceptions.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" namespace qpid { namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), - connection(new broker::Connection(this, broker, id, _isClient)), - identifier(id), initialized(false), isClient(_isClient), buffered(0) {} +Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient) + : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0) +{} + +void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) { + connection = c; +} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index f6fb87f928..409ac0b9e3 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -22,15 +22,19 @@ * */ #include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/Mutex.h" -#include "qpid/broker/Connection.h" #include <boost/intrusive_ptr.hpp> -#include <deque> #include <memory> +#include <deque> namespace qpid { -namespace broker { class Broker; } + +namespace sys { +class ConnectionInputHandlerFactory; +} + namespace amqp_0_10 { class Connection : public sys::ConnectionCodec, @@ -42,14 +46,15 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - boost::intrusive_ptr<broker::Connection> connection; + std::auto_ptr<sys::ConnectionInputHandler> connection; std::string identifier; bool initialized; bool isClient; size_t buffered; public: - Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); + Connection(sys::OutputControl&, const std::string& id, bool isClient); + void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c); size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool isClosed() const; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 4d7c07649b..e983aee5c9 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -133,7 +133,7 @@ Broker::Broker(const Broker::Options& conf) : acl(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), links(this), - factory(*this), + factory(new ConnectionFactory(*this)), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. @@ -372,7 +372,7 @@ uint16_t Broker::getPort() const { // TODO: This should iterate over all protocolFactories void Broker::accept() { for (unsigned int i = 0; i < protocolFactories.size(); ++i) - protocolFactories[i]->accept(poller, &factory); + protocolFactories[i]->accept(poller, factory.get()); } @@ -382,7 +382,7 @@ void Broker::connect( boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* f) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed); + getProtocolFactory()->connect(poller, host, port, f ? f : factory.get(), failed); } void Broker::connect( diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index f7399c375f..bd17d1bb00 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -103,7 +103,7 @@ class Broker : public sys::Runnable, public Plugin::Target, QueueRegistry queues; ExchangeRegistry exchanges; LinkRegistry links; - ConnectionFactory factory; + boost::shared_ptr<sys::ConnectionCodec::Factory> factory; DtxManager dtxManager; SessionManager sessionManager; management::ManagementAgent* managementAgent; @@ -178,7 +178,10 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const; /** Expose poller so plugins can register their descriptors. */ - boost::shared_ptr<sys::Poller> getPoller(); + boost::shared_ptr<sys::Poller> getPoller(); + + boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } + void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index d65dbaeec7..8ed3ce84c8 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -49,9 +49,6 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), - receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), - closedFn(boost::bind(&Connection::closedImpl, this)), - doOutputFn(boost::bind(&Connection::doOutputImpl, this)), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), @@ -72,8 +69,6 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } - - Plugin::initializeAll(*this); // Let plug-ins update extension points. } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -90,9 +85,7 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } - -void Connection::receivedImpl(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame) { if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -172,9 +165,7 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed() { closedFn(); } - -void Connection::closedImpl(){ // Physically closed, suspend open sessions. +void Connection::closed(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); @@ -194,9 +185,7 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions. bool Connection::hasOutput() { return outputTasks.hasOutput(); } -bool Connection::doOutput() { return doOutputFn(); } - -bool Connection::doOutputImpl() { +bool Connection::doOutput() { try{ if (ioCallback) ioCallback(); // Lend the IO thread for management processing diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 1367f3b9ca..8c23e64edf 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -44,7 +44,6 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" -#include "qpid/Plugin.h" #include "qpid/RefCounted.h" #include <boost/ptr_container/ptr_map.hpp> @@ -56,7 +55,6 @@ class LinkRegistry; class Connection : public sys::ConnectionInputHandler, public ConnectionState, - public Plugin::Target, public RefCounted { public: @@ -95,19 +93,10 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); - // Extension points: allow plugins to insert additional functionality. - boost::function<void(framing::AMQFrame&)> receivedFn; - boost::function<void ()> closedFn; - boost::function<bool ()> doOutputFn; - private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - void receivedImpl(framing::AMQFrame& frame); - void closedImpl(); - bool doOutputImpl(); - ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp index 5de5a0230a..e6d8c49055 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -21,11 +21,14 @@ #include "ConnectionFactory.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/amqp_0_10/Connection.h" +#include "qpid/broker/Connection.h" namespace qpid { namespace broker { using framing::ProtocolVersion; +typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr; +typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {} @@ -33,15 +36,21 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == ProtocolVersion(0, 10)) - return new amqp_0_10::Connection(out, broker, id); + if (v == ProtocolVersion(0, 10)) { + ConnectionPtr c(new amqp_0_10::Connection(out, id, false)); + c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false))); + return c.release(); + } return 0; } sys::ConnectionCodec* ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { // used to create connections from one broker to another - return new amqp_0_10::Connection(out, broker, id, true); + ConnectionPtr c(new amqp_0_10::Connection(out, id, true)); + c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true))); + return c.release(); } + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionFactory.h b/qpid/cpp/src/qpid/broker/ConnectionFactory.h index 5797495054..c61da81024 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionFactory.h +++ b/qpid/cpp/src/qpid/broker/ConnectionFactory.h @@ -27,7 +27,8 @@ namespace qpid { namespace broker { class Broker; -class ConnectionFactory : public sys::ConnectionCodec::Factory { +class ConnectionFactory : public sys::ConnectionCodec::Factory +{ public: ConnectionFactory(Broker& b); diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp deleted file mode 100644 index b78f795d20..0000000000 --- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ClassifierHandler.h" - -#include "qpid/framing/FrameDefaultVisitor.h" -#include "qpid/framing/AMQFrame.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -struct ClassifierHandler::Visitor : public FrameDefaultVisitor { - Visitor(AMQFrame& f, ClassifierHandler& c) - : chosen(0), frame(f), classifier(c) { f.getBody()->accept(*this); } - - void visit(const ExchangeDeclareBody&) { chosen=&classifier.wiring; } - void visit(const ExchangeDeleteBody&) { chosen=&classifier.wiring; } - void visit(const ExchangeBindBody&) { chosen=&classifier.wiring; } - void visit(const ExchangeUnbindBody&) { chosen=&classifier.wiring; } - void visit(const QueueDeclareBody&) { chosen=&classifier.wiring; } - void visit(const QueueDeleteBody&) { chosen=&classifier.wiring; } - void defaultVisit(const AMQBody&) { chosen=&classifier.other; } - - using framing::FrameDefaultVisitor::visit; - using framing::FrameDefaultVisitor::defaultVisit; - - FrameHandler* chosen; - AMQFrame& frame; - ClassifierHandler& classifier; -}; - -void ClassifierHandler::handle(AMQFrame& f) { Visitor(f, *this).chosen->handle(f); } - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.h b/qpid/cpp/src/qpid/cluster/ClassifierHandler.h deleted file mode 100644 index 696e457c04..0000000000 --- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_CLUSTER_CLASSIFIERHANDLER_H -#define QPID_CLUSTER_CLASSIFIERHANDLER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/FrameHandler.h" - -namespace qpid { -namespace cluster { - -/** - * Classify frames and forward to the appropriate handler. - */ -class ClassifierHandler : public framing::FrameHandler -{ - public: - ClassifierHandler(framing::FrameHandler& wiring_, - framing::FrameHandler& other_) - : wiring(wiring_), other(other_) {} - - void handle(framing::AMQFrame&); - - private: - struct Visitor; - friend struct Visitor; - framing::FrameHandler& wiring; - framing::FrameHandler& other; -}; - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_CLASSIFIERHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f36d606af8..aea10949e4 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -4,7 +4,7 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +n * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -17,18 +17,17 @@ */ #include "Cluster.h" -#include "ConnectionInterceptor.h" +#include "Connection.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ClusterNotifyBody.h" -#include "qpid/framing/ClusterConnectionCloseBody.h" -#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "qpid/framing/ClusterJoinedBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/Invoker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -39,22 +38,34 @@ namespace qpid { namespace cluster { + using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::Connection; +// Handle cluster controls from a given member. +struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler { + Cluster& cluster; + MemberId member; + + ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {} + + void joined(const std::string& url) { + cluster.joined(member, url); + } +}; + ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "-" << cluster.self; } -ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { - return out << m.first << "=" << m.second.url; +ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) { + return out << m.first << " at " << m.second; } -ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { - ostream_iterator<Cluster::MemberMap::value_type> o(out, " "); - copy(members.begin(), members.end(), o); +ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) { + ostream_iterator<Cluster::UrlMap::value_type> o(out, " "); + copy(urls.begin(), urls.end(), o); return out; } @@ -74,9 +85,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Joining cluster: " << name_); + QPID_LOG(trace, "Node " << self << " joining cluster: " << name_); cpg.join(name); - notify(); + send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); @@ -84,31 +95,15 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mcastQueue.start(poller); } -Cluster::~Cluster() { - for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin(); - i != shadowConnectionMap.end(); - ++i) - { - i->second->dirtyClose(); - } - std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); -} - -void Cluster::initialize(broker::Connection& c) { - bool isLocal = c.getOutput().get() != &shadowOut; - if (isLocal) - localConnectionSet.insert(new ConnectionInterceptor(c, *this)); -} +Cluster::~Cluster() {} void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. - - QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. + // broker is set to 0 when the final config-change is delivered. while(broker) { Mutex::ScopedUnlock u(lock); cpg.dispatchAll(); @@ -126,9 +121,9 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - mcastQueue.push(Message(frame, self, connection)); +void Cluster::send(const AMQFrame& frame, const ConnectionId& id) { + QPID_LOG(trace, "MCAST [" << id << "] " << frame); + mcastQueue.push(Message(frame, id)); } void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, @@ -137,48 +132,40 @@ void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, // Static is OK because there is only one cluster allowed per // process and only one thread in mcastQueueCb at a time. static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. - MessageQueue::iterator i = begin; - while (i != end) { - Buffer buf(buffer, sizeof(buffer)); - while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) { - i->frame.encode(buf); - encodePtr(buf, i->connection); - ++i; - } - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + Buffer buf(buffer, sizeof(buffer)); + for (MessageQueue::iterator i = begin; i != end; ++i) { + AMQFrame& frame =i->first; + ConnectionId id =i->second; + if (buf.available() < frame.size() + sizeof(uint64_t)) + break; + frame.encode(buf); + encodePtr(buf, id.second); } -} - -void Cluster::notify() { - send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0); + iovec iov = { buffer, buf.getPosition() }; + cpg.mcast(name, &iov, 1); } size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return members.size(); + return urls.size(); } -Cluster::MemberList Cluster::getMembers() const { +std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); - MemberList result(members.size()); - std::transform(members.begin(), members.end(), result.begin(), - boost::bind(&MemberMap::value_type::second, _1)); + std::vector<Url> result(urls.size()); + std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1)); return result; } -ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { - ShadowConnectionId id(member, remotePtr); - ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); - if (i == shadowConnectionMap.end()) { // A new shadow connection. +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { + boost::intrusive_ptr<Connection> c = connections[id]; + if (!c && id.first != self) { // Shadow connection std::ostringstream os; - os << name << ":" << member << ":" << remotePtr; - assert(broker); - broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str()); - ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id)); - i = shadowConnectionMap.insert(value).first; + os << id; + c = connections[id] = new Connection(*this, shadowOut, os.str(), id); } - return i->second; + assert(c); + return c; } void Cluster::deliver( @@ -189,16 +176,17 @@ void Cluster::deliver( void* msg, int msg_len) { - Id from(nodeid, pid); + MemberId from(nodeid, pid); try { Buffer buf(static_cast<char*>(msg), msg_len); while (buf.available() > 0) { AMQFrame frame; if (!frame.decode(buf)) // Not enough data. throw Exception("Received incomplete cluster event."); - void* connection; - decodePtr(buf, connection); - deliverQueue.push(Message(frame, from, connection)); + Connection* cp; + decodePtr(buf, cp); + QPID_LOG(critical, "deliverQ.push " << frame); + deliverQueue.push(Message(frame, ConnectionId(from, cp))); } } catch (const std::exception& e) { @@ -213,23 +201,21 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, const MessageQueue::iterator& end) { for (MessageQueue::iterator i = begin; i != end; ++i) { - AMQFrame& frame(i->frame); - Id from(i->from); - ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection); + AMQFrame& frame(i->first); + ConnectionId connectionId(i->second); try { - QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - + QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame); if (!broker) { - QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); + QPID_LOG(error, "Unexpected DLVR after leaving the cluster."); return; } - if (connection && from != self) // Look up shadow for remote connections - connection = getShadowConnection(from, connection); - - if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) - handleMethod(from, connection, *frame.getMethod()); - else - connection->deliver(frame); + if (connectionId.getConnectionPtr()) // Connection control + getConnection(connectionId)->deliver(frame); + else { // Cluster control + ClusterOperations cops(*this, connectionId.getMember()); + bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled(); + assert(invoked); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -240,54 +226,30 @@ void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, } } -// Handle cluster methods -// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. -void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { - assert(method.amqpClassId() == CLUSTER_CLASS_ID); - switch (method.amqpMethodId()) { - case CLUSTER_NOTIFY_METHOD_ID: { - ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method); - Mutex::ScopedLock l(lock); - members[from].url=notify.getUrl(); - lock.notifyAll(); - break; - } - case CLUSTER_CONNECTION_CLOSE_METHOD_ID: { - if (!connection->isLocal()) - shadowConnectionMap.erase(connection->getShadowId()); - else - localConnectionSet.erase(connection); - connection->deliverClosed(); - break; - } - case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); - connection->deliverDoOutput(doOutput.getBytes()); - break; - } - default: - assert(0); - } +void Cluster::joined(const MemberId& member, const string& url) { + Mutex::ScopedLock l(lock); + QPID_LOG(debug, member << " has URL " << url); + urls[member] = url; + lock.notifyAll(); } void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address *current, int nCurrent, + cpg_address */*current*/, int /*nCurrent*/, cpg_address *left, int nLeft, - cpg_address */*joined*/, int nJoined) + cpg_address *joined, int nJoined) { + QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft)); Mutex::ScopedLock l(lock); - for (int i = 0; i < nLeft; ++i) - members.erase(left[i]); - for(int j = 0; j < nCurrent; ++j) - members[current[j]].id = current[j]; - QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):" - << members); - assert(members.size() == size_t(nCurrent)); - if (members.find(self) == members.end()) + // We add URLs to the map in joined() we don't keep track of pre-URL members yet. + for (int l = 0; l < nLeft; ++l) urls.erase(left[l]); + + if (std::find(left, left+nLeft, self) != left+nLeft) { broker = 0; // We have left the group, this is the final config change. - lock.notifyAll(); // Threads waiting for membership changes. + QPID_LOG(debug, "Leaving cluster " << *this); + } + lock.notifyAll(); // Threads waiting for url changes. } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -301,6 +263,16 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + connections[c->getId()] = c; +} + +void Cluster::erase(ConnectionId id) { + Mutex::ScopedLock l(lock); + connections.erase(id); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 2b40193dd3..45bb3ed3c4 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,22 +19,16 @@ * */ +#include "qpid/cluster/types.h" #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/ShadowConnectionOutputHandler.h" #include "qpid/cluster/PollableQueue.h" +#include "qpid/cluster/NoOpConnectionOutputHandler.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" -#include "qpid/sys/Dispatcher.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" -#include "qpid/log/Logger.h" +#include "qpid/framing/AMQP_AllOperations.h" #include "qpid/Url.h" -#include "qpid/RefCounted.h" -#include <boost/optional.hpp> -#include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> #include <map> @@ -43,24 +37,15 @@ namespace qpid { namespace cluster { -class ConnectionInterceptor; +class Connection; /** * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : private Cpg::Handler, public RefCounted +class Cluster : public RefCounted, private Cpg::Handler { public: - typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; - - /** Details of a cluster member */ - struct Member { - Cpg::Id id; - Url url; - }; - - typedef std::vector<Member> MemberList; /** * Join a cluster. @@ -71,11 +56,11 @@ class Cluster : private Cpg::Handler, public RefCounted virtual ~Cluster(); - /** Initialize interceptors for a new connection */ - void initialize(broker::Connection&); + void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection + void erase(ConnectionId); // Erase a connection. - /** Get the current cluster membership. */ - MemberList getMembers() const; + /** Get the URLs of current cluster members. */ + std::vector<Url> getUrls() const; /** Number of members in the cluster. */ size_t size() const; @@ -83,33 +68,27 @@ class Cluster : private Cpg::Handler, public RefCounted bool empty() const { return size() == 0; } /** Send frame to the cluster */ - void send(const framing::AMQFrame&, ConnectionInterceptor*); + void send(const framing::AMQFrame&, const ConnectionId&); /** Leave the cluster */ void leave(); - // Cluster frame handing functions - void notify(const std::string& url); - void connectionClose(); + void joined(const MemberId&, const std::string& url); + + broker::Broker& getBroker() { assert(broker); return *broker; } + MemberId getSelf() const { return self; } + private: - typedef Cpg::Id Id; - typedef std::map<Id, Member> MemberMap; - typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; - typedef std::set<ConnectionInterceptor*> LocalConnectionSet; + typedef std::map<MemberId, Url> UrlMap; + typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; /** Message sent over the cluster. */ - struct Message { - framing::AMQFrame frame; Id from; void* connection; - Message(const framing::AMQFrame& f, const Id i, void* c) - : frame(f), from(i), connection(c) {} - }; + typedef std::pair<framing::AMQFrame, ConnectionId> Message; typedef PollableQueue<Message> MessageQueue; boost::function<void()> shutdownNext; - void notify(); ///< Notify cluster of my details. - /** CPG deliver callback. */ void deliver( cpg_handle_t /*handle*/, @@ -142,9 +121,9 @@ class Cluster : private Cpg::Handler, public RefCounted /** Callback if CPG fd is disconnected. */ void disconnect(sys::DispatchHandle&); - void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); + void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method); - ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*); + boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); mutable sys::Monitor lock; // Protect access to members. broker::Broker* broker; @@ -152,18 +131,17 @@ class Cluster : private Cpg::Handler, public RefCounted Cpg cpg; Cpg::Name name; Url url; - MemberMap members; - Id self; - ShadowConnectionMap shadowConnectionMap; - LocalConnectionSet localConnectionSet; - ShadowConnectionOutputHandler shadowOut; + UrlMap urls; + MemberId self; + ConnectionMap connections; + NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; MessageQueue deliverQueue; MessageQueue mcastQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); - friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); - friend std::ostream& operator <<(std::ostream&, const MemberMap&); + friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); + friend std::ostream& operator <<(std::ostream&, const UrlMap&); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 1d07660455..d829683000 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -16,10 +16,13 @@ * */ -#include "ConnectionInterceptor.h" +#include "Connection.h" +#include "ConnectionCodec.h" -#include "qpid/broker/Broker.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/ConnectionCodec.h" + +#include "qpid/broker/Broker.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" @@ -63,36 +66,25 @@ struct ClusterPlugin : public Plugin { ClusterValues values; ClusterOptions options; boost::intrusive_ptr<Cluster> cluster; + boost::scoped_ptr<ConnectionCodec::Factory> factory; ClusterPlugin() : options(values) {} Options* getOptions() { return &options; } - void init(broker::Broker& b) { - if (values.name.empty()) return; // Only if --cluster-name option was specified. + void initialize(Plugin::Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); - cluster = new Cluster(values.name, values.getUrl(b.getPort()), b); - b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); - } - - template <class T> void init(T& t) { - if (cluster) cluster->initialize(t); - } - - template <class T> bool init(Plugin::Target& target) { - T* t = dynamic_cast<T*>(&target); - if (t) init(*t); - return t; + cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); + broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); + broker->setConnectionFactory( + boost::shared_ptr<sys::ConnectionCodec::Factory>( + new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); } void earlyInitialize(Plugin::Target&) {} - void initialize(Plugin::Target& target) { - if (init<broker::Broker>(target)) return; - if (!cluster) return; // Remaining plugins only valid if cluster initialized. - if (init<broker::Connection>(target)) return; - } - void shutdown() { cluster = 0; } }; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp new file mode 100644 index 0000000000..b82b4565c3 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Connection.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Invoker.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/log/Statement.h" + +#include <boost/current_function.hpp> + +namespace qpid { +namespace cluster { + +using namespace framing; + +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, + const std::string& wrappedId, ConnectionId myId) + : cluster(c), self(myId), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{} + +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, + const std::string& wrappedId, MemberId myId) + : cluster(c), self(myId, this), output(*this, out), + connection(&output, cluster.getBroker(), wrappedId) +{} + +Connection::~Connection() {} + +// Forward all received frames to the cluster, continue handling on delivery. +void Connection::received(framing::AMQFrame& f) { + cluster.send(f, self); +} + +// Don't doOutput in the +bool Connection::doOutput() { return output.doOutput(); } + +// Handle frames delivered from cluster. +void Connection::deliver(framing::AMQFrame& f) { + // Handle connection controls, deliver other frames to connection. + if (!framing::invoke(*this, *f.getBody()).wasHandled()) + connection.received(f); +} + +void Connection::closed() { + try { + // Called when the local network connection is closed. We still + // need to process any outstanding cluster frames for this + // connection to ensure our sessions are up-to-date. We defer + // closing the Connection object till deliverClosed(), but replace + // its output handler with a null handler since the network output + // handler will be deleted. + // + connection.setOutputHandler(&discardHandler); + cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self); + } + catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); + } +} + +void Connection::deliverClose () { + connection.closed(); + cluster.erase(self); +} + +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void Connection::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); +} + +}} // namespace qpid::cluster + diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/Connection.h index 9216921067..648ec1a1d0 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H -#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H +#ifndef QPID_CLUSTER_CONNECTION_H +#define QPID_CLUSTER_CONNECTION_H /* * @@ -22,67 +22,80 @@ * */ +#include "types.h" #include "Cluster.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" + #include "qpid/broker/Connection.h" +#include "qpid/amqp_0_10/Connection.h" +#include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" namespace qpid { + namespace framing { class AMQFrame; } + namespace cluster { /** * Plug-in associated with broker::Connections, both local and shadow. */ -class ConnectionInterceptor { +class Connection : + public RefCounted, + public sys::ConnectionInputHandler, + public sys::ConnectionOutputHandler, + public framing::AMQP_AllOperations::ClusterConnectionHandler + +{ public: - ConnectionInterceptor(broker::Connection&, Cluster&, - Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0)); - ~ConnectionInterceptor(); + /** Local connection, use this in ConnectionId */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId); + /** Shadow connection */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); + ~Connection(); - Cluster::ShadowConnectionId getShadowId() const { return shadowId; } + ConnectionId getId() const { return self; } + bool isLocal() const { return self.second == this; } - bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); } - bool isLocal() const { return !isShadow(); } - bool getClosed() const { return isClosed; } - // self-delivery of intercepted extension points. void deliver(framing::AMQFrame& f); - void deliverClosed(); + void deliverClose(); void deliverDoOutput(size_t requested); - void dirtyClose(); - - Cluster& getCluster() { return cluster; } + void codecDeleted(); - private: - struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { - void close() {} - void send(framing::AMQFrame&) {} - void activateOutput() {} - }; + Cluster& getCluster() { return cluster; } + + // ConnectionOutputHandler methods + void close() {} + void send(framing::AMQFrame&) {} + void activateOutput() {} + virtual size_t getBuffered() const { assert(0); return 0; } - // Functions to intercept to Connection extension points. + // ConnectionInputHandler methods void received(framing::AMQFrame&); void closed(); bool doOutput(); - void activateOutput(); + bool hasOutput() { return connection.hasOutput(); } + void idleOut() { idleOut(); } + void idleIn() { idleIn(); } - void sendDoOutput(); + // ConnectionInputHandlerFactory + sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); - boost::function<void (framing::AMQFrame&)> receivedNext; - boost::function<void ()> closedNext; + broker::Connection& getBrokerConnection() { return connection; } + private: + void sendDoOutput(); - boost::intrusive_ptr<broker::Connection> connection; Cluster& cluster; - NullConnectionHandler discardHandler; - bool isClosed; - Cluster::ShadowConnectionId shadowId; + ConnectionId self; + NoOpConnectionOutputHandler discardHandler; WriteEstimate writeEstimate; OutputInterceptor output; + broker::Connection connection; }; }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/ +#endif /*!QPID_CLUSTER_CONNECTION_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp new file mode 100644 index 0000000000..cb396cd10c --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionCodec.h" +#include "Connection.h" +#include "ProxyInputHandler.h" +#include "qpid/broker/Connection.h" +#include "qpid/memory.h" + +namespace qpid { +namespace cluster { + +sys::ConnectionCodec* +ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { + if (v == framing::ProtocolVersion(0, 10)) + return new ConnectionCodec(out, id, cluster); + return 0; +} + +sys::ConnectionCodec* +ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { + // FIXME aconway 2008-08-27: outbound connections need to be made + // with proper qpid::client code for failover, get rid of this + // broker-side hack. + return next->create(out, id); +} + +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster) + : codec(out, id, false), + interceptor(new Connection(cluster, codec, id, cluster.getSelf())) +{ + std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); + codec.setInputHandler(ih); + cluster.insert(interceptor); +} + +ConnectionCodec::~ConnectionCodec() {} + +// ConnectionCodec functions delegate to the codecOutput +size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); } +size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } +bool ConnectionCodec::canEncode() { return codec.canEncode(); } +void ConnectionCodec::closed() { codec.closed(); } +bool ConnectionCodec::isClosed() const { return codec.isClosed(); } +framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); } + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.h b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h new file mode 100644 index 0000000000..cbc3dcdfe6 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.h @@ -0,0 +1,77 @@ +#ifndef QPID_CLUSTER_CONNCTIONCODEC_H +#define QPID_CLUSTER_CONNCTIONCODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/amqp_0_10/Connection.h" +#include "qpid/cluster/Connection.h" +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +namespace broker { +class Connection; +} + +namespace cluster { +class Cluster; + +/** + * Encapsulates the standard amqp_0_10::ConnectionCodec and sets up + * a cluster::Connection for the connection. + * + * The ConnectionCodec is deleted by the network layer when the + * connection closes. The cluster::Connection needs to be kept + * around until all cluster business on the connection is complete. + * + */ +class ConnectionCodec : public sys::ConnectionCodec { + public: + struct Factory : public sys::ConnectionCodec::Factory { + boost::shared_ptr<sys::ConnectionCodec::Factory> next; + Cluster& cluster; + Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {} + sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); + sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); + }; + + ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c); + ~ConnectionCodec(); + + // ConnectionCodec functions delegate to the codecOutput + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + void closed(); + bool isClosed() const; + framing::ProtocolVersion getVersion() const; + + + private: + amqp_0_10::Connection codec; + boost::intrusive_ptr<cluster::Connection> interceptor; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNCTIONCODEC_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp deleted file mode 100644 index efcab1b731..0000000000 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConnectionInterceptor.h" -#include "qpid/framing/ClusterConnectionCloseBody.h" -#include "qpid/framing/ClusterConnectionDoOutputBody.h" -#include "qpid/framing/AMQFrame.h" -#include <boost/current_function.hpp> - - -namespace qpid { -namespace cluster { - -using namespace framing; - -template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; } - -ConnectionInterceptor::ConnectionInterceptor( - broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) -{ - connection->addFinalizer(boost::bind(operator delete, this)); - connection->setOutputHandler(&output), - // Attach my functions to Connection extension points. - shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); - shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); -} - -ConnectionInterceptor::~ConnectionInterceptor() { - assert(connection == 0); -} - -// Forward all received frames to the cluster, continue handling on delivery. -void ConnectionInterceptor::received(framing::AMQFrame& f) { - if (isClosed) return; - cluster.send(f, this); -} - -// Continue normal handling of delivered frames. -void ConnectionInterceptor::deliver(framing::AMQFrame& f) { - receivedNext(f); -} - -void ConnectionInterceptor::closed() { - if (isClosed) return; - try { - // Called when the local network connection is closed. We still - // need to process any outstanding cluster frames for this - // connection to ensure our sessions are up-to-date. We defer - // closing the Connection object till deliverClosed(), but replace - // its output handler with a null handler since the network output - // handler will be deleted. - // - connection->setOutputHandler(&discardHandler); - cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this); - isClosed = true; - } - catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); - } -} - -void ConnectionInterceptor::deliverClosed() { - closedNext(); - // Drop reference so connection will be deleted, which in turn - // will delete this via finalizer added in ctor. - connection = 0; -} - -void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. Used - // when local broker is shut down without a clean cluster shutdown. - // Release the connection, it will delete this. - connection = 0; -} - -// Delivery of doOutput allows us to run the real connection doOutput() -// which stocks up the write buffers with data. -// -void ConnectionInterceptor::deliverDoOutput(size_t requested) { - output.deliverDoOutput(requested); -} - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 2ffd3509bf..ce678015a2 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -18,7 +18,6 @@ #include "Cpg.h" #include "qpid/sys/Mutex.h" -// Note cpg is currently unix-specific. Refactor if availble on other platforms. #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/log/Statement.h" @@ -170,27 +169,50 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } -Cpg::Id Cpg::self() const { +MemberId Cpg::self() const { unsigned int nodeid; check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); - return Id(nodeid, getpid()); + return MemberId(nodeid, getpid()); } -ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { - ostream_iterator<Cpg::Id> i(o, " "); - std::copy(a.first, a.first+a.second, i); - return o; +ostream& operator <<(ostream& out, const MemberId& id) { + return out << std::hex << id.first << ":" << std::dec << id.second; } -ostream& operator <<(ostream& out, const Cpg::Id& id) { - return out << id.getNodeId() << "-" << id.getPid(); +ostream& operator<<(ostream& o, const ConnectionId& c) { + return o << c.first << "-" << c.second; } -ostream& operator <<(ostream& out, const cpg_name& name) { - return out << string(name.value, name.length); +ostream& operator<<(ostream& o, const cpg_name& name) { + return o << string(name.value, name.length); } }} // namespace qpid::cluster +// In proper namespace for ADL. + +std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) { + const char* reasonString; + switch (a.reason) { + case CPG_REASON_JOIN: reasonString = "joined"; break; + case CPG_REASON_LEAVE: reasonString = "left";break; + case CPG_REASON_NODEDOWN: reasonString = "node-down";break; + case CPG_REASON_NODEUP: reasonString = "node-up";break; + case CPG_REASON_PROCDOWN: reasonString = "process-down";break; + default: + assert(0); + reasonString = ""; + } + return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString; +} + +namespace std { +ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { + for (cpg_address* p = a.first; p < a.first+a.second; ++p) + o << *p << " "; + return o; +} +} + diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index 96fd692a77..fdc451fbbc 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -19,12 +19,12 @@ * */ +#include "qpid/cluster/types.h" +#include "qpid/cluster/Dispatchable.h" + #include "qpid/Exception.h" #include "qpid/sys/IOHandle.h" -#include "qpid/cluster/Dispatchable.h" -#include <boost/tuple/tuple.hpp> -#include <boost/tuple/tuple_comparison.hpp> #include <boost/scoped_ptr.hpp> #include <cassert> @@ -65,14 +65,6 @@ class Cpg : public sys::IOHandle { std::string str() const { return std::string(value, length); } }; - // boost::tuple gives us == and < for free. - struct Id : public boost::tuple<uint32_t, uint32_t> { - Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {} - Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {} - uint32_t getNodeId() const { return boost::get<0>(*this); } - uint32_t getPid() const { return boost::get<1>(*this); } - }; - static std::string str(const cpg_name& n) { return std::string(n.value, n.length); } @@ -127,7 +119,7 @@ class Cpg : public sys::IOHandle { cpg_handle_t getHandle() const { return handle; } - Id self() const; + MemberId self() const; int getFd(); @@ -166,9 +158,7 @@ class Cpg : public sys::IOHandle { bool isShutdown; }; -std::ostream& operator <<(std::ostream& out, const cpg_name& name); -std::ostream& operator <<(std::ostream& out, const Cpg::Id& id); -std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses); +std::ostream& operator <<(std::ostream& out, const MemberId& id); inline bool operator==(const cpg_name& a, const cpg_name& b) { return a.length==b.length && strncmp(a.value, b.value, a.length) == 0; @@ -177,5 +167,12 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }} // namespace qpid::cluster +// In proper namespaces for ADL +std::ostream& operator <<(std::ostream& out, const cpg_name& name); +std::ostream& operator<<(std::ostream& o, const cpg_address& a); +namespace std { +std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses); +} + #endif /*!CPG_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h index 6d429535e6..3c24dd71f2 100644 --- a/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H -#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H +#ifndef QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H +#define QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H /* * @@ -30,10 +30,10 @@ namespace framing { class AMQFrame; } namespace cluster { /** - * Output handler for frames sent to shadow connections. + * Output handler for frames sent to noop connections. * Simply discards frames. */ -class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler +class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler { public: virtual void send(framing::AMQFrame&) {} @@ -43,4 +43,4 @@ class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/ +#endif /*!QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 84d3a6ad69..6c77d2747a 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -19,9 +19,10 @@ * */ #include "OutputInterceptor.h" -#include "ConnectionInterceptor.h" -#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "Connection.h" +#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/log/Statement.h" #include <boost/current_function.hpp> @@ -30,7 +31,7 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h) +OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h) : parent(p), next(h), sent(), moreOutput(), doingOutput() {} @@ -57,8 +58,6 @@ bool OutputInterceptor::doOutput() { // which stocks up the write buffers with data. // void OutputInterceptor::deliverDoOutput(size_t requested) { - if (parent.getClosed()) return; - Locker l(lock); size_t buf = next.getBuffered(); if (parent.isLocal()) @@ -68,7 +67,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { sent = 0; do { sys::Mutex::ScopedUnlock u(lock); - moreOutput = doOutputNext(); // Calls send() + moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); sent += buf; // Include buffered data in the sent total. @@ -88,8 +87,8 @@ void OutputInterceptor::startDoOutput() { // Send a doOutput request if one is not already in flight. void OutputInterceptor::sendDoOutput() { // Call with lock held. - if (parent.isShadow() || parent.getClosed()) - return; + // FIXME aconway 2008-08-28: used to have || parent.getClosed()) + if (!parent.isLocal()) return; doingOutput = true; size_t request = writeEstimate.sending(getBuffered()); @@ -98,8 +97,8 @@ void OutputInterceptor::sendDoOutput() { // Send it anyway to keep the doOutput chain going until we are sure there's no more output // (in deliverDoOutput) // - parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>( - framing::ProtocolVersion(), request)), &parent); + parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>( + framing::ProtocolVersion(), request)), parent.getId()); QPID_LOG(trace, &parent << "Send doOutput request for " << request); } diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index b39f2a2be9..548ec32b5b 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -31,14 +31,14 @@ namespace qpid { namespace framing { class AMQFrame; } namespace cluster { -class ConnectionInterceptor; +class Connection; /** * Interceptor for connection OutputHandler, manages outgoing message replication. */ class OutputInterceptor : public sys::ConnectionOutputHandler { public: - OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h); + OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h); // sys::ConnectionOutputHandler functions void send(framing::AMQFrame& f); @@ -51,9 +51,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { // Intercept doOutput requests on Connection. bool doOutput(); - boost::function<bool ()> doOutputNext; - - ConnectionInterceptor& parent; + cluster::Connection& parent; private: typedef sys::Mutex::ScopedLock Locker; diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index 0bba2ba790..29891da344 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -90,7 +90,7 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.swap(queue); condition.clear(); ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); // Process the batch outside the lock. + callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push. h.rewatch(); } diff --git a/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h b/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h new file mode 100644 index 0000000000..228f8d092d --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h @@ -0,0 +1,57 @@ +#ifndef QPID_CLUSTER_PROXYINPUTHANDLER_H +#define QPID_CLUSTER_PROXYINPUTHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/ConnectionInputHandler.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +namespace framing { class AMQFrame; } + +namespace cluster { + +/** + * Proxies ConnectionInputHandler functions and ensures target.closed() + * is called, on deletion if not before. + */ +class ProxyInputHandler : public sys::ConnectionInputHandler +{ + public: + ProxyInputHandler(boost::intrusive_ptr<cluster::Connection> t) : target(t) {} + ~ProxyInputHandler() { closed(); } + + void received(framing::AMQFrame& f) { target->received(f); } + void closed() { if (target) target->closed(); target = 0; } + void idleOut() { target->idleOut(); } + void idleIn() { target->idleIn(); } + bool doOutput() { return target->doOutput(); } + bool hasOutput() { return target->hasOutput(); } + + private: + boost::intrusive_ptr<cluster::Connection> target; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_PROXYINPUTHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h new file mode 100644 index 0000000000..4646cd9174 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -0,0 +1,58 @@ +#ifndef QPID_CLUSTER_TYPES_H +#define QPID_CLUSTER_TYPES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <utility> +#include <iosfwd> +#include <stdint.h> + +extern "C" { +#include <openais/cpg.h> +} + +namespace qpid { +namespace cluster { + +class Connection; + +/** first=node-id, second=pid */ +struct MemberId : std::pair<uint32_t, uint32_t> { + MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} + MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} + uint32_t getNode() const { return first; } + uint32_t getPid() const { return second; } +}; + +inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); } + +std::ostream& operator<<(std::ostream&, const MemberId&); + +struct ConnectionId : public std::pair<MemberId, Connection*> { + ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {} + MemberId getMember() const { return first; } + Connection* getConnectionPtr() const { return second; } +}; +std::ostream& operator<<(std::ostream&, const ConnectionId&); + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_TYPES_H*/ diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h index efc6839b60..b1b047d2cc 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h @@ -22,14 +22,14 @@ * */ #include "qpid/framing/ProtocolVersion.h" -#include "OutputControl.h" -#include <memory> -#include <map> namespace qpid { namespace sys { +class InputHandlerFactory; +class OutputControl; + /** * Interface of coder/decoder for a connection of a specific protocol * version. diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h index a2c18d6d9a..9a5b9f75a5 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -33,6 +33,7 @@ namespace sys { public TimeoutHandler, public OutputTask { public: + virtual void closed() = 0; }; diff --git a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h index 2b309b5758..9bb7e13686 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h +++ b/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h @@ -42,7 +42,8 @@ class ConnectionInputHandlerFactory : private boost::noncopyable *@param id identify the connection for management purposes. */ virtual ConnectionInputHandler* create(ConnectionOutputHandler* out, - const std::string& id) = 0; + const std::string& id, + bool isClient) = 0; virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h index a7869ff602..07e69a0735 100644 --- a/qpid/cpp/src/tests/ForkedBroker.h +++ b/qpid/cpp/src/tests/ForkedBroker.h @@ -53,12 +53,12 @@ class ForkedBroker { } ~ForkedBroker() { - try { stop(); } catch(const std::exception& e) { - QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what())); + try { kill(); } catch(const std::exception& e) { + QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what())); } } - void stop() { + void kill() { using qpid::ErrnoException; if (pid == 0) return; if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed"); diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 7140cc73bd..3f09143fff 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -69,8 +69,8 @@ struct ClusterFixture : public vector<uint16_t> { void add(); void setup(); void kill(size_t n) { - if (n) forkedBrokers[n-1]->stop(); - else broker0.shutdown(); + if (n) forkedBrokers[n-1].kill(); + else broker0->broker->shutdown(); } }; @@ -139,6 +139,14 @@ QPID_AUTO_TEST_CASE(testForkedBroker) { BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); } +QPID_AUTO_TEST_CASE(testSingletonCluster) { + // Test against a singleton cluster, verify basic operation. + ClusterFixture cluster(1); + Client c(cluster[0]); + BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); + BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); +} + QPID_AUTO_TEST_CASE(testWiringReplication) { ClusterFixture cluster(3); Client c0(cluster[0]); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 8d6b5a241e..f5afe34e4a 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -25,19 +25,17 @@ <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <control name = "notify" code="0x1"> - <role name="server" implement="MUST" /> + <control name = "joined" code="0x1"> <field name="url" type="str16" /> </control> + </class> - <control name="connection-close" code="0x2"> - <role name="server" implement="MUST" /> + <class name="cluster-connection" code="0x81" label="Qpid clustering extensions."> + <control name="deliver-close" code="0x2"> </control> - <control name="connection-do-output" code="0x3"> - <role name="server" implement="MUST" /> + <control name="deliver-do-output" code="0x3"> <field name="bytes" type="uint32"/> </control> - </class> </amqp> |