diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 273 |
1 files changed, 199 insertions, 74 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 05b759f695..cdba18ccf9 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -19,50 +19,61 @@ * */ -#include "Link.h" -#include "LinkRegistry.h" -#include "Broker.h" -#include "Connection.h" -#include "qpid/agent/ManagementAgent.h" -#include "qpid/management/Link.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" +#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" +#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" #include "boost/bind.hpp" #include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/broker/AclModule.h" using namespace qpid::broker; using qpid::framing::Buffer; using qpid::framing::FieldTable; +using qpid::framing::NotAllowedException; +using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; +using std::stringstream; +namespace _qmf = qmf::org::apache::qpid::broker; Link::Link(LinkRegistry* _links, MessageStore* _store, string& _host, uint16_t _port, - bool _useSsl, + string& _transport, bool _durable, string& _authMechanism, string& _username, string& _password, Broker* _broker, - management::Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), + Manageable* parent) + : links(_links), store(_store), host(_host), port(_port), + transport(_transport), + durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), visitCount(0), currentInterval(1), closing(false), + updateUrls(false), channelCounter(1), - connection(0) + connection(0), + agent(0) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new management::Link(agent, this, parent, _host, _port, _useSsl, _durable); + mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); if (!durable) agent->addObject(mgmtObject); } @@ -73,7 +84,7 @@ Link::Link(LinkRegistry* _links, Link::~Link () { if (state == STATE_OPERATIONAL && connection != 0) - connection->close(); + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -95,6 +106,7 @@ void Link::setStateLH (int newState) case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; case STATE_FAILED : mgmtObject->set_state("Failed"); break; case STATE_CLOSED : mgmtObject->set_state("Closed"); break; + case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; } } @@ -104,8 +116,9 @@ void Link::startConnectionLH () // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. setStateLH(STATE_CONNECTING); - broker->connect (host, port, useSsl, + broker->connect (host, port, transport, boost::bind (&Link::closed, this, _1, _2)); + QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { setStateLH(STATE_WAITING); if (mgmtObject != 0) @@ -115,27 +128,39 @@ void Link::startConnectionLH () void Link::established () { - Mutex::ScopedLock mutex(lock); + stringstream addr; + addr << host << ":" << port; - QPID_LOG (info, "Inter-broker link established to " << host << ":" << port); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - if (closing) - destroy(); + QPID_LOG (info, "Inter-broker link established to " << addr.str()); + agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + { + Mutex::ScopedLock mutex(lock); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); + } } void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; - if (state == STATE_OPERATIONAL) - QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); + if (state == STATE_OPERATIONAL) { + stringstream addr; + addr << host << ":" << port; + QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); created.push_back(*i); + } active.clear(); if (state != STATE_FAILED) @@ -149,32 +174,46 @@ void Link::closed (int, std::string text) destroy(); } -void Link::destroy () +void Link::checkClosePermission() { Mutex::ScopedLock mutex(lock); - Bridges toDelete; + + AclModule* acl = getBroker()->getAcl(); + std::string userID = getUsername() + "@" + getBroker()->getOptions().realm; + if (acl && !acl->authorise(userID,acl::ACT_DELETE,acl::OBJ_LINK,"")){ + throw NotAllowedException("ACL denied delete link request"); + } +} - QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - if (connection) - connection->close(403, "closed by management"); - setStateLH(STATE_CLOSED); +void Link::destroy () +{ + Bridges toDelete; + { + Mutex::ScopedLock mutex(lock); - // Move the bridges to be deleted into a local vector so there is no - // corruption of the iterator caused by bridge deletion. - for (Bridges::iterator i = active.begin(); i != active.end(); i++) - toDelete.push_back(*i); - active.clear(); + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + if (connection) + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + + setStateLH(STATE_CLOSED); - for (Bridges::iterator i = created.begin(); i != created.end(); i++) - toDelete.push_back(*i); - created.clear(); + // Move the bridges to be deleted into a local vector so there is no + // corruption of the iterator caused by bridge deletion. + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + toDelete.push_back(*i); + } + active.clear(); - // Now delete all bridges on this link. + for (Bridges::iterator i = created.begin(); i != created.end(); i++) + toDelete.push_back(*i); + created.clear(); + } + // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->destroy(); toDelete.clear(); - links->destroy (host, port); } @@ -186,21 +225,27 @@ void Link::add(Bridge::shared_ptr bridge) void Link::cancel(Bridge::shared_ptr bridge) { - Mutex::ScopedLock mutex(lock); - - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; + { + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } } - } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - bridge->cancel(); - active.erase(i); - break; + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; + } } } + if (!cancellations.empty()) { + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } void Link::ioThreadProcessing() @@ -209,8 +254,9 @@ void Link::ioThreadProcessing() if (state != STATE_OPERATIONAL) return; + QPID_LOG(debug, "Link::ioThreadProcessing()"); - //process any pending creates + //process any pending creates and/or cancellations if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { active.push_back(*i); @@ -218,34 +264,77 @@ void Link::ioThreadProcessing() } created.clear(); } + if (!cancellations.empty()) { + for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) { + (*i)->cancel(*connection); + } + cancellations.clear(); + } } void Link::setConnection(Connection* c) { Mutex::ScopedLock mutex(lock); connection = c; + updateUrls = true; } void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); + if (connection && updateUrls) { + urls.reset(connection->getKnownHosts()); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + updateUrls = false; + } + if (state == STATE_WAITING) { visitCount++; if (visitCount >= currentInterval) { visitCount = 0; - currentInterval *= 2; - if (currentInterval > MAX_INTERVAL) - currentInterval = MAX_INTERVAL; - startConnectionLH(); + //switch host and port to next in url list if possible + if (!tryFailover()) { + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnectionLH(); + } } } - else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) + else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } +void Link::reconnect(const qpid::TcpAddress& a) +{ + Mutex::ScopedLock mutex(lock); + host = a.host; + port = a.port; + startConnectionLH(); + if (mgmtObject != 0) { + stringstream errorString; + errorString << "Failed over to " << a; + mgmtObject->set_lastError(errorString.str()); + } +} + +bool Link::tryFailover() +{ + //TODO: urls only work for TCP at present, update when that has changed + TcpAddress next; + if (transport == Broker::TCP_TRANSPORT && urls.next(next) && + (next.host != host || next.port != port)) { + links->changeAddress(TcpAddress(host, port), next); + QPID_LOG(debug, "Link failing over to " << host << ":" << port); + return true; + } else { + return false; + } +} + uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); @@ -265,7 +354,7 @@ void Link::notifyConnectionForced(const string text) void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject(mgmtObject, id); } persistenceId = id; @@ -280,19 +369,20 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { string host; uint16_t port; + string transport; string authMechanism; string username; string password; buffer.getShortString(host); port = buffer.getShort(); - bool useSsl(buffer.getOctet()); + buffer.getShortString(transport); bool durable(buffer.getOctet()); buffer.getShortString(authMechanism); buffer.getShortString(username); buffer.getShortString(password); - return links.declare(host, port, useSsl, durable, authMechanism, username, password).first; + return links.declare(host, port, transport, durable, authMechanism, username, password).first; } void Link::encode(Buffer& buffer) const @@ -300,7 +390,7 @@ void Link::encode(Buffer& buffer) const buffer.putShortString(string("link")); buffer.putShortString(host); buffer.putShort(port); - buffer.putOctet(useSsl ? 1 : 0); + buffer.putShortString(transport); buffer.putOctet(durable ? 1 : 0); buffer.putShortString(authMechanism); buffer.putShortString(username); @@ -312,7 +402,7 @@ uint32_t Link::encodedSize() const return host.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port - + 1 // useSsl + + transport.size() + 1 // short-string(transport) + 1 // durable + authMechanism.size() + 1 + username.size() + 1 @@ -324,27 +414,48 @@ ManagementObject* Link::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } -Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) +Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text) { switch (op) { - case management::Link::METHOD_CLOSE : - closing = true; - if (state != STATE_CONNECTING) - destroy(); + case _qmf::Link::METHOD_CLOSE : + checkClosePermission(); + if (!closing) { + closing = true; + if (state != STATE_CONNECTING && connection) { + //connection can only be closed on the connections own IO processing thread + connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + } + } return Manageable::STATUS_OK; - case management::Link::METHOD_BRIDGE : - management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args; + case _qmf::Link::METHOD_BRIDGE : + _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; + QPID_LOG(debug, "Link::bridge() request received"); // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) - return Manageable::STATUS_INVALID_PARAMETER; + if (iargs.i_durable && !durable) { + text = "Can't create a durable route on a non-durable link"; + return Manageable::STATUS_USER; + } + + if (iargs.i_dynamic) { + Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); + if (exchange.get() == 0) { + text = "Exchange not found"; + return Manageable::STATUS_USER; + } + if (!exchange->supportsDynamicBinding()) { + text = "Exchange type does not support dynamic routing"; + return Manageable::STATUS_USER; + } + } std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes); + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); if (result.second && iargs.i_durable) store->create(*result.first); @@ -354,3 +465,17 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args return Manageable::STATUS_UNKNOWN_METHOD; } + +void Link::setPassive(bool passive) +{ + Mutex::ScopedLock mutex(lock); + if (passive) { + setStateLH(STATE_PASSIVE); + } else { + if (state == STATE_PASSIVE) { + setStateLH(STATE_WAITING); + } else { + QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); + } + } +} |