diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 388 |
1 files changed, 317 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 56a90e7fb7..1be388b989 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -31,6 +31,8 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/Exchange.h" +#include "qpid/UrlArray.h" namespace qpid { namespace broker { @@ -48,6 +50,13 @@ using std::stringstream; using std::string; namespace _qmf = ::qmf::org::apache::qpid::broker; + +namespace { + const std::string FAILOVER_EXCHANGE("amq.failover"); + const std::string FAILOVER_HEADER_KEY("amq.failover"); +} + + struct LinkTimerTask : public sys::TimerTask { LinkTimerTask(Link& l, sys::Timer& t) : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval* @@ -65,19 +74,73 @@ struct LinkTimerTask : public sys::TimerTask { sys::Timer& timer; }; -Link::Link(LinkRegistry* _links, - MessageStore* _store, + + +/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange. + */ +class LinkExchange : public broker::Exchange +{ +public: + LinkExchange(const std::string& name) : Exchange(name), link(0) {} + ~LinkExchange() {}; + std::string getType() const { return Link::exchangeTypeName; } + + // Exchange methods - set up to prevent binding/unbinding etc from clients! + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } + bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;} + + // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs + // and saving them should the Link need to reconnect. + void route(broker::Deliverable& msg) + { + if (!link) return; + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + framing::Array addresses; + if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) { + // convert the Array of addresses to a single Url container for used with setUrl(): + std::vector<Url> urlVec; + Url urls; + urlVec = urlArrayToVector(addresses); + for(size_t i = 0; i < urlVec.size(); ++i) + urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); + QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); + link->setUrl(urls); + } + } + + void setLink(Link *_link) + { + assert(!link); + link = _link; + } + +private: + Link *link; +}; + + +boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name ) +{ + return Exchange::shared_ptr(new LinkExchange(_name)); +} + +Link::Link(const string& _name, + LinkRegistry* _links, const string& _host, uint16_t _port, const string& _transport, + DestroyedListener l, bool _durable, const string& _authMechanism, const string& _username, const string& _password, Broker* _broker, - Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + Manageable* parent, + bool failover_) + : name(_name), links(_links), + configuredTransport(_transport), configuredHost(_host), configuredPort(_port), + host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -88,14 +151,20 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), - timerTask(new LinkTimerTask(*this, broker->getTimer())) + listener(l), + timerTask(new LinkTimerTask(*this, broker->getTimer())), + failover(failover_), + failoverChannel(0) { if (parent != 0 && broker != 0) { agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); + mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); agent->addObject(mgmtObject, 0, durable); } } @@ -106,15 +175,29 @@ Link::Link(LinkRegistry* _links, startConnectionLH(); } broker->getTimer().add(timerTask); + + if (failover) { + stringstream exchangeName; + exchangeName << "qpid.link." << name; + std::pair<Exchange::shared_ptr, bool> rc = + broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); + failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); + } } Link::~Link () { - if (state == STATE_OPERATIONAL && connection != 0) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + if (state == STATE_OPERATIONAL && connection != 0) { + closeConnection("closed by management"); + } if (mgmtObject != 0) mgmtObject->resourceDestroy (); + + if (failover) + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -172,6 +255,7 @@ void Link::established(Connection* c) currentInterval = 1; visitCount = 0; connection = c; + if (closing) destroy(); else // Process any IO tasks bridges added before established. @@ -180,14 +264,34 @@ void Link::established(Connection* c) void Link::setUrl(const Url& u) { + QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u); Mutex::ScopedLock mutex(lock); url = u; reconnectNext = 0; } + +namespace { +class DetachedCallback : public SessionHandler::ErrorListener { + public: + DetachedCallback(const Link& link) : name(link.getName()) {} + void connectionException(framing::connection::CloseCode, const std::string&) {} + void channelException(framing::session::DetachCode, const std::string&) {} + void executionException(framing::execution::ErrorCode, const std::string&) {} + void detach() {} + private: + const std::string name; +}; +} + void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; + + if (!hideManagement() && connection->GetManagementObject()) { + mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); + } + // Get default URL from known-hosts if not already set if (url.empty()) { const std::vector<Url>& known = connection->getKnownHosts(); @@ -198,6 +302,45 @@ void Link::opened() { reconnectNext = 0; QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } + + if (failover) { + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + failoverChannel = nextChannel(); + + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + sessionHandler.setErrorListener( + boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); + failoverSession = queueName; + sessionHandler.attachAs(failoverSession); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete + FieldTable()); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key + FieldTable()); + remoteBroker.getMessage().subscribe(queueName, + failoverExchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + } } void Link::closed(int, std::string text) @@ -206,11 +349,14 @@ void Link::closed(int, std::string text) QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; - if (state == STATE_OPERATIONAL) { - stringstream addr; - addr << host << ":" << port; - if (!hideManagement() && agent) + + if (!hideManagement()) { + mgmtObject->set_connectionRef(qpid::management::ObjectId()); + if (state == STATE_OPERATIONAL && agent) { + stringstream addr; + addr << host << ":" << port; agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + } } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { @@ -225,22 +371,19 @@ void Link::closed(int, std::string text) if (!hideManagement()) mgmtObject->set_lastError (text); } - - if (closing) - destroy(); } -// Called in connection IO thread. +// Called in connection IO thread, cleans up the connection before destroying Link void Link::destroy () { Bridges toDelete; + + timerTask->cancel(); // call prior to locking so maintenance visit can finish { Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - if (connection) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - connection = 0; + QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management"); + closeConnection("closed by management"); setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no @@ -254,14 +397,13 @@ void Link::destroy () for (Bridges::iterator i = created.begin(); i != created.end(); i++) toDelete.push_back(*i); created.clear(); - - timerTask->cancel(); } + // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) - (*i)->destroy(); + (*i)->close(); toDelete.clear(); - links->destroy (host, port); + listener(this); // notify LinkRegistry that this Link has been destroyed } void Link::add(Bridge::shared_ptr bridge) @@ -303,13 +445,13 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); - if (state != STATE_OPERATIONAL) + if (state != STATE_OPERATIONAL || closing) return; // check for bridge session errors and recover if (!active.empty()) { Bridges::iterator removed = std::remove_if( - active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1)); + active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1)); for (Bridges::iterator i = removed; i != active.end(); ++i) { Bridge::shared_ptr bridge = *i; bridge->closed(); @@ -340,7 +482,7 @@ void Link::ioThreadProcessing() void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - + if (closing) return; if (state == STATE_WAITING) { visitCount++; @@ -358,19 +500,23 @@ void Link::maintenanceVisit () } else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); - } +} void Link::reconnectLH(const Address& a) { host = a.host; port = a.port; transport = a.protocol; - startConnectionLH(); + if (!hideManagement()) { stringstream errorString; - errorString << "Failed over to " << a; + errorString << "Failing over to " << a; mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); } + startConnectionLH(); } bool Link::tryFailoverLH() { @@ -379,15 +525,14 @@ bool Link::tryFailoverLH() { if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { - links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next); reconnectLH(next); return true; } return false; } -// Management updates for a linke are inconsistent in a cluster, so they are +// Management updates for a link are inconsistent in a cluster, so they are // suppressed. bool Link::hideManagement() const { return !mgmtObject || ( broker && broker->isInCluster()); @@ -396,7 +541,8 @@ bool Link::hideManagement() const { uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); - + if (channelCounter >= framing::CHANNEL_MAX) + channelCounter = 1; return channelCounter++; } @@ -415,18 +561,34 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return host; + return name; +} + +const std::string Link::ENCODED_IDENTIFIER("link.v2"); +const std::string Link::ENCODED_IDENTIFIER_V1("link"); + +bool Link::isEncodedLink(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string transport; string authMechanism; string username; string password; + string name; + if (kind == ENCODED_IDENTIFIER) { + // newer version provides a link name. + buffer.getShortString(name); + } buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -435,15 +597,24 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(username); buffer.getShortString(password); - return links.declare(host, port, transport, durable, authMechanism, username, password).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the Link by host:port, there was no name + * assigned. So create a name for the new Link. + */ + name = createName(transport, host, port); + } + + return links.declare(name, host, port, transport, durable, authMechanism, + username, password).first; } void Link::encode(Buffer& buffer) const { - buffer.putShortString(string("link")); - buffer.putShortString(host); - buffer.putShort(port); - buffer.putShortString(transport); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(configuredHost); + buffer.putShort(configuredPort); + buffer.putShortString(configuredTransport); buffer.putOctet(durable ? 1 : 0); buffer.putShortString(authMechanism); buffer.putShortString(username); @@ -452,10 +623,11 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return host.size() + 1 // short-string (host) - + 5 // short-string ("link") + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + configuredHost.size() + 1 // short-string (host) + 2 // port - + transport.size() + 1 // short-string(transport) + + configuredTransport.size() + 1 // short-string(transport) + 1 // durable + authMechanism.size() + 1 + username.size() + 1 @@ -468,6 +640,7 @@ ManagementObject* Link::GetManagementObject (void) const } void Link::close() { + QPID_LOG(debug, "Link::close(), link=" << name ); Mutex::ScopedLock mutex(lock); if (!closing) { closing = true; @@ -488,36 +661,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_OK; case _qmf::Link::METHOD_BRIDGE : + /* TBD: deprecate this interface in favor of the Broker::create() method. The + * Broker::create() method allows the user to assign a name to the bridge. + */ + QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='bridge' instead."); _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; - QPID_LOG(debug, "Link::bridge() request received"); - - // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) { - text = "Can't create a durable route on a non-durable link"; - return Manageable::STATUS_USER; - } - - if (iargs.i_dynamic) { - Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); - if (exchange.get() == 0) { - text = "Exchange not found"; - return Manageable::STATUS_USER; - } - if (!exchange->supportsDynamicBinding()) { - text = "Exchange type does not support dynamic routing"; - return Manageable::STATUS_USER; + QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << + "; dest=" << iargs.i_dest << "; key=" << iargs.i_key); + + // Does a bridge already exist that has the src/dest/key? If so, re-use the + // existing bridge - this behavior is backward compatible with previous releases. + Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); + if (!bridge) { + // need to create a new bridge on this link. + std::pair<Bridge::shared_ptr, bool> rc = + links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key), + *this, iargs.i_durable, + iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); + if (!rc.first) { + text = "invalid parameters"; + return Manageable::STATUS_PARAMETER_INVALID; } } - - std::pair<Bridge::shared_ptr, bool> result = - links->declare (host, port, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic, iargs.i_sync); - - if (result.second && iargs.i_durable) - store->create(*result.first); - return Manageable::STATUS_OK; } @@ -539,4 +707,82 @@ void Link::setPassive(bool passive) } } + +/** utility to clean up connection resources correctly */ +void Link::closeConnection( const std::string& reason) +{ + if (connection != 0) { + // cancel our subscription to the failover exchange + if (failover) { + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverSession); + } + } + connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); + connection = 0; + } +} + +/** returns the current remote's address, and connection state */ +bool Link::getRemoteAddress(qpid::Address& addr) const +{ + addr.protocol = transport; + addr.host = host; + addr.port = port; + + return state == STATE_OPERATIONAL; +} + + +// FieldTable keys for internal state data +namespace { + const std::string FAILOVER_ADDRESSES("failover-addresses"); + const std::string FAILOVER_INDEX("failover-index"); +} + +void Link::getState(framing::FieldTable& state) const +{ + state.clear(); + Mutex::ScopedLock mutex(lock); + if (!url.empty()) { + state.setString(FAILOVER_ADDRESSES, url.str()); + state.setInt(FAILOVER_INDEX, reconnectNext); + } +} + +void Link::setState(const framing::FieldTable& state) +{ + Mutex::ScopedLock mutex(lock); + if (state.isSet(FAILOVER_ADDRESSES)) { + Url failovers(state.getAsString(FAILOVER_ADDRESSES)); + setUrl(failovers); + } + if (state.isSet(FAILOVER_INDEX)) { + reconnectNext = state.getAsInt(FAILOVER_INDEX); + } +} + +std::string Link::createName(const std::string& transport, + const std::string& host, + uint16_t port) +{ + stringstream linkName; + linkName << QPID_NAME_PREFIX << transport << std::string(":") + << host << std::string(":") << port; + return linkName.str(); +} + + +bool Link::pendingConnection(const std::string& _host, uint16_t _port) const +{ + Mutex::ScopedLock mutex(lock); + return (isConnecting() && _port == port && _host == host); +} + + +const std::string Link::exchangeTypeName("qpid.LinkExchange"); + }} // namespace qpid::broker |