diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp new file mode 100644 index 0000000000..9ab4379a69 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -0,0 +1,474 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Link.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" +#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" +#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" +#include "boost/bind.hpp" +#include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/broker/AclModule.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; +using qpid::framing::UnauthorizedAccessException; +using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; +using std::stringstream; +using std::string; +namespace _qmf = qmf::org::apache::qpid::broker; + +Link::Link(LinkRegistry* _links, + MessageStore* _store, + string& _host, + uint16_t _port, + string& _transport, + bool _durable, + string& _authMechanism, + string& _username, + string& _password, + Broker* _broker, + Manageable* parent) + : links(_links), store(_store), host(_host), port(_port), + transport(_transport), + durable(_durable), + authMechanism(_authMechanism), username(_username), password(_password), + persistenceId(0), mgmtObject(0), broker(_broker), state(0), + visitCount(0), + currentInterval(1), + closing(false), + updateUrls(false), + channelCounter(1), + connection(0), + agent(0) +{ + if (parent != 0 && broker != 0) + { + agent = broker->getManagementAgent(); + if (agent != 0) + { + mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); + agent->addObject(mgmtObject, 0, durable); + } + } + setStateLH(STATE_WAITING); +} + +Link::~Link () +{ + if (state == STATE_OPERATIONAL && connection != 0) + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} + +void Link::setStateLH (int newState) +{ + if (newState == state) + return; + + state = newState; + + if (hideManagement()) + return; + + switch (state) + { + case STATE_WAITING : mgmtObject->set_state("Waiting"); break; + case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; + case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + case STATE_FAILED : mgmtObject->set_state("Failed"); break; + case STATE_CLOSED : mgmtObject->set_state("Closed"); break; + case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; + } +} + +void Link::startConnectionLH () +{ + try { + // Set the state before calling connect. It is possible that connect + // will fail synchronously and call Link::closed before returning. + setStateLH(STATE_CONNECTING); + broker->connect (host, boost::lexical_cast<std::string>(port), transport, + boost::bind (&Link::closed, this, _1, _2)); + QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); + } catch(std::exception& e) { + setStateLH(STATE_WAITING); + if (!hideManagement()) + mgmtObject->set_lastError (e.what()); + } +} + +void Link::established () +{ + stringstream addr; + addr << host << ":" << port; + QPID_LOG (info, "Inter-broker link established to " << addr.str()); + + if (!hideManagement() && agent) + agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); + + { + Mutex::ScopedLock mutex(lock); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + if (closing) + destroy(); + } +} + +void Link::closed (int, std::string text) +{ + Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); + + connection = 0; + + if (state == STATE_OPERATIONAL) { + stringstream addr; + addr << host << ":" << port; + QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); + if (!hideManagement() && agent) + agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); + } + + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + created.push_back(*i); + } + active.clear(); + + if (state != STATE_FAILED) + { + setStateLH(STATE_WAITING); + if (!hideManagement()) + mgmtObject->set_lastError (text); + } + + if (closing) + destroy(); +} + +void Link::destroy () +{ + Bridges toDelete; + { + Mutex::ScopedLock mutex(lock); + + QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + if (connection) + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); + + setStateLH(STATE_CLOSED); + + // Move the bridges to be deleted into a local vector so there is no + // corruption of the iterator caused by bridge deletion. + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + toDelete.push_back(*i); + } + active.clear(); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) + toDelete.push_back(*i); + created.clear(); + } + // Now delete all bridges on this link (don't hold the lock for this). + for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) + (*i)->destroy(); + toDelete.clear(); + links->destroy (host, port); +} + +void Link::add(Bridge::shared_ptr bridge) +{ + Mutex::ScopedLock mutex(lock); + created.push_back (bridge); +} + +void Link::cancel(Bridge::shared_ptr bridge) +{ + { + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } + } + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; + } + } + } + if (!cancellations.empty()) { + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } +} + +void Link::ioThreadProcessing() +{ + Mutex::ScopedLock mutex(lock); + + if (state != STATE_OPERATIONAL) + return; + QPID_LOG(debug, "Link::ioThreadProcessing()"); + + //process any pending creates and/or cancellations + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + active.push_back(*i); + (*i)->create(*connection); + } + created.clear(); + } + if (!cancellations.empty()) { + for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) { + (*i)->cancel(*connection); + } + cancellations.clear(); + } +} + +void Link::setConnection(Connection* c) +{ + Mutex::ScopedLock mutex(lock); + connection = c; + updateUrls = true; +} + +void Link::maintenanceVisit () +{ + Mutex::ScopedLock mutex(lock); + + if (connection && updateUrls) { + urls.reset(connection->getKnownHosts()); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + updateUrls = false; + } + + if (state == STATE_WAITING) + { + visitCount++; + if (visitCount >= currentInterval) + { + visitCount = 0; + //switch host and port to next in url list if possible + if (!tryFailover()) { + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnectionLH(); + } + } + } + else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + +void Link::reconnect(const qpid::Address& a) +{ + Mutex::ScopedLock mutex(lock); + host = a.host; + port = a.port; + transport = a.protocol; + startConnectionLH(); + if (!hideManagement()) { + stringstream errorString; + errorString << "Failed over to " << a; + mgmtObject->set_lastError(errorString.str()); + } +} + +bool Link::tryFailover() +{ + Address next; + if (urls.next(next) && + (next.host != host || next.port != port || next.protocol != transport)) { + links->changeAddress(Address(transport, host, port), next); + QPID_LOG(debug, "Link failing over to " << host << ":" << port); + return true; + } else { + return false; + } +} + +// Management updates for a linke are inconsistent in a cluster, so they are +// suppressed. +bool Link::hideManagement() const { + return !mgmtObject || ( broker && broker->isInCluster()); +} + +uint Link::nextChannel() +{ + Mutex::ScopedLock mutex(lock); + + return channelCounter++; +} + +void Link::notifyConnectionForced(const string text) +{ + Mutex::ScopedLock mutex(lock); + + setStateLH(STATE_FAILED); + if (!hideManagement()) + mgmtObject->set_lastError(text); +} + +void Link::setPersistenceId(uint64_t id) const +{ + persistenceId = id; +} + +const string& Link::getName() const +{ + return host; +} + +Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) +{ + string host; + uint16_t port; + string transport; + string authMechanism; + string username; + string password; + + buffer.getShortString(host); + port = buffer.getShort(); + buffer.getShortString(transport); + bool durable(buffer.getOctet()); + buffer.getShortString(authMechanism); + buffer.getShortString(username); + buffer.getShortString(password); + + return links.declare(host, port, transport, durable, authMechanism, username, password).first; +} + +void Link::encode(Buffer& buffer) const +{ + buffer.putShortString(string("link")); + buffer.putShortString(host); + buffer.putShort(port); + buffer.putShortString(transport); + buffer.putOctet(durable ? 1 : 0); + buffer.putShortString(authMechanism); + buffer.putShortString(username); + buffer.putShortString(password); +} + +uint32_t Link::encodedSize() const +{ + return host.size() + 1 // short-string (host) + + 5 // short-string ("link") + + 2 // port + + transport.size() + 1 // short-string(transport) + + 1 // durable + + authMechanism.size() + 1 + + username.size() + 1 + + password.size() + 1; +} + +ManagementObject* Link::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text) +{ + switch (op) + { + case _qmf::Link::METHOD_CLOSE : + if (!closing) { + closing = true; + if (state != STATE_CONNECTING && connection) { + //connection can only be closed on the connections own IO processing thread + connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + } + } + return Manageable::STATUS_OK; + + case _qmf::Link::METHOD_BRIDGE : + _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; + QPID_LOG(debug, "Link::bridge() request received"); + + // Durable bridges are only valid on durable links + if (iargs.i_durable && !durable) { + text = "Can't create a durable route on a non-durable link"; + return Manageable::STATUS_USER; + } + + if (iargs.i_dynamic) { + Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); + if (exchange.get() == 0) { + text = "Exchange not found"; + return Manageable::STATUS_USER; + } + if (!exchange->supportsDynamicBinding()) { + text = "Exchange type does not support dynamic routing"; + return Manageable::STATUS_USER; + } + } + + std::pair<Bridge::shared_ptr, bool> result = + links->declare (host, port, iargs.i_durable, iargs.i_src, + iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); + + if (result.second && iargs.i_durable) + store->create(*result.first); + + return Manageable::STATUS_OK; + } + + return Manageable::STATUS_UNKNOWN_METHOD; +} + +void Link::setPassive(bool passive) +{ + Mutex::ScopedLock mutex(lock); + if (passive) { + setStateLH(STATE_PASSIVE); + } else { + if (state == STATE_PASSIVE) { + setStateLH(STATE_WAITING); + } else { + QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); + } + } +} |