diff options
author | Alan Conway <aconway@apache.org> | 2009-04-11 14:29:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-04-11 14:29:04 +0000 |
commit | 78799720865c42d6f81701602f4c94d25b97f3be (patch) | |
tree | b524c5891e7b5b8ab68640168b06a82dd3349055 /qpid/cpp/src | |
parent | 757103d8205d3009f89c0af2ab150a696344f99d (diff) | |
download | qpid-python-78799720865c42d6f81701602f4c94d25b97f3be.tar.gz |
Fix issues when cluster is run with persistence enabled.
- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue.
- Enable persistence in cluster tests.
- Correct message status in DeliveryRecord updates.
- Remove qpid.update queue when update complete - avoid it becoming persistent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@764204 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
36 files changed, 866 insertions, 187 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index e2054d75e9..8c4ec94ffe 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -56,6 +56,8 @@ cluster_la_SOURCES = \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ + qpid/cluster/ErrorCheck.cpp \ + qpid/cluster/ErrorCheck.h \ qpid/cluster/Event.cpp \ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ @@ -70,6 +72,7 @@ cluster_la_SOURCES = \ qpid/cluster/Multicaster.h \ qpid/cluster/McastFrameHandler.h \ qpid/cluster/NoOpConnectionOutputHandler.h \ + qpid/cluster/StallConnectionOutputHandler.h \ qpid/cluster/OutputInterceptor.cpp \ qpid/cluster/OutputInterceptor.h \ qpid/cluster/PollerDispatch.cpp \ diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index db957051d8..f927db09bb 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -88,6 +88,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const SessionException& e) { QPID_LOG(error, "Execution exception: " << e.what()); + executionException(e.code, e.what()); // Let subclass handle this first. framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; @@ -98,6 +99,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); + channelException(e.code, e.what()); // Let subclass handle this first. peer.detached(name, e.code); } catch(const ConnectionException& e) { diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index 0b158ec2b4..0d9c72ff02 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -87,8 +87,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m); virtual void setState(const std::string& sessionName, bool force) = 0; - virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0; virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0; + virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0; + virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0; virtual void detaching() = 0; // Notification of events diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b06e06d353..365b3ccbeb 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject(0), links(broker_.getLinks()), agent(0), - timer(broker_.getTimer()) + timer(broker_.getTimer()), + errorListener(0) { Manageable* parent = broker.GetVhostObject(); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index b659fe6468..e67cdce681 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler, public RefCounted { public: + /** + * Listener that can be registered with a Connection to be informed of errors. + */ + class ErrorListener + { + public: + virtual ~ErrorListener() {} + virtual void sessionError(uint16_t channel, const std::string&) = 0; + virtual void connectionError(const std::string&) = 0; + }; + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0); ~Connection (); @@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler, const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } void setFederationLink(bool b); + /** Connection does not delete the listener. 0 resets. */ + void setErrorListener(ErrorListener* l) { errorListener=l; } + ErrorListener* getErrorListener() { return errorListener; } void setHeartbeatInterval(uint16_t heartbeat); void sendHeartbeat(); @@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler, void sendClose(); void setSecureConnection(SecureConnection* secured); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; @@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler, management::ManagementAgent* agent; Timer& timer; boost::intrusive_ptr<TimerTask> heartbeatTimer; + ErrorListener* errorListener; + public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 63212c7794..8b70836da0 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat() void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); + Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) { handler->connection.getChannel(frame.getChannel()).in(frame); } }catch(ConnectionException& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(e.code, e.what()); }catch(std::exception& e){ + if (errorListener) errorListener->connectionError(e.what()); handler->proxy.close(541/*internal error*/, e.what()); } } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 442c3eb34b..ca1f875991 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::channelException(framing::session::DetachCode, const std::string&) { - handleDetach(); -} - void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { + // NOTE: must tell the error listener _before_ calling connection.close() + if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg); connection.close(code, msg); } +void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + +void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) { + if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg); +} + ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index ffc032f64c..ca6d6bb193 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); virtual framing::FrameHandler* getInHandler(); - virtual void channelException(framing::session::DetachCode code, const std::string& msg); virtual void connectionException(framing::connection::CloseCode code, const std::string& msg); + virtual void channelException(framing::session::DetachCode, const std::string& msg); + virtual void executionException(framing::execution::ErrorCode, const std::string& msg); virtual void detaching(); virtual void readyToSend(); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 38a41c36e8..ca325dde36 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -63,6 +64,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -77,9 +79,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } + void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -112,7 +115,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : discarding(true), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + error(*this) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,14 +199,19 @@ void Cluster::leave() { leave(l); } +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ + QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ + } do {} while(0) + void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { broker.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); - } + // Finalize connections now now to avoid problems later in destructor. + LEAVE_TRY(localConnections.clear()); + LEAVE_TRY(connections.clear()); + LEAVE_TRY(broker.shutdown()); } } @@ -254,10 +263,22 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } +void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { + Mutex::ScopedLock l(lock); + error.error(connection, type, map.getFrameSeq(), map.getMembers()); +} + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); + // Process each frame through the error checker. + error.delivered(e); + while (error.canProcess()) // There is a frame ready to process. + processFrame(error.getNext(), l); +} + +void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); @@ -265,7 +286,8 @@ void Cluster::deliveredFrame(const EventFrame& e) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - QPID_LOG(trace, *this << " DLVR: " << e); + map.incrementFrameSeq(); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -357,8 +379,8 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { - bool memberChange = map.configChange(addresses); +void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { + bool memberChange = map.configChange(current); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. @@ -600,8 +622,13 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.self << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { + "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + }; + assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); + o << cluster.self << "(" << STATE[cluster.state]; + if (cluster.error.isUnresolved()) o << "/error"; + return o << ")"; } MemberId Cluster::getId() const { @@ -635,4 +662,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + // If we receive an errorCheck here, it's because we have processed past the point + // of the error so respond with ERROR_TYPE_NONE + assert(map.getFrameSeq() >= frameSeq); + if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b716e2d781..8a94fc79dd 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "ClusterSettings.h" #include "Cpg.h" #include "Decoder.h" +#include "ErrorCheck.h" #include "Event.h" #include "EventFrame.h" #include "ExpiryPolicy.h" @@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliverFrame(const EventFrame&); + // Called in deliverFrame thread to indicate an error from the broker. + void flagError(Connection&, ErrorCheck::ErrorType); + void connectionError(); + // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } @@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable { // == Called in deliverFrameQueue thread void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& addresses, Lock& l); + void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); + void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions @@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { Decoder decoder; bool discarding; + // Remaining members are protected by lock. - // FIXME aconway 2009-03-06: Most of these members are also only used in + + // TODO aconway 2009-03-06: Most of these members are also only used in // deliverFrameQueue thread or during stall. Review and separate members // that require a lock, drop lock when not needed. - // + mutable sys::Monitor lock; @@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - + ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index 9e7232180d..0395ff6382 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,6 +33,13 @@ using namespace framing; namespace cluster { +ClusterMap::Set ClusterMap::decode(const std::string& s) { + Set set; + for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) + set.insert(MemberId(std::string(i, i+8))); + return set; +} + namespace { void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { @@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { } -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : frameSeq(0) {} -ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) { alive.insert(id); if (isMember) members[id] = url; @@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_) + : frameSeq(frameSeq_) +{ std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } @@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); -} - -bool ClusterMap::configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - cpg_address* a; - bool memberChange=false; - for (a = left; a != left+nLeft; ++a) { - memberChange = memberChange || members.erase(*a); - joiners.erase(*a); - } - alive.clear(); - std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); - return memberChange; + b.setFrameSeq(frameSeq); } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const { return urls; } -ClusterMap::Set ClusterMap::getAlive() const { - return alive; +ClusterMap::Set ClusterMap::getAlive() const { return alive; } + +ClusterMap::Set ClusterMap::getMembers() const { + Set s; + std::transform(members.begin(), members.end(), std::inserter(s, s.begin()), + boost::bind(&Map::value_type::first, _1)); + return s; } std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { @@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool ClusterMap::configChange(const std::string& addresses) { bool memberChange = false; - Set update; + Set update = decode(addresses); for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) update.insert(MemberId(std::string(i, i+8))); Set removed; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 4548441442..3359c7c1f3 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -38,26 +38,26 @@ namespace qpid { namespace cluster { +typedef std::set<MemberId> MemberSet; + /** - * Map of established cluster members and joiners waiting for an update. + * Map of established cluster members and joiners waiting for an update, + * along with other cluster state that must be updated. */ class ClusterMap { public: typedef std::map<MemberId, Url> Map; typedef std::set<MemberId> Set; + static Set decode(const std::string&); + ClusterMap(); ClusterMap(const MemberId& id, const Url& url, bool isReady); - ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq); /** Update from config change. *@return true if member set changed. */ - bool configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address *joined, int nJoined); - bool configChange(const std::string& addresses); bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } @@ -78,6 +78,7 @@ class ClusterMap { std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; + Set getMembers() const; bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -90,11 +91,16 @@ class ClusterMap { * Utility method to return intersection of two member sets */ static Set intersection(const Set& a, const Set& b); + + uint64_t getFrameSeq() { return frameSeq; } + uint64_t incrementFrameSeq() { return ++frameSeq; } + private: Url getUrl(const Map& map, const MemberId& id); Map joiners, members; Set alive; + uint64_t frameSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index aa7d082720..4cb3dec970 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -56,8 +56,16 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace framing::cluster; + +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + +Connection::NullFrameHandler Connection::nullFrameHandler; + +struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} +}; -NoOpConnectionOutputHandler Connection::discardHandler; namespace { sys::AtomicValue<uint64_t> idCounter; @@ -89,6 +97,8 @@ void Connection::init() { connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames connection.setClientThrottling(false); // Disable client throttling, done by active node. } + if (!isCatchUp()) + connection.setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -97,6 +107,7 @@ void Connection::giveReadCredit(int credit) { } Connection::~Connection() { + connection.setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -126,7 +137,7 @@ void Connection::received(framing::AMQFrame& f) { cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); - output.closeOutput(discardHandler); + output.closeOutput(); catchUp = false; } else @@ -156,8 +167,8 @@ void Connection::deliveredFrame(const EventFrame& f) { { if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -180,7 +191,7 @@ void Connection::closed() { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. - output.closeOutput(discardHandler); + output.closeOutput(); cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } @@ -275,13 +286,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - // OK to use decoder here because we are stalled for update. + // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); + connection.setErrorListener(this); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); self.second = 0; // Mark this as completed update connection. } @@ -305,7 +317,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { } broker::QueuedMessage Connection::getUpdateMessage() { - broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); + assert(!updateq->isDurable()); + broker::QueuedMessage m = updateq->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -342,15 +356,15 @@ void Connection::deliveryRecord(const string& qname, // If the message was unacked, the newbie broker must place // it in its messageStore. - if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled ) + if ( m.payload && m.payload->isPersistent() && acquired && !ended) queue->enqueue ( 0, m.payload ); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); -} + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); + } void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -407,7 +421,14 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); +void Connection::sessionError(uint16_t , const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_SESSION); + +} + +void Connection::connectionError(const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 6434f763a8..49839a456b 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -25,7 +25,6 @@ #include "types.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" -#include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" #include "McastFrameHandler.h" @@ -58,7 +57,8 @@ class Event; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler + public framing::AMQP_AllOperations::ClusterConnectionHandler, + private broker::Connection::ErrorListener { public: @@ -120,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -156,6 +156,13 @@ class Connection : void handle(framing::AMQFrame&) {} }; + + static NullFrameHandler nullFrameHandler; + + // Error listener functions + void connectionError(const std::string&); + void sessionError(uint16_t channel, const std::string&); + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); @@ -167,8 +174,6 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); - static NoOpConnectionOutputHandler discardHandler; - Cluster& cluster; ConnectionId self; bool catchUp; @@ -181,7 +186,6 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; - NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp new file mode 100644 index 0000000000..cbe3e3daa4 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ErrorCheck.h" +#include "EventFrame.h" +#include "ClusterMap.h" +#include "Cluster.h" +#include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +namespace qpid { +namespace cluster { + +using namespace std; +using namespace framing; +using namespace framing::cluster; + +ErrorCheck::ErrorCheck(Cluster& c) + : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) +{} + +ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { + copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); + return o; +} + +void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms) +{ + // Detected a local error, inform cluster and set error state. + assert(t != ERROR_TYPE_NONE); // Must be an error. + assert(type == ERROR_TYPE_NONE); // Can only be called while processing + type = t; + unresolved = ms; + frameSeq = seq; + connection = &c; + QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") + << " error " << frameSeq << " unresolved: " << unresolved); + mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); +} + +void ErrorCheck::delivered(const EventFrame& e) { + if (isUnresolved()) { + const ClusterErrorCheckBody* errorCheck = + dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod()); + const ClusterConfigChangeBody* configChange = + dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + + if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + if (errorCheck->getType() < type) { // my error is worse than his + QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); + throw Exception("Aborted by local failure that did not occur on all replicas"); + } + else { // his error is worse/same as mine. + QPID_LOG(critical, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId()); + unresolved.erase(e.getMemberId()); + checkResolved(); + } + } + else { + frames.push_back(e); // Only drop matching errorCheck controls. + if (configChange) { + MemberSet members(ClusterMap::decode(configChange->getCurrent())); + MemberSet result; + set_intersection(members.begin(), members.end(), + unresolved.begin(), unresolved.end(), + inserter(result, result.begin())); + unresolved.swap(result); + checkResolved(); + } + } + } + else + frames.push_back(e); +} + +void ErrorCheck::checkResolved() { + if (unresolved.empty()) { // No more potentially conflicted members, we're clear. + type = ERROR_TYPE_NONE; + QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved."); + } + else + QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved); +} + +EventFrame ErrorCheck::getNext() { + assert(canProcess()); + EventFrame e(frames.front()); + frames.pop_front(); + return e; +} + +bool ErrorCheck::canProcess() const { + return type == ERROR_TYPE_NONE && !frames.empty(); +} + +bool ErrorCheck::isUnresolved() const { + return type != ERROR_TYPE_NONE; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h new file mode 100644 index 0000000000..97b5f2bffd --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -0,0 +1,80 @@ +#ifndef QPID_CLUSTER_ERRORCHECK_H +#define QPID_CLUSTER_ERRORCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/enum.h" +#include <boost/function.hpp> +#include <deque> +#include <set> + +namespace qpid { +namespace cluster { + +class EventFrame; +class ClusterMap; +class Cluster; +class Multicaster; +class Connection; + +/** + * Error checking logic. + * + * When an error occurs stop processing frames and queue them until we + * can determine if all nodes experienced the error. If not, we shut down. + */ +class ErrorCheck +{ + public: + typedef std::set<MemberId> MemberSet; + typedef framing::cluster::ErrorType ErrorType; + + ErrorCheck(Cluster&); + + /** A local error has occured */ + void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&); + + /** Called when a frame is delivered */ + void delivered(const EventFrame&); + + EventFrame getNext(); + + bool canProcess() const; + bool isUnresolved() const; + + private: + void checkResolved(); + + Cluster& cluster; + Multicaster& mcast; + std::deque<EventFrame> frames; + std::set<MemberId> unresolved; + uint64_t frameSeq; + ErrorType type; + Connection* connection; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_ERRORCHECK_H*/ diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h index d6ff58dd38..aada4c2628 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.h +++ b/qpid/cpp/src/qpid/cluster/EventFrame.h @@ -45,6 +45,7 @@ struct EventFrame bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } + MemberId getMemberId() const { return connectionId.getMember(); } ConnectionId connectionId; diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h index 8b2f6dae8e..4df742d6c2 100644 --- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -52,6 +52,8 @@ class LockedConnectionMap return 0; } + void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); } + private: typedef std::map<ConnectionId, ConnectionPtr> Map; mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h index 74a376a657..6a30bddf06 100644 --- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h @@ -30,8 +30,7 @@ namespace framing { class AMQFrame; } namespace cluster { /** - * Output handler for frames sent to noop connections. - * Simply discards frames. + * Output handler shadow connections, simply discards frames. */ class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index cd42446016..da674fa6fd 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,8 +32,9 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor( - cluster::Connection& p, sys::ConnectionOutputHandler& h) +NoOpConnectionOutputHandler OutputInterceptor::discardHandler; + +OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), moreOutput(), doingOutput() @@ -111,10 +112,10 @@ void OutputInterceptor::sendDoOutput() { QPID_LOG(trace, parent << "Send doOutput request for " << request); } -void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) { +void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; - next = &h; + next = &discardHandler; } void OutputInterceptor::close() { diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index c080a419e1..5000893727 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -23,6 +23,7 @@ */ #include "WriteEstimate.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/broker/ConnectionFactory.h" #include "qpid/sys/LatencyMetric.h" @@ -53,7 +54,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri // Intercept doOutput requests on Connection. bool doOutput(); - void closeOutput(sys::ConnectionOutputHandler& h); + void closeOutput(); cluster::Connection& parent; @@ -70,6 +71,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; + static NoOpConnectionOutputHandler discardHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index c00b811a20..2696495cb7 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -125,15 +125,19 @@ void UpdateClient::update() { // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + session.queueDelete(arg::queue=UPDATE); + session.close(); + + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } @@ -202,7 +206,6 @@ class MessageUpdater { sb.get()->send(transfer, message.payload->getFrames()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index f55560739d..b32b7f44ba 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -114,10 +114,12 @@ struct ClientT { SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port, const std::string& name=std::string()) - : connection(port), session(connection.newSession(name)), subs(session) {} - ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string()) - : connection(settings), session(connection.newSession(name)), subs(session) {} + std::string name; + + ClientT(uint16_t port, const std::string& name_=std::string()) + : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {} + ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string()) + : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} ~ClientT() { connection.close(); } }; diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp index 5658957b48..d49be76f79 100644 --- a/qpid/cpp/src/tests/ClusterFixture.cpp +++ b/qpid/cpp/src/tests/ClusterFixture.cpp @@ -67,16 +67,23 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_) add(n); } +ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_) + : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_) +{ + add(n); +} + const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = list_of<string>("--auth=no")("--no-data-dir"); -ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) { +ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { Args args = list_of<string>("qpidd " __FILE__) ("--no-module-dir") ("--load-module=../.libs/cluster.so") ("--cluster-name")(name) ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); + if (updateArgs) updateArgs(args, index); return args; } @@ -84,7 +91,7 @@ void ClusterFixture::add() { if (size() != size_t(localIndex)) { // fork a broker process. std::ostringstream os; os << "fork" << size(); std::string prefix = os.str(); - forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix)))); + forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size())))); push_back(forkedBrokers.back()->getPort()); } else { // Run in this process @@ -106,7 +113,7 @@ void ClusterFixture::addLocal() { assert(int(size()) == localIndex); ostringstream os; os << "local" << localIndex; string prefix = os.str(); - Args args(makeArgs(prefix)); + Args args(makeArgs(prefix, localIndex)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); qpid::log::Logger::instance().setPrefix(prefix); @@ -131,3 +138,22 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) kill(n,sig); try { c.close(); } catch(...) {} } + +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) { + std::vector<qpid::Url> urls = source.getKnownBrokers(); + if (n >= 0 && unsigned(n) != urls.size()) { + // Retry up to 10 secs in .1 second intervals. + for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { + qpid::sys::usleep(1000*100); // 0.1 secs + urls = source.getKnownBrokers(); + } + } + std::set<int> s; + for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) + s.insert((*i)[0].get<qpid::TcpAddress>()->port); + return s; +} diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h index 84fb9f2202..75d39fc7e5 100644 --- a/qpid/cpp/src/tests/ClusterFixture.h +++ b/qpid/cpp/src/tests/ClusterFixture.h @@ -38,6 +38,7 @@ #include "qpid/log/Logger.h" #include <boost/bind.hpp> +#include <boost/function.hpp> #include <boost/shared_ptr.hpp> #include <string> @@ -69,33 +70,44 @@ using qpid::cluster::Cluster; class ClusterFixture : public vector<uint16_t> { public: typedef std::vector<std::string> Args; + static const Args DEFAULT_ARGS; + /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS); + + /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ + ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs); + void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. void setup(); bool hasLocal() const; - /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ + /** Kill a forked broker with sig, or shutdown localBroker. */ void kill(size_t n, int sig=SIGINT); /** Kill a broker and suppressing errors from closing connection c. */ void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); private: - static const Args DEFAULT_ARGS; void addLocal(); // Add a local broker. - Args makeArgs(const std::string& prefix); + Args makeArgs(const std::string& prefix, size_t index); string name; std::auto_ptr<BrokerFixture> localBroker; int localIndex; std::vector<shared_ptr<ForkedBroker> > forkedBrokers; Args userArgs; + boost::function<void (Args&, size_t)> updateArgs; }; +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1); #endif /*!CLUSTER_FIXTURE_H*/ diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp index f90f76aeb2..383dcc496c 100644 --- a/qpid/cpp/src/tests/ForkedBroker.cpp +++ b/qpid/cpp/src/tests/ForkedBroker.cpp @@ -20,12 +20,17 @@ */ #include "ForkedBroker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/algorithm/string.hpp> #include <algorithm> #include <stdlib.h> #include <sys/types.h> #include <signal.h> +using namespace std; +using qpid::ErrnoException; + ForkedBroker::ForkedBroker(const Args& args) { init(args); } ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); } @@ -42,14 +47,25 @@ void ForkedBroker::kill(int sig) { pid = 0; // Reset pid here in case of an exception. using qpid::ErrnoException; if (::kill(savePid, sig) < 0) - throw ErrnoException("kill failed"); + throw ErrnoException("kill failed"); int status; if (::waitpid(savePid, &status, 0) < 0 && sig != 9) throw ErrnoException("wait for forked process failed"); if (WEXITSTATUS(status) != 0 && sig != 9) throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } + +namespace std { +static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) { + copy(a.begin(), a.end(), ostream_iterator<string>(o, " ")); + return o; +} +bool isLogOption(const std::string& s) { + return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace"); +} + +} void ForkedBroker::init(const Args& userArgs) { using qpid::ErrnoException; @@ -70,17 +86,19 @@ void ForkedBroker::init(const Args& userArgs) { } else { // child ::close(pipeFds[0]); - // FIXME aconway 2009-02-12: int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent. if (fd < 0) throw ErrnoException("dup2 failed"); const char* prog = "../qpidd"; Args args(userArgs); args.push_back("--port=0"); - if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) - args.push_back("--log-enable=error+"); // Keep quiet except for errors. + // Keep quiet except for errors. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE") + && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end()) + args.push_back("--log-enable=error+"); std::vector<const char*> argv(args.size()); std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); argv.push_back(0); + QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args); execv(prog, const_cast<char* const*>(&argv[0])); throw ErrnoException("execv failed"); } diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h index 6f97fbdc09..e72f421563 100644 --- a/qpid/cpp/src/tests/ForkedBroker.h +++ b/qpid/cpp/src/tests/ForkedBroker.h @@ -53,6 +53,7 @@ class ForkedBroker { ~ForkedBroker(); void kill(int sig=SIGINT); + int wait(); // Wait for exit, return exit status. uint16_t getPort() { return port; } pid_t getPID() { return pid; } diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 9a81ef18b3..4b59e8ebe9 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -110,11 +110,17 @@ endif # amqp_0_10/Map.cpp \ # amqp_0_10/handlers.cpp +TESTLIBFLAGS = -module -rpath $(abs_builddir) check_LTLIBRARIES += libshlibtest.la -libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir) +libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS) libshlibtest_la_SOURCES = shlibtest.cpp +check_LTLIBRARIES += test_store.la +test_store_la_SOURCES = test_store.cpp +test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required? +test_store_la_LDFLAGS = $(TESTLIBFLAGS) + include cluster.mk if SSL include ssl.mk diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp new file mode 100644 index 0000000000..f7187b2e77 --- /dev/null +++ b/qpid/cpp/src/tests/PartialFailure.cpp @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tests for partial failure in a cluster. + * Partial failure means some nodes experience a failure while others do not. + * In this case the failed nodes must shut down. + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ClusterFixture.h" +#include <boost/assign.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/bind.hpp> + +QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) + + using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace qpid::client::arg; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; + +static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); } + +void updateArgs(ClusterFixture::Args& args, size_t index) { + ostringstream os; + os << "--test-store-name=s" << index; + args.push_back(os.str()); + args.push_back("--load-module=.libs/test_store.so"); + string dataDir("/tmp/PartialFailure.XXXXXX"); + if (!mkdtemp(const_cast<char*>(dataDir.c_str()))) + throw ErrnoException("Can't create data dir"); + args.push_back("--data-dir="+dataDir); + args.push_back("--auth=no"); + + // These tests generate errors deliberately, disable error logging unless a log env var is set. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) { + remove_if(args.begin(), args.end(), isLogOption); + args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs. + } +} + +Message pMessage(string data, string q) { + Message msg(data, q); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + return msg; +} + +void queueAndSub(Client& c) { + c.session.queueDeclare(c.name, durable=true); + c.subs.subscribe(c.lq, c.name); +} + +// Verify normal cluster-wide errors. +QPID_AUTO_TEST_CASE(testNormalErrors) { + // FIXME aconway 2009-04-10: Would like to put a scope just around + // the statements expected to fail (in BOOST_CHECK_THROW) but that + // sproadically lets out messages, possibly because they're in + // Connection thread. + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + queueAndSub(c0); + c0.session.messageTransfer(content=Message("x", "c0")); + BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); + + // Session error. + BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException); + c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead. + + // Connection error, kill c1 on all members. + queueAndSub(c1); + BOOST_CHECK_THROW( + c1.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")), + ConnectionException); + c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead. + + BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size()); + BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay"); + BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay"); +} + + +// Test errors after a new member joins to verify frame-sequence-numbers are ok in update. +QPID_AUTO_TEST_CASE(testErrorAfterJoin) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(1, -1, updateArgs); + Client c0(cluster[0]); + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Kill the new guy + cluster.add(); + Client c1(cluster[1]); + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q")); + BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + + // Kill the old guy + cluster.add(); + Client c2(cluster[2]); + c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q")); + BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size()); +} + +// Test that if one member fails and others do not, the failure leaves the cluster. +QPID_AUTO_TEST_CASE(testSinglePartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + // Cause partial failure on c1 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + + // Cause partial failure on c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q")); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); +} + +// Test multiple partial falures: 2 fail 2 pass +QPID_AUTO_TEST_CASE(testMultiPartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(4, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + Client c3(cluster[3], "c3"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause partial failure on c1, c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + c3.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); +} + +/** FIXME aconway 2009-04-10: + * The current approach to shutting down a process in test_store + * sometimes leads to assertion failures and errors in the shut-down + * process. Need a cleaner solution + */ +#if 0 +QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(2, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause failure on member 0 and simultaneous crash on member 1. + BOOST_CHECK_THROW( + c0.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")), + ConnectionException); + cluster.wait(1); + + Client c00(cluster[0], "c00"); // Old connection is dead. + BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size()); +} +#endif + + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 5d115de5a5..f92bb112e4 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -34,8 +34,10 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_ federated_cluster_test clustered_replication_test check_PROGRAMS+=cluster_test -cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp -cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework +cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \ + cluster_test.cpp PartialFailure.cpp + +cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework unit_test_LDADD+=../cluster.la diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index eee2df58cc..98b399c187 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -73,7 +73,7 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << cluster::Cpg::str(*n); + return o << Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -89,29 +89,12 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -template <class C> set<uint16_t> makeSet(const C& c) { - set<uint16_t> s; +template <class C> set<int> makeSet(const C& c) { + set<int> s; copy(c.begin(), c.end(), inserter(s, s.begin())); return s; } -template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) { - vector<Url> urls = source.getKnownBrokers(); - if (n >= 0 && unsigned(n) != urls.size()) { - BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); - // Retry up to 10 secs in .1 second intervals. - for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - sys::usleep(1000*100); // 0.1 secs - urls = source.getKnownBrokers(); - } - } - BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls); - set<uint16_t> s; - for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) - s.insert((*i)[0].get<TcpAddress>()->port); - return s; -} - class Sender { public: Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} @@ -175,7 +158,6 @@ ConnectionSettings aclSettings(int port, const std::string& id) { QPID_AUTO_TEST_CASE(testAcl) { ofstream policyFile("cluster_test.acl"); - // FIXME aconway 2009-02-12: guest -> qpidd? policyFile << "acl allow foo@QPID create queue name=foo" << endl << "acl allow foo@QPID create queue name=foo2" << endl << "acl deny foo@QPID create queue name=bar" << endl @@ -446,13 +428,13 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); - set<uint16_t> kb0 = knownBrokerPorts(c0.connection); + set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); cluster.add(); Client c1(cluster[1], "c1"); - set<uint16_t> kb1 = knownBrokerPorts(c1.connection); + set<int> kb1 = knownBrokerPorts(c1.connection); kb0 = knownBrokerPorts(c0.connection, 2); BOOST_CHECK_EQUAL(kb1.size(), 2u); BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); @@ -460,7 +442,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { cluster.add(); Client c2(cluster[2], "c2"); - set<uint16_t> kb2 = knownBrokerPorts(c2.connection); + set<int> kb2 = knownBrokerPorts(c2.connection); kb1 = knownBrokerPorts(c1.connection, 3); kb0 = knownBrokerPorts(c0.connection, 3); BOOST_CHECK_EQUAL(kb2.size(), 3u); diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test index 2a3e742632..7afda87733 100755 --- a/qpid/cpp/src/tests/clustered_replication_test +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -23,6 +23,7 @@ # failures: srcdir=`dirname $0` PYTHON_DIR=$srcdir/../../../python +export PYTHONPATH=$PYTHON_DIR trap stop_brokers INT EXIT diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index 18d693e5ec..2da60f47b5 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -220,63 +220,13 @@ struct children : public vector<child *> cout << "\n\n\n\n"; } - - /* - Only call this if you already know there is at least - one child still running. Supply a time in seconds. - If it has been at least that long since a shild stopped - running, we judge the system to have hung. - */ - int - hanging ( int hangTime ) - { - struct timeval now, - duration; - gettimeofday ( &now, 0 ); - - int how_many_hanging = 0; - - vector<child *>::iterator i; - for ( i = begin(); i != end(); ++ i ) - { - //Not in POSIX - //timersub ( & now, &((*i)->startTime), & duration ); - duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec; - duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec; - if (duration.tv_usec < 0) { - --duration.tv_sec; - duration.tv_usec += 1000000; - } - - if ( (COMPLETED != (*i)->status) // child isn't done running - && - ( duration.tv_sec >= hangTime ) // it's been too long - ) - { - std::cerr << "Child of type " - << (*i)->type - << " hanging. " - << "PID is " - << (*i)->pid - << endl; - ++ how_many_hanging; - } - } - - return how_many_hanging; - } - - int verbosity; }; - children allMyChildren; - - void childExit ( int ) { @@ -389,6 +339,7 @@ startNewBroker ( brokerVector & brokers, ("--log-prefix") (prefix.str()) ("--log-to-file") + ("--log-enable=error+") (prefix.str()+".log"); if (endsWith(moduleOrDir, "cluster.so")) { @@ -818,16 +769,6 @@ main ( int argc, char const ** argv ) return ERROR_ON_CHILD; } - // If one is hanging, quit. - if ( allMyChildren.hanging ( 120 ) ) - { - /* - * Don't kill any processes. Leave alive for questioning. - * */ - std::cerr << "END_OF_TEST ERROR_HANGING\n"; - return HANGING; - } - if ( verbosity > 1 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak index cf9646ac58..333dd0be23 100755 --- a/qpid/cpp/src/tests/run_failover_soak +++ b/qpid/cpp/src/tests/run_failover_soak @@ -51,5 +51,6 @@ REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} VERBOSITY=${VERBOSITY:-1} DURABILITY=${DURABILITY:-0} +rm -f soak-*.log exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 053b23da33..585ba082d5 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -28,15 +28,17 @@ with_ais_group() { echo $* | newgrp ais } -rm -f cluster*.log -SIZE=${1:-1}; shift +rm -f cluster*.log cluster.ports qpidd.port + +SIZE=${1:-3}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@" +OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@" for (( i=0; i<SIZE; ++i )); do - PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1 + DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX` + PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1 echo $PORT >> cluster.ports done -head cluster.ports > qpidd.port # First member's port for tests. +head -n 1 cluster.ports > qpidd.port # First member's port for tests. diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp new file mode 100644 index 0000000000..da4f03192a --- /dev/null +++ b/qpid/cpp/src/tests/test_store.cpp @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/**@file + * Plug-in message store for tests. + * + * Add functionality as required, build up a comprehensive set of + * features to support persistent behavior tests. + * + * Current features special "action" messages can: + * - raise exception from enqueue. + * - force host process to exit. + * - do async completion after a delay. + */ + +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include <boost/algorithm/string.hpp> +#include <boost/cast.hpp> +#include <boost/lexical_cast.hpp> + +using namespace qpid; +using namespace broker; +using namespace std; +using namespace boost; +using namespace qpid::sys; + +struct TestStoreOptions : public Options { + + string name; + + TestStoreOptions() : Options("Test Store Options") { + addOptions() + ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance."); + } +}; + +struct Completer : public Runnable { + intrusive_ptr<PersistableMessage> message; + int usecs; + Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {} + void run() { + qpid::sys::usleep(usecs); + message->enqueueComplete(); + delete this; + } +}; + +class TestStore : public NullMessageStore { + public: + TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {} + + ~TestStore() { + for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); + } + + void enqueue(TransactionContext* , + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& ) + { + string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); + + // Check the message for special instructions. + size_t i, j; + if (starts_with(data, TEST_STORE_DO) + && (i = data.find(name+"[")) != string::npos + && (j = data.find("]", i)) != string::npos) + { + size_t start = i+name.size()+1; + string action = data.substr(start, j-start); + + if (action == EXCEPTION) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (action == EXIT_PROCESS) { + // FIXME aconway 2009-04-10: this is a dubious way to + // close the process at best, it can cause assertions or seg faults + // rather than clean exit. + QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data); + exit(0); + } + else if (starts_with(action, ASYNC)) { + std::string delayStr(action.substr(ASYNC.size())); + int delay = lexical_cast<int>(delayStr); + threads.push_back(Thread(*new Completer(msg, delay))); + } + else { + QPID_LOG(error, "TestStore " << name << " unknown action " << action); + msg->enqueueComplete(); + } + } + else + msg->enqueueComplete(); + } + + private: + static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + string name; + Broker& broker; + vector<Thread> threads; +}; + +const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; +const string TestStore::EXCEPTION = "exception"; +const string TestStore::EXIT_PROCESS = "exit_process"; +const string TestStore::ASYNC="async "; + +struct TestStorePlugin : public Plugin { + + TestStoreOptions options; + + Options* getOptions() { return &options; } + + void earlyInitialize (Plugin::Target& target) + { + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + broker->setStore (new TestStore(options.name, *broker)); + } + + void initialize(qpid::Plugin::Target&) {} +}; + +static TestStorePlugin pluginInstance; |