diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ChannelManager.cpp | 85 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ChannelManager.h | 66 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 197 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 105 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPluginProvider.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Handler.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/log/Selector.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 47 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 52 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 18 | ||||
-rw-r--r-- | cpp/src/tests/Cpg.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 29 | ||||
-rwxr-xr-x | cpp/src/tests/ais_check | 16 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 36 |
17 files changed, 353 insertions, 429 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 4eddf4ffe7..07b4f045cc 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -11,8 +11,6 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ - qpid/cluster/ChannelManager.h \ - qpid/cluster/ChannelManager.cpp \ qpid/cluster/ClusterPluginProvider.cpp libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/cpp/src/qpid/cluster/ChannelManager.cpp b/cpp/src/qpid/cluster/ChannelManager.cpp deleted file mode 100644 index f573d78ca1..0000000000 --- a/cpp/src/qpid/cluster/ChannelManager.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/log/Statement.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/AMQFrame.h" -#include "ChannelManager.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -/** Handler to multicast to the cluster */ -struct ClusterHandler : public FrameHandler { - - ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_) - : FrameHandler(next), bitmask(bitmask_) {} - - void handle(AMQFrame& frame) { - frame.channel |= bitmask; // Mark the frame - nextHandler(frame); - // TODO aconway 2007-06-28: Right now everything is backed up - // via multicast. When we have point-to-point backups this - // function must determine where each frame should be sent: to - // multicast or only to specific backup(s) via AMQP. - } - - ChannelId bitmask; -}; - -ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){} - -void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) { - // Store the original cluster chains for the channel. - channels[id] = chains; - - // Replace chains with multicast-to-cluster handlers that mark the - // high-bit of the channel ID on outgoing frames so we can tell - // them from incoming frames in handle() - // - // When handle() receives the frames from the cluster it - // will forward them to the original channel chains stored in - // channels map. - // - chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0)); - chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT)); -} - -void ChannelManager::handle(AMQFrame& frame) { - bool isOut = frame.channel | CHANNEL_HIGH_BIT; - frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit. - ChannelMap::iterator i = channels.find(frame.getChannel()); - if (i != channels.end()) { - Chain& chain = isOut ? i->second.out : i->second.in; - chain->handle(frame); - } - else - updateFailoverState(frame); -} - -void ChannelManager::updateFailoverState(AMQFrame& ) { - QPID_LOG(critical, "Failover is not implemented"); - // FIXME aconway 2007-06-28: - // If the channel is not in my map then I'm not primary so - // I don't pass the frame to the channel handler but I - // do need to update the session failover state. -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ChannelManager.h b/cpp/src/qpid/cluster/ChannelManager.h deleted file mode 100644 index 59fce77957..0000000000 --- a/cpp/src/qpid/cluster/ChannelManager.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef QPID_CLUSTER_CHANNELMANAGER_H -#define QPID_CLUSTER_CHANNELMANAGER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/HandlerUpdater.h" -#include <map> - -namespace qpid { -namespace cluster { - -/** - * Manage channels and handler chains on channels for the cluster. - * - * As HandlerUpdater plugin, updates channel handler chains with - * cluster handlers. - * - * As a FrameHandler handles frames coming from the cluster and - * dispatches them to the appropriate channel handler. - * - */ -class ChannelManager : public framing::HandlerUpdater, - public framing::FrameHandler -{ - public: - /** Takes a handler to send frames to the cluster. */ - ChannelManager(framing::FrameHandler::Chain mcastOut); - - /** As FrameHandler handle frames from the cluster */ - void handle(framing::AMQFrame& frame); - - /** As ChannelUpdater update the handler chains. */ - void update(framing::ChannelId id, framing::FrameHandler::Chains& chains); - - private: - void updateFailoverState(framing::AMQFrame&); - - typedef std::map<framing::ChannelId, - framing::FrameHandler::Chains> ChannelMap; - - framing::FrameHandler::Chain mcastOut; - ChannelMap channels; -}; - - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8d898eefa3..e691ad357d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,65 +32,89 @@ using namespace qpid::sys; using namespace std; ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << cluster.name.str() << "(" << cluster.self << ")"; + return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; } -namespace { -Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; -struct StatusMapInit { - StatusMapInit() { - statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; - statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; - statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; - statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; - statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; - } -} instance; +ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { + return out << m.first << "=" << m.second->url; } -Cluster::Member::Member(const cpg_address& addr) - : status(statusMap[addr.reason]) {} +ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { + ostream_iterator<Cluster::MemberMap::value_type> o(out, " "); + copy(members.begin(), members.end(), o); + return out; +} + +namespace { + +/** We mark the high bit of a frame's channel number to know if it's + * an incoming or outgoing frame when frames arrive via multicast. + */ +bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } +bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } +void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } +void markIncoming(AMQFrame&) { /*noop*/ } +void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } -void Cluster::notify() { - ProtocolVersion version; - // TODO aconway 2007-06-25: Use proxy here. - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - handle(frame); } +struct Cluster::IncomingHandler : public FrameHandler { + IncomingHandler(Cluster& c) : cluster(c) {} + void handle(AMQFrame& frame) { + markIncoming(frame); + cluster.mcast(frame); + } + Cluster& cluster; +}; + +struct Cluster::OutgoingHandler : public FrameHandler { + OutgoingHandler(Cluster& c) : cluster(c) {} + void handle(AMQFrame& frame) { + markOutgoing(frame); + cluster.mcast(frame); + } + Cluster& cluster; +}; + + +// TODO aconway 2007-06-28: Right now everything is backed up via +// multicast. When we have point-to-point backups the +// Incoming/Outgoing handlers must determine where each frame should +// be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : + cpg(new Cpg(*this)), name(name_), url(url_), - cpg(new Cpg( - boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - self(cpg->getLocalNoideId(), getpid()) -{} - -void Cluster::join(FrameHandler::Chain next) { + self(cpg->getLocalNoideId(), getpid()), + toChains(new IncomingHandler(*this), new OutgoingHandler(*this)) +{ QPID_LOG(trace, *this << " Joining cluster."); - next = next; - dispatcher=Thread(*this); cpg->join(name); notify(); + dispatcher=Thread(*this); + // Wait till we show up in the cluster map. + { + Mutex::ScopedLock l(lock); + while (empty()) + lock.wait(); + } } Cluster::~Cluster() { - if (cpg) { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); - } + QPID_LOG(trace, *this << " Leaving cluster."); + try { + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << *this << ": " + << e.what()); } } -void Cluster::handle(AMQFrame& frame) { - assert(cpg); +void Cluster::mcast(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -99,11 +123,24 @@ void Cluster::handle(AMQFrame& frame) { cpg->mcast(name, &iov, 1); } +void Cluster::notify() { + // TODO aconway 2007-06-25: Use proxy here. + ProtocolVersion version; + AMQFrame frame(version, 0, + make_shared_ptr(new ClusterNotifyBody(version, url))); + mcast(frame); +} + size_t Cluster::size() const { Mutex::ScopedLock l(lock); return members.size(); } +void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { + Mutex::ScopedLock l(lock); + fromChains = chains; +} + Cluster::MemberList Cluster::getMembers() const { Mutex::ScopedLock l(lock); MemberList result(members.size()); @@ -112,7 +149,7 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -void Cluster::cpgDeliver( +void Cluster::deliver( cpg_handle_t /*handle*/, struct cpg_name* /* group */, uint32_t nodeid, @@ -124,61 +161,71 @@ void Cluster::cpgDeliver( Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame); - // TODO aconway 2007-06-20: use visitor pattern. + QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); + if (!handleClusterFrame(from, frame)) { + FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; + unMark(frame); + if (chain) + chain->handle(frame); + } +} + +bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, + Duration timeout) const +{ + AbsTime deadline(now(), timeout); + Mutex::ScopedLock l(lock); + while (!predicate(*this) && lock.wait(deadline)) + ; + return (predicate(*this)); +} + +bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { + // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { + MemberList list; { Mutex::ScopedLock l(lock); - assert(members[from]); - members[from]->url = notifyIn->getUrl(); - members[from]->status = Member::BROKER; + if (!members[from]) + members[from].reset(new Member(url)); + else + members[from]->url = notifyIn->getUrl(); + QPID_LOG(trace, *this << ": member update: " << members); + lock.notifyAll(); } - if (callback) - callback(); + return true; } - else - next->handle(frame); + return false; } -void Cluster::cpgConfigChange( +void Cluster::configChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *current, int nCurrent, + struct cpg_address */*current*/, int /*nCurrent*/, struct cpg_address *left, int nLeft, - struct cpg_address *joined, int nJoined -) + struct cpg_address *joined, int nJoined) { - QPID_LOG(trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(current, nCurrent)); - - bool needNotify=false; + bool newMembers=false; MemberList updated; { Mutex::ScopedLock l(lock); - for (int i = 0; i < nJoined; ++i) { - Id id(current[i]); - members[id].reset(new Member(current[i])); - if (id != self) - needNotify = true; // Notify new members other than myself. + if (nLeft) { + for (int i = 0; i < nLeft; ++i) + members.erase(Id(left[i])); + QPID_LOG(trace, *this << ": members left: " << members); + lock.notifyAll(); } - for (int i = 0; i < nLeft; ++i) - members.erase(Id(current[i])); - } // End of locked scope. - if (needNotify) + newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); + // We don't record members joining here, we record them when + // we get their ClusterNotify message. + } + if (newMembers) notify(); - if (callback) - callback(); } -void Cluster::setCallback(boost::function<void()> f) { callback=f; } - void Cluster::run() { - assert(cpg); cpg->dispatchBlocking(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index aff213b6c9..199a93a7c5 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,89 +21,80 @@ #include "qpid/cluster/Cpg.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Thread.h" +#include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" -#include "qpid/shared_ptr.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Logger.h" + #include <boost/function.hpp> #include <boost/scoped_ptr.hpp> + #include <map> #include <vector> -namespace qpid { - -namespace broker { -class HandlerUpdater; -} - -namespace cluster { - -class ChannelManager; +namespace qpid { namespace cluster { /** * Represents a cluster, provides access to data about members. - * - * Implements a FrameHandler that multicasts frames to the cluster. - * - * Requires a handler for frames arriving from the cluster, - * normally a ChannelManager but other handlers could be interposed - * for testing, logging etc. + * Implements HandlerUpdater to manage handlers that route frames to + * and from the cluster. */ -class Cluster : public framing::FrameHandler, private sys::Runnable { +class Cluster : private sys::Runnable, private Cpg::Handler +{ public: /** Details of a cluster member */ struct Member { typedef shared_ptr<const Member> Ptr; - /** Status of a cluster member. */ - enum Status { - JOIN, ///< Process joined the group. - LEAVE, ///< Process left the group cleanly. - NODEDOWN, ///< Process's node went down. - NODEUP, ///< Process's node joined the cluster. - PROCDOWN, ///< Process died without leaving. - BROKER ///< Broker details are available. - }; - - Member(const cpg_address&); - std::string url; - Status status; + Member(const std::string& url_) : url(url_) {} + std::string url; ///< Broker address. }; typedef std::vector<Member::Ptr> MemberList; - + /** - * Create a cluster object but do not joing. + * Join a cluster. * @param name of the cluster. * @param url of this broker, sent to the cluster. */ Cluster(const std::string& name, const std::string& url); - ~Cluster(); - - /** Join the cluster. - *@handler is the handler for frames arriving from the cluster. - */ - void join(framing::FrameHandler::Chain handler); - - /** Multicast a frame to the cluster. */ - void handle(framing::AMQFrame&); + virtual ~Cluster(); /** Get the current cluster membership. */ MemberList getMembers() const; - /** Called when membership changes. */ - void setCallback(boost::function<void()>); - /** Number of members in the cluster. */ size_t size() const; + bool empty() const { return size() == 0; } + + /** Get handler chains to send frames to the cluster */ + framing::FrameHandler::Chains getToChains() { + return toChains; + } + + /** Set handler chains for frames received from the cluster */ + void setFromChains(const framing::FrameHandler::Chains& chains); + + /** Wait for predicate(*this) to be true, up to timeout. + *@return True if predicate became true, false if timed out. + *Note the predicate may not be true after wait returns, + *all the caller can say is it was true at some earlier point. + */ + bool wait(boost::function<bool(const Cluster&)> predicate, + sys::Duration timeout=sys::TIME_INFINITE) const; + private: typedef Cpg::Id Id; typedef std::map<Id, shared_ptr<Member> > MemberMap; + typedef std::map< + framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; + + void mcast(framing::AMQFrame&); ///< send frame by multicast. + void notify(); ///< Notify cluster of my details. - void run(); - void notify(); - void cpgDeliver( + void deliver( cpg_handle_t /*handle*/, struct cpg_name *group, uint32_t /*nodeid*/, @@ -111,7 +102,7 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { void* /*msg*/, int /*msg_len*/); - void cpgConfigChange( + void configChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, struct cpg_address */*members*/, int /*nMembers*/, @@ -119,16 +110,30 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { struct cpg_address */*joined*/, int /*nJoined*/ ); + void run(); + bool handleClusterFrame(Id from, framing::AMQFrame&); + mutable sys::Monitor lock; + boost::scoped_ptr<Cpg> cpg; Cpg::Name name; std::string url; - boost::scoped_ptr<Cpg> cpg; Id self; MemberMap members; + ChannelMap channels; sys::Thread dispatcher; boost::function<void()> callback; + framing::FrameHandler::Chains toChains; + framing::FrameHandler::Chains fromChains; + + struct IncomingHandler; + struct OutgoingHandler; + + friend struct IncomingHandler; + friend struct OutgoingHandler; friend std::ostream& operator <<(std::ostream&, const Cluster&); + friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); + friend std::ostream& operator <<(std::ostream&, const MemberMap&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp index 3a09a66b81..d48fbadf7b 100644 --- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp +++ b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp @@ -18,7 +18,6 @@ #include "qpid/broker/Broker.h" #include "qpid/framing/HandlerUpdater.h" #include "qpid/cluster/Cluster.h" -#include "qpid/cluster/ChannelManager.h" #include "qpid/Plugin.h" #include "qpid/Options.h" @@ -51,12 +50,7 @@ struct ClusterPluginProvider : public PluginProvider { if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. cluster.reset(new Cluster(options.clusterName, broker->getUrl())); - - // Channel manager is both the next handler for the cluster - // and the HandlerUpdater plugin for the broker. - shared_ptr<ChannelManager> manager(new ChannelManager(cluster)); - cluster->join(manager); - broker->use(manager); + // FIXME aconway 2007-06-29: register HandlerUpdater. } } }; diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index a979ce1eeb..3148e52789 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -17,7 +17,10 @@ */ #include "Cpg.h" + #include "qpid/sys/Mutex.h" +#include "qpid/log/Statement.h" + #include <vector> #include <limits> #include <iterator> @@ -33,16 +36,15 @@ using namespace std; class Cpg::Handles { public: - void put(cpg_handle_t handle, Cpg* object) { + void put(cpg_handle_t handle, Cpg::Handler* handler) { sys::Mutex::ScopedLock l(lock); - assert(object); uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. if (index >= handles.size()) handles.resize(index+1, 0); - handles[index] = object; + handles[index] = handler; } - Cpg* get(cpg_handle_t handle) { + Cpg::Handler* get(cpg_handle_t handle) { sys::Mutex::ScopedLock l(lock); uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. assert(index < handles.size()); @@ -52,7 +54,7 @@ class Cpg::Handles private: sys::Mutex lock; - vector<Cpg*> handles; + vector<Cpg::Handler*> handles; }; Cpg::Handles Cpg::handles; @@ -66,7 +68,9 @@ void Cpg::globalDeliver ( void* msg, int msg_len) { - handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len); + Cpg::Handler* handler=handles.get(handle); + if (handler) + handler->deliver(handle, group, nodeid, pid, msg, msg_len); } void Cpg::globalConfigChange( @@ -77,23 +81,35 @@ void Cpg::globalConfigChange( struct cpg_address *joined, int nJoined ) { - handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); + Cpg::Handler* handler=handles.get(handle); + if (handler) + handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } -Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c) -{ +Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - handles.put(handle, this); + handles.put(handle, &handler); } Cpg::~Cpg() { try { - check(cpg_finalize(handle), "Error in shutdown of CPG"); + shutdown(); + } catch (const std::exception& e) { + QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what()); } - catch (...) { - handles.put(handle, 0); - throw; +} + +struct Cpg::ClearHandleOnExit { + ClearHandleOnExit(cpg_handle_t h) : handle(h) {} + ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); } + cpg_handle_t handle; +}; + +void Cpg::shutdown() { + if (handles.get(handle)) { + ClearHandleOnExit guard(handle); // Exception safe + check(cpg_finalize(handle), "Error in shutdown of CPG"); } } @@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) { case CPG_OK: return msg+": ok"; case CPG_ERR_LIBRARY: return msg+": library"; case CPG_ERR_TIMEOUT: return msg+": timeout"; - case CPG_ERR_TRY_AGAIN: return msg+": try again"; + case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running"; case CPG_ERR_INVALID_PARAM: return msg+": invalid param"; case CPG_ERR_NO_MEMORY: return msg+": no memory"; case CPG_ERR_BAD_HANDLE: return msg+": bad handle"; - case CPG_ERR_ACCESS: return msg+": access"; + case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'"; case CPG_ERR_NOT_EXIST: return msg+": not exist"; case CPG_ERR_EXIST: return msg+": exist"; case CPG_ERR_NOT_SUPPORTED: return msg+": not supported"; diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index e164ed1215..d616be74e2 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -21,8 +21,9 @@ #include "qpid/Exception.h" #include "qpid/cluster/Dispatchable.h" -#include <boost/function.hpp> + #include <cassert> + extern "C" { #include <openais/cpg.h> } @@ -69,31 +70,36 @@ class Cpg : public Dispatchable { return std::string(n.value, n.length); } - typedef boost::function<void ( - cpg_handle_t /*handle*/, - struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* /*msg*/, - int /*msg_len*/)> DeliverFn; - - typedef boost::function<void ( - cpg_handle_t /*handle*/, - struct cpg_name */*group*/, - struct cpg_address */*members*/, int /*nMembers*/, - struct cpg_address */*left*/, int /*nLeft*/, - struct cpg_address */*joined*/, int /*nJoined*/ - )> ConfigChangeFn; + struct Handler { + virtual ~Handler() {}; + virtual void deliver( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/) = 0; + + virtual void configChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ) = 0; + }; /** Open a CPG handle. - *@param deliver - free function called when a message is delivered. - *@param reconfig - free function called when CPG configuration changes. + *@param handler for CPG events. */ - Cpg(DeliverFn deliver, ConfigChangeFn reconfig); - - /** Disconnect from CPG. */ + Cpg(Handler&); + + /** Destructor calls shutdown. */ ~Cpg(); + /** Disconnect from CPG */ + void shutdown(); + /** Dispatch CPG events. *@param type one of * - CPG_DISPATCH_ONE - dispatch exactly one event. @@ -128,7 +134,9 @@ class Cpg : public Dispatchable { private: class Handles; + struct ClearHandleOnExit; friend class Handles; + friend struct ClearHandleOnExit; static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); @@ -159,8 +167,7 @@ class Cpg : public Dispatchable { static Handles handles; cpg_handle_t handle; - DeliverFn deliver; - ConfigChangeFn configChange; + Handler& handler; }; std::ostream& operator <<(std::ostream& out, const Cpg::Id& id); diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index f6b59393d9..2f09911325 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -36,6 +36,9 @@ template <class T> struct Handler { /** Handler chains for incoming and outgoing traffic. */ struct Chains { + Chains() {} + Chains(Chain i, Chain o) : in(i), out(o) {} + Chains(Handler* i, Handler* o) : in(i), out(o) {} Chain in; Chain out; }; @@ -48,12 +51,6 @@ template <class T> struct Handler { /** Next handler. Public so chains can be modified by altering next. */ Chain next; - - protected: - /** Derived handle() implementations call nextHandler to invoke the - * next handler in the chain. */ - void nextHandler(T data) { if (next) next->handle(data); } - }; }} diff --git a/cpp/src/qpid/log/Selector.h b/cpp/src/qpid/log/Selector.h index 329541b7fc..bba3f05bfc 100644 --- a/cpp/src/qpid/log/Selector.h +++ b/cpp/src/qpid/log/Selector.h @@ -43,6 +43,8 @@ class Selector { Selector(Level l, const std::string& s=std::string()) { enable(l,s); } + + Selector(const std::string& enableStr) { enable(enableStr); } /** * Enable messages with level in levels where the file * name contains substring. Empty string matches all. diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index 008575140b..2ec140b924 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -20,51 +20,52 @@ #include <boost/test/auto_unit_test.hpp> #include "test_tools.h" #include "Cluster.h" +#include "qpid/framing/ChannelPingBody.h" #include "qpid/framing/ChannelOkBody.h" -#include "qpid/framing/BasicGetOkBody.h" - static const ProtocolVersion VER; -/** Verify membership ind a cluster with one member. */ +using namespace qpid::log; + +/** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(clusterOne) { - Cluster cluster("Test", "amqp:one:1"); - TestClusterHandler handler(cluster); - AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - cluster.handle(frame); - BOOST_REQUIRE(handler.waitFrames(1)); + TestCluster cluster("clusterOne", "amqp:one:1"); + AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + cluster.getToChains().in->handle(frame); + BOOST_REQUIRE(cluster.in.waitFor(1)); + + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); BOOST_CHECK_EQUAL(1u, cluster.size()); Cluster::MemberList members = cluster.getMembers(); BOOST_CHECK_EQUAL(1u, members.size()); - BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1"); - BOOST_CHECK_EQUAL(1u, handler.size()); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); + shared_ptr<const Cluster::Member> me=members.front(); + BOOST_REQUIRE_EQUAL(me->url, "amqp:one:1"); } -/** Fork a process to verify membership in a cluster with two members */ +/** Fork a process to test a cluster with two members */ BOOST_AUTO_TEST_CASE(clusterTwo) { pid_t pid=fork(); BOOST_REQUIRE(pid >= 0); - if (pid) { // Parent see Cluster_child.cpp for child. - Cluster cluster("Test", "amqp::1"); - TestClusterHandler handler(cluster); - BOOST_REQUIRE(handler.waitMembers(2)); + if (pid) { // Parent, see Cluster_child.cpp for child. + TestCluster cluster("clusterTwo", "amqp::1"); + BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - cluster.handle(frame); - BOOST_REQUIRE(handler.waitFrames(2)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); - BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody()); + AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + cluster.getToChains().in->handle(frame); + BOOST_REQUIRE(cluster.in.waitFor(1)); + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); + BOOST_REQUIRE(cluster.out.waitFor(1)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody()); // Wait for child to exit. int status; BOOST_CHECK_EQUAL(::wait(&status), pid); BOOST_CHECK_EQUAL(0, status); - BOOST_CHECK(handler.waitMembers(1)); + BOOST_CHECK(cluster.waitFor(1)); BOOST_CHECK_EQUAL(1u, cluster.size()); } else { // Child - BOOST_REQUIRE(execl("Cluster_child", "Cluster_child", NULL)); + BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL)); } } diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index edb1f1524f..f37c87a9ad 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -27,6 +27,7 @@ #include <boost/bind.hpp> #include <iostream> #include <vector> +#include <functional> /** * Definitions for the Cluster.cpp and Cluster_child.cpp child program. @@ -39,45 +40,48 @@ using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::sys; +using namespace boost; void null_deleter(void*) {} -struct TestClusterHandler : - public std::vector<AMQFrame>, public FrameHandler, public Monitor - +struct TestFrameHandler : + public FrameHandler, public vector<AMQFrame>, public Monitor { - TestClusterHandler(Cluster& c) : cluster(c) { - cluster.join(make_shared_ptr(this, &null_deleter)); - cluster.setCallback(boost::bind(&Monitor::notify, this)); - } - - void handle(AMQFrame& f) { - ScopedLock l(*this); - push_back(f); + void handle(AMQFrame& frame) { + Mutex::ScopedLock l(*this); + push_back(frame); notifyAll(); } - /** Wait for the vector to contain n frames. */ - bool waitFrames(size_t n) { - ScopedLock l(*this); - AbsTime deadline(now(), TIME_SEC); + bool waitFor(size_t n) { + Mutex::ScopedLock l(*this); + AbsTime deadline(now(), 5*TIME_SEC); while (size() != n && wait(deadline)) ; return size() == n; } +}; - /** Wait for the cluster to have n members */ - bool waitMembers(size_t n) { - ScopedLock l(*this); - AbsTime deadline(now(), TIME_SEC); - while (cluster.size() != n && wait(deadline)) - ; - return cluster.size() == n; +void nullDeleter(void*) {} + +struct TestCluster : public Cluster +{ + TestCluster(string name, string url) : Cluster(name, url) + { + setFromChains( + FrameHandler::Chains( + make_shared_ptr(&in, nullDeleter), + make_shared_ptr(&out, nullDeleter) + )); } - Cluster& cluster; -}; + /** Wait for cluster to be of size n. */ + bool waitFor(size_t n) { + return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n)); + } + TestFrameHandler in, out; +}; diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index a5ac3e9669..d73d2bdbc7 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -26,20 +26,20 @@ using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::sys; - +using namespace qpid::log; static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { - Cluster cluster("Test", "amqp::2"); - TestClusterHandler handler(cluster); - BOOST_REQUIRE(handler.waitFrames(1)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody()); - AMQFrame frame(VER, 1, new BasicGetOkBody(VER)); - cluster.handle(frame); - BOOST_REQUIRE(handler.waitFrames(2)); - BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody()); + TestCluster cluster("clusterTwo", "amqp::2"); + BOOST_REQUIRE(cluster.in.waitFor(1)); // Frame from parent. + BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody()); + BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent + AMQFrame frame(VER, 1, new ChannelOkBody(VER)); + cluster.getToChains().out->handle(frame); + BOOST_REQUIRE(cluster.out.waitFor(1)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody()); } int test_main(int, char**) { diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp index 97b829ea63..ec98ca4fc2 100644 --- a/cpp/src/tests/Cpg.cpp +++ b/cpp/src/tests/Cpg.cpp @@ -47,11 +47,11 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { o << "{ "; ostream_iterator<cpg_address> i(o, " "); copy(array.first, array.first+array.second, i); - cout << "}"; + o << "}"; return o; } -struct Callback { +struct Callback : public Cpg::Handler { Callback(const string group_) : group(group_) {} string group; vector<string> delivered; @@ -88,10 +88,7 @@ BOOST_AUTO_TEST_CASE(Cpg_basic) { // Cpg::Name group("foo"); Callback cb(group.str()); - Cpg::DeliverFn deliver=boost::bind(&Callback::deliver, &cb, _1, _2, _3, _4, _5, _6); - Cpg::ConfigChangeFn reconfig=boost::bind<void>(&Callback::configChange, &cb, _1, _2, _3, _4, _5, _6, _7, _8); - - Cpg cpg(deliver, reconfig); + Cpg cpg(cb); cpg.join(group); iovec iov = { (void*)"Hello!", 6 }; cpg.mcast(group, &iov, 1); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3303afa0be..5a8ad1bde6 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -11,17 +11,19 @@ lib_broker = $(abs_builddir)/../libqpidbroker.la # Initialize variables that are incremented with += # check_PROGRAMS= -unit_progs= -unit_wrappers= +TESTS= +EXTRA_DIST= # # Unit test programs. # -unit_progs+=logging +TESTS+=logging +check_PROGRAMS+=logging logging_SOURCES=logging.cpp test_tools.h logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common) -unit_progs+=Url +TESTS+=Url +check_PROGRAMS+=Url Url_SOURCES=Url.cpp test_tools.h Url_LDADD=-lboost_unit_test_framework $(lib_common) @@ -76,21 +78,20 @@ unit_tests = \ # Executables for client tests -testprogs = \ +testprogs= \ client_test \ echo_service \ topic_listener \ - topic_publisher - - -check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner + topic_publisher \ + interop_runner +check_PROGRAMS += $(testprogs) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test system_tests = client_test quick_topictest -TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker +TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker -EXTRA_DIST = \ +EXTRA_DIST += \ test_env run_test \ run-unit-tests start_broker python_tests kill_broker \ quick_topictest \ @@ -131,10 +132,8 @@ gen.mk: Makefile.am check-unit: $(MAKE) check TESTS=$(UNIT_TESTS) run-unit-tests -# Dummy test to force necessary test files to be generated. -dummy_test: .valgrind.supp .valgrindrc - { echo "#!/bin/sh"; echo "# Dummy test, does nothing. "; } > $@ - chmod a+x $@ +# Make sure valgrind files are generated. +all: .valgrind.supp .valgrindrc # Create a copy so that can be modified without risk of committing the changes. .valgrindrc: .valgrindrc-default diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check new file mode 100755 index 0000000000..df40899065 --- /dev/null +++ b/cpp/src/tests/ais_check @@ -0,0 +1,16 @@ +#!/bin/sh +test `id -ng` = "ais" || { + cat <<EOF + =========================== NOTICE============================== + + You do not appear to have you group ID set to "ais". + + Cluster tests that require the openais library will fail.Make sure + you are a member of group ais and run "newgrp ais" before running + the tests. + + ================================================================ + +EOF +exit 1; +} diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 6f59b13107..33e1569d3c 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -5,33 +5,25 @@ if CLUSTER lib_cluster = $(abs_builddir)/../libqpidcluster.la # NOTE: Programs using the openais library must be run with gid=ais -# Such programs are built as *.ais, with a wrapper script *.sh that -# runs the program with newgrp ais. +# You should do "newgrp ais" before running the tests to run these. # -# Rule to generate wrapper scripts for tests that require gid=ais. -run_test="env VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test" -.ais.sh: - echo "if groups | grep '\bais\b' >/dev/null;" > $@_t - echo "then echo $(run_test) ./$< \"$$@ \"| newgrp ais;" >>$@_t - echo "else echo WARNING: `whoami` not in group ais, skipping $<.;" >>$@_t - echo "fi" >> $@_t - mv $@_t $@ - chmod a+x $@ - # # Cluster tests. # -check_PROGRAMS+=Cpg.ais -Cpg_ais_SOURCES=Cpg.cpp -Cpg_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework -unit_wrappers+=Cpg.sh - -# FIXME aconway 2007-06-29: Fixing problems with the test. -# check_PROGRAMS+=Cluster.ais -# Cluster_ais_SOURCES=Cluster.cpp Cluster.h -# Cluster_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework -# unit_wrappers+=Cluster.sh + +TESTS+=ais_check +EXTRA_DIST+=ais_check + +TESTS+=Cpg +check_PROGRAMS+=Cpg +Cpg_SOURCES=Cpg.cpp +Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework + +TESTS+=Cluster +check_PROGRAMS+=Cluster +Cluster_SOURCES=Cluster.cpp Cluster.h +Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework check_PROGRAMS+=Cluster_child Cluster_child_SOURCES=Cluster_child.cpp Cluster.h |