diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 273 |
1 files changed, 180 insertions, 93 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index c6c5a1ac05..75c311c917 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _broker) : LinkRegistry::~LinkRegistry() {} +/** find link by the *configured* remote address */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, + uint16_t port, + const std::string& transport) +{ + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { + Link::shared_ptr& link = i->second; + if (link->getHost() == host && + link->getPort() == port && + (transport.empty() || link->getTransport() == transport)) + return link; + } + return boost::shared_ptr<Link>(); +} -void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) +/** find link by name */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) { Mutex::ScopedLock locker(lock); - std::string oldKey = createKey(oldAddress); - std::string newKey = createKey(newAddress); - if (links.find(newKey) != links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - } else { - LinkMap::iterator i = links.find(oldKey); - if (i == links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - } else { - links[newKey] = i->second; - links.erase(oldKey); - QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - } - } + LinkMap::iterator l = links.find(name); + if (l != links.end()) + return l->second; + return boost::shared_ptr<Link>(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, + const string& host, uint16_t port, const string& transport, bool durable, const string& authMechanism, const string& username, - const string& password) + const string& password, + bool failover) { Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(name); if (i == links.end()) { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, - authMechanism, username, password, - broker, parent)); - links[key] = link; + link = Link::shared_ptr ( + new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent, failover)); + if (durable && store) store->create(*link); + links[name] = link; + QPID_LOG(debug, "Creating new link; name=" << name ); return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } -pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, - uint16_t port, +/** find bridge by link & route info */ +Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { + if (i->second->getSrc() == src && i->second->getDest() == dest && + i->second->getKey() == key && i->second->getLink() && + i->second->getLink()->getName() == link.getName()) { + return i->second; + } + } + return Bridge::shared_ptr(); +} + +/** find bridge by name */ +Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + BridgeMap::iterator b = bridges.find(name); + if (b != bridges.end()) + return b->second; + return Bridge::shared_ptr(); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, const std::string& excludes, bool dynamic, uint16_t sync, - Bridge::InitializeCallback init + Bridge::InitializeCallback init, + const std::string& queueName, + const std::string& altExchange ) { Mutex::ScopedLock locker(lock); - QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); + // Durable bridges are only valid on durable links + if (durable && !link.isDurable()) { + QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + if (dynamic) { + Exchange::shared_ptr exchange = broker->getExchanges().get(src); + if (exchange.get() == 0) { + QPID_LOG(error, "Exchange not found, name='" << src << "'" ); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + if (!exchange->supportsDynamicBinding()) { + QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + } - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(name); if (b == bridges.end()) { _qmf::ArgsLinkBridge args; @@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, args.i_sync = sync; bridge = Bridge::shared_ptr - (new Bridge (l->second.get(), l->second->nextChannel(), - boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), - args, init)); - bridges[bridgeKey] = bridge; - l->second->add(bridge); + (new Bridge (name, &link, link.nextChannel(), + boost::bind(&LinkRegistry::destroyBridge, this, _1), + args, init, queueName, altExchange)); + bridges[name] = bridge; + link.add(bridge); + if (durable && store) + store->create(*bridge); + + QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << + "' from " << src << " to " << dest << " (" << key << ")"); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); } return std::pair<Bridge::shared_ptr, bool>(b->second, false); } -void LinkRegistry::destroy(const string& host, const uint16_t port) +/** called back by the link when it has completed its cleanup and can be removed. */ +void LinkRegistry::linkDestroyed(Link *link) { + QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(link->getName()); if (i != links.end()) { if (i->second->isDurable() && store) @@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } -void LinkRegistry::destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key) +/** called back by bridge when its destruction has been requested */ +void LinkRegistry::destroyBridge(Bridge *bridge) { + QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName()); Mutex::ScopedLock locker(lock); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return; - - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(bridge->getName()); if (b == bridges.end()) return; - l->second->cancel(b->second); + Link *link = b->second->getLink(); + if (link) { + link->cancel(b->second); + } if (b->second->isDurable()) store->destroy(*(b->second)); bridges.erase(b); @@ -219,28 +266,73 @@ MessageStore* LinkRegistry::getStore() const { return store; } -Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) -{ - // Convert keyOrMgmtId to a host:port key. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses - // connection management IDs. Currently sys:: protocol factories - // and IO plugins construct the IDs and LinkRegistry parses them. - size_t separator = keyOrMgmtId.find('-'); - if (separator == std::string::npos) separator = 0; - std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); +namespace { + void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) + { + // Extract host and port of remote broker from connection id string. + // + // TODO aconway 2011-02-01: centralize code that constructs/parses connection + // management IDs. Currently sys:: protocol factories and IO plugins construct the + // IDs and LinkRegistry parses them. + // KAG: current connection id format assumed: + // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are + // contained within brackets "[...]", example: + // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us + // if this assumption changes! + size_t separator = connId.find('-'); + assert(separator != std::string::npos); + std::string remote = connId.substr(separator+1, std::string::npos); + separator = remote.rfind(":"); + assert(separator != std::string::npos); + *host = remote.substr(0, separator); + // IPv6 - host is bracketed by "[]", strip them + if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { + *host = host->substr(1, host->length() - 2); + } + try { + *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); + } catch (const boost::bad_lexical_cast&) { + QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); + assert(false); + } + } +} +/** find the Link that corresponds to the given connection */ +Link::shared_ptr LinkRegistry::findLink(const std::string& connId) +{ Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) return l->second; - else return Link::shared_ptr(); + ConnectionMap::iterator c = connections.find(connId); + if (c != connections.end()) { + LinkMap::iterator l = links.find(c->second); + if (l != links.end()) + return l->second; + } + return Link::shared_ptr(); } void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { - Link::shared_ptr link = findLink(key); + // find a link that is attempting to connect to the remote, and + // create a mapping from connection id to link + QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); + std::string host; + uint16_t port; + extractHostPort( key, &host, &port ); + Link::shared_ptr link; + { + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) { + if (l->second->pendingConnection(host, port)) { + link = l->second; + connections[key] = link->getName(); + link->established(c); + break; + } + } + } + if (link) { - link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); } } @@ -299,22 +391,29 @@ std::string LinkRegistry::getUsername(const std::string& key) return link->getUsername(); } +/** note: returns the current remote host (may be different from the host originally + configured for the Link due to failover) */ std::string LinkRegistry::getHost(const std::string& key) { - Link::shared_ptr link = findLink(key); - if (!link) - return string(); + Link::shared_ptr link = findLink(key); + if (!link) + return string(); - return link->getHost(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.host; } +/** returns the current remote port (ditto above) */ uint16_t LinkRegistry::getPort(const std::string& key) { Link::shared_ptr link = findLink(key); if (!link) return 0; - return link->getPort(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.port; } std::string LinkRegistry::getPassword(const std::string& key) @@ -336,20 +435,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } -std::string LinkRegistry::createKey(const qpid::Address& a) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - return createKey(a.host, a.port); -} - -std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - stringstream keystream; - keystream << host << ":" << port; - return keystream.str(); -} - void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); @@ -362,10 +447,12 @@ void LinkRegistry::setPassive(bool p) } void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + Mutex::ScopedLock locker(lock); for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); } void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + Mutex::ScopedLock locker(lock); for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); } |