From a0c96ada9fa6a792ad9fd57356a4e19ead8b030d Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Tue, 1 May 2012 13:57:13 +0000 Subject: QPID-3963: subscribe link to remote broker's to amq.failover exchange. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332653 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Link.cpp | 129 ++++++++++++++++++++++++++++++++++++-- qpid/cpp/src/qpid/broker/Link.h | 6 +- 2 files changed, 127 insertions(+), 8 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 855063a6ad..bbffa93a53 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -31,6 +31,8 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/Exchange.h" +#include "qpid/UrlArray.h" namespace qpid { namespace broker { @@ -65,6 +67,50 @@ struct LinkTimerTask : public sys::TimerTask { sys::Timer& timer; }; + +namespace { + const std::string FAILOVER_EXCHANGE("amq.failover"); + const std::string FAILOVER_HEADER_KEY("amq.failover"); + const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1); // reserved for this link +} + +/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange. + */ +class LinkExchange : public broker::Exchange +{ +public: + LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {} + ~LinkExchange() {}; + std::string getType() const { return std::string("qpid.LinkExchange"); } + + // Exchange methods - set up to prevent binding/unbinding etc from clients! + bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*) { return false; } + bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*) { return false; } + bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const) {return false;} + + // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs + // and saving them should the Link need to reconnect. + void route(broker::Deliverable& msg) + { + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + framing::Array addresses; + if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) { + // convert the Array of addresses to a single Url container for used with setUrl(): + std::vector urlVec; + Url urls; + urlVec = urlArrayToVector(addresses); + for(size_t i = 0; i < urlVec.size(); ++i) + urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); + QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); + link.setUrl(urls); + } + } + +private: + Link& link; +}; + + Link::Link(LinkRegistry* _links, MessageStore* _store, const string& _host, @@ -106,15 +152,22 @@ Link::Link(LinkRegistry* _links, startConnectionLH(); } broker->getTimer().add(timerTask); + + exchange.reset(new broker::LinkExchange(*this, + "qpid.link." + framing::Uuid(true).str())); + broker->getExchanges().registerExchange(exchange); } Link::~Link () { - if (state == STATE_OPERATIONAL && connection != 0) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + if (state == STATE_OPERATIONAL && connection != 0) { + closeConnection("closed by management"); + } if (mgmtObject != 0) mgmtObject->resourceDestroy (); + + broker->getExchanges().destroy(exchange->getName()); } void Link::setStateLH (int newState) @@ -185,6 +238,20 @@ void Link::setUrl(const Url& u) { reconnectNext = 0; } + +namespace { + /** invoked when session used to subscribe to remote's amq.failover exchange detaches */ + void sessionDetached(Link *link) { + // ??? really not sure what the right thing to do here, if anything... + // ??? Q: do I need to cancel the subscription and detached the session in the I/O thread (???) + // e.g: + //peer->getMessage().cancel(args.i_dest); + //peer->getSession().detach(name); + QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName()); + } +} + + void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; @@ -198,6 +265,40 @@ void Link::opened() { reconnectNext = 0; QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } + + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + + SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); + sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); + sessionHandler.attachAs(getName()); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete + FieldTable()); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key + FieldTable()); + remoteBroker.getMessage().subscribe(queueName, + exchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF); } void Link::closed(int, std::string text) @@ -241,9 +342,7 @@ void Link::destroy () Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - if (connection) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - connection = 0; + closeConnection("closed by management"); setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no @@ -399,7 +498,8 @@ bool Link::hideManagement() const { uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); - + if (channelCounter >= framing::CHANNEL_MAX) + channelCounter = 1; return channelCounter++; } @@ -542,4 +642,21 @@ void Link::setPassive(bool passive) } } + +/** utility to clean up connection resources correctly */ +void Link::closeConnection( const std::string& reason) +{ + if (connection != 0) { + // cancel our subscription to the failover exchange + SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(exchange->getName()); + remoteBroker.getSession().detach(getName()); + } + connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); + connection = 0; + } +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index c7c8209db3..a941aee4f3 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -47,6 +47,7 @@ namespace broker { class LinkRegistry; class Broker; class Connection; +class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: @@ -77,8 +78,8 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; - boost::intrusive_ptr timerTask; + boost::shared_ptr exchange; static const int STATE_WAITING = 1; static const int STATE_CONNECTING = 2; @@ -100,8 +101,9 @@ class Link : public PersistableConfig, public management::Manageable { void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away void reconnectLH(const Address&); //called by LinkRegistry + void closeConnection(const std::string& reason); - friend class LinkRegistry; // to call established, opened, closed + friend class LinkRegistry; // to call established, opened, closed public: typedef boost::shared_ptr shared_ptr; -- cgit v1.2.1