diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp new file mode 100644 index 0000000000..b3a78b98ca --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -0,0 +1,435 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LinkRegistry.h" + +#include "qpid/broker/Broker.h" +#include "qpid/broker/amqp_0_10/Connection.h" +#include "qpid/broker/Link.h" +#include "qpid/log/Statement.h" +#include <iostream> +#include <boost/format.hpp> + +namespace qpid { +namespace broker { + +using namespace qpid::sys; +using std::string; +using std::pair; +using std::stringstream; +using boost::intrusive_ptr; +using boost::format; +using boost::str; +namespace _qmf = qmf::org::apache::qpid::broker; + +// TODO: This constructor is only used by the store unit tests - +// That probably indicates that LinkRegistry isn't correctly +// factored: The persistence element should be factored separately +LinkRegistry::LinkRegistry () : + broker(0), + parent(0), store(0), + realm("") +{ +} + +class LinkRegistryConnectionObserver : public ConnectionObserver { + LinkRegistry& links; + public: + LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {} + void connection(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyConnection(c->getMgmtId(), c); + } + void opened(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyOpened(c->getMgmtId()); + } + void closed(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyClosed(c->getMgmtId()); + } + void forced(Connection& in, const string& text) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyConnectionForced(c->getMgmtId(), text); + } +}; + +LinkRegistry::LinkRegistry (Broker* _broker) : + broker(_broker), + parent(0), store(0), + realm(broker->getRealm()) +{ + broker->getConnectionObservers().add( + boost::shared_ptr<ConnectionObserver>(new LinkRegistryConnectionObserver(*this))); +} + +LinkRegistry::~LinkRegistry() {} + +/** find link by the *configured* remote address */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, + uint16_t port, + const std::string& transport) +{ + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { + Link::shared_ptr& link = i->second; + if (link->getHost() == host && + link->getPort() == port && + (transport.empty() || link->getTransport() == transport)) + return link; + } + return boost::shared_ptr<Link>(); +} + +/** find link by name */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(name); + if (l != links.end()) + return l->second; + return boost::shared_ptr<Link>(); +} + +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, + const string& host, + uint16_t port, + const string& transport, + bool durable, + const string& authMechanism, + const string& username, + const string& password, + bool failover) + +{ + Mutex::ScopedLock locker(lock); + + LinkMap::iterator i = links.find(name); + if (i == links.end()) + { + Link::shared_ptr link; + + link = Link::shared_ptr ( + new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent, failover)); + if (durable && store && !broker->inRecovery()) + store->create(*link); + links[name] = link; + pendingLinks[name] = link; + QPID_LOG(debug, "Creating new link; name=" << name ); + return std::pair<Link::shared_ptr, bool>(link, true); + } + return std::pair<Link::shared_ptr, bool>(i->second, false); +} + +/** find bridge by link & route info */ +Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { + if (i->second->getSrc() == src && i->second->getDest() == dest && + i->second->getKey() == key && i->second->getLink() && + i->second->getLink()->getName() == link.getName()) { + return i->second; + } + } + return Bridge::shared_ptr(); +} + +/** find bridge by name */ +Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + BridgeMap::iterator b = bridges.find(name); + if (b != bridges.end()) + return b->second; + return Bridge::shared_ptr(); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, + Link& link, + bool durable, + const std::string& src, + const std::string& dest, + const std::string& key, + bool isQueue, + bool isLocal, + const std::string& tag, + const std::string& excludes, + bool dynamic, + uint16_t sync, + uint32_t credit, + Bridge::InitializeCallback init, + const std::string& queueName, + const std::string& altExchange +) +{ + Mutex::ScopedLock locker(lock); + + // Durable bridges are only valid on durable links + if (durable && !link.isDurable()) { + QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + + if (dynamic) { + Exchange::shared_ptr exchange = broker->getExchanges().get(src); + if (exchange.get() == 0) { + QPID_LOG(error, "Exchange not found, name='" << src << "'" ); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + if (!exchange->supportsDynamicBinding()) { + QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + } + + BridgeMap::iterator b = bridges.find(name); + if (b == bridges.end()) + { + _qmf::ArgsLinkBridge args; + Bridge::shared_ptr bridge; + + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_srcIsQueue = isQueue; + args.i_srcIsLocal = isLocal; + args.i_tag = tag; + args.i_excludes = excludes; + args.i_dynamic = dynamic; + args.i_sync = sync; + args.i_credit = credit; + + bridge = Bridge::shared_ptr + (new Bridge (name, &link, link.nextChannel(), + boost::bind(&LinkRegistry::destroyBridge, this, _1), + args, init, queueName, altExchange)); + bridges[name] = bridge; + link.add(bridge); + if (durable && store && !broker->inRecovery()) + store->create(*bridge); + + QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << + "' from " << src << " to " << dest << " (" << key << ")"); + + return std::pair<Bridge::shared_ptr, bool>(bridge, true); + } + return std::pair<Bridge::shared_ptr, bool>(b->second, false); +} + +/** called back by the link when it has completed its cleanup and can be removed. */ +void LinkRegistry::linkDestroyed(Link *link) +{ + QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); + Mutex::ScopedLock locker(lock); + + pendingLinks.erase(link->getName()); + LinkMap::iterator i = links.find(link->getName()); + if (i != links.end()) + { + if (i->second->isDurable() && store) + store->destroy(*(i->second)); + links.erase(i); + } +} + +/** called back by bridge when its destruction has been requested */ +void LinkRegistry::destroyBridge(Bridge *bridge) +{ + QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName()); + Mutex::ScopedLock locker(lock); + + BridgeMap::iterator b = bridges.find(bridge->getName()); + if (b == bridges.end()) + return; + + Link *link = b->second->getLink(); + if (link) { + link->cancel(b->second); + link->returnChannel( bridge->getChannel() ); + } + if (b->second->isDurable()) + store->destroy(*(b->second)); + bridges.erase(b); +} + +void LinkRegistry::setStore (MessageStore* _store) +{ + store = _store; +} + +MessageStore* LinkRegistry::getStore() const { + return store; +} + +/** find the Link that corresponds to the given connection */ +Link::shared_ptr LinkRegistry::findLink(const std::string& connId) +{ + Mutex::ScopedLock locker(lock); + ConnectionMap::iterator c = connections.find(connId); + if (c != connections.end()) { + LinkMap::iterator l = links.find(c->second); + if (l != links.end()) + return l->second; + } + return Link::shared_ptr(); +} + +void LinkRegistry::notifyConnection(const std::string& key, amqp_0_10::Connection* c) +{ + // find a link that is attempting to connect to the remote, and + // create a mapping from connection id to link + QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); + std::string host; + Link::shared_ptr link; + { + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = pendingLinks.find(key); + if (l != pendingLinks.end()) { + link = l->second; + pendingLinks.erase(l); + connections[key] = link->getName(); + QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); + } + } + + if (link) { + link->established(c); + c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); + } +} + +void LinkRegistry::notifyOpened(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) link->opened(); +} + +void LinkRegistry::notifyClosed(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) { + { + Mutex::ScopedLock locker(lock); + pendingLinks[link->getName()] = link; + } + link->closed(0, "Closed by peer"); + } +} + +void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text) +{ + Link::shared_ptr link = findLink(key); + if (link) { + { + Mutex::ScopedLock locker(lock); + pendingLinks[link->getName()] = link; + } + link->notifyConnectionForced(text); + } +} + +std::string LinkRegistry::getAuthMechanism(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) + return link->getAuthMechanism(); + return string("ANONYMOUS"); +} + +std::string LinkRegistry::getAuthCredentials(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + string result; + result += '\0'; + result += link->getUsername(); + result += '\0'; + result += link->getPassword(); + + return result; +} + +std::string LinkRegistry::getUsername(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getUsername(); +} + +/** note: returns the current remote host (may be different from the host originally + configured for the Link due to failover) */ +std::string LinkRegistry::getHost(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.host; +} + +/** returns the current remote port (ditto above) */ +uint16_t LinkRegistry::getPort(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return 0; + + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.port; +} + +std::string LinkRegistry::getPassword(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getPassword(); +} + +std::string LinkRegistry::getAuthIdentity(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getUsername(); +} + +}} // namespace qpid::broker |