diff options
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 94 |
1 files changed, 19 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index a79081b8ed..d048b9c05f 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -19,8 +19,10 @@ * */ #include "qpid/broker/LinkRegistry.h" -#include "qpid/broker/Link.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/Link.h" #include "qpid/log/Statement.h" #include <iostream> #include <boost/format.hpp> @@ -42,8 +44,8 @@ namespace _qmf = qmf::org::apache::qpid::broker; // factored: The persistence element should be factored separately LinkRegistry::LinkRegistry () : broker(0), -// parent(0), store(0), passive(false), - parent(0), asyncStore(0), passive(false), +// parent(0), store(0), + parent(0), asyncStore(0), realm("") { } @@ -60,7 +62,8 @@ class LinkRegistryConnectionObserver : public ConnectionObserver { LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), - parent(0), asyncStore(0), passive(false), +// parent(0), store(0), + parent(0), asyncStore(0), realm(broker->getOptions().realm) { broker->getConnectionObservers().add( @@ -118,10 +121,9 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, boost::bind(&LinkRegistry::linkDestroyed, this, _1), durable, authMechanism, username, password, broker, parent, failover)); -// if (durable && store) store->create(*link); - if (durable && asyncStore) { -// store->create(*link); - // TODO: kpvdr: async create config (link) + if (durable && asyncStore && !broker->inRecovery()) { + //store->create(*link); + // TODO: kpvdr: async create config (link) } links[name] = link; pendingLinks[name] = link; @@ -218,9 +220,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, args, init, queueName, altExchange)); bridges[name] = bridge; link.add(bridge); -// if (durable && store) - if (durable && asyncStore) { -// store->create(*bridge); + if (durable && asyncStore && !broker->inRecovery()) { + //store->create(*bridge); // TODO: kpvdr: Async create config (bridge) } @@ -264,6 +265,7 @@ void LinkRegistry::destroyBridge(Bridge *bridge) Link *link = b->second->getLink(); if (link) { link->cancel(b->second); + link->returnChannel( bridge->getChannel() ); } // if (b->second->isDurable()) if (b->second->isDurable()) { @@ -283,38 +285,6 @@ AsyncStore* LinkRegistry::getStore() const { return asyncStore; } -namespace { - void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) - { - // Extract host and port of remote broker from connection id string. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses connection - // management IDs. Currently sys:: protocol factories and IO plugins construct the - // IDs and LinkRegistry parses them. - // KAG: current connection id format assumed: - // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are - // contained within brackets "[...]", example: - // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us - // if this assumption changes! - size_t separator = connId.find('-'); - assert(separator != std::string::npos); - std::string remote = connId.substr(separator+1, std::string::npos); - separator = remote.rfind(":"); - assert(separator != std::string::npos); - *host = remote.substr(0, separator); - // IPv6 - host is bracketed by "[]", strip them - if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { - *host = host->substr(1, host->length() - 2); - } - try { - *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); - } catch (const boost::bad_lexical_cast&) { - QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); - assert(false); - } - } -} - /** find the Link that corresponds to the given connection */ Link::shared_ptr LinkRegistry::findLink(const std::string& connId) { @@ -334,19 +304,15 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) // create a mapping from connection id to link QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); std::string host; - uint16_t port = 0; - extractHostPort( key, &host, &port ); Link::shared_ptr link; { Mutex::ScopedLock locker(lock); - for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) { - if (l->second->pendingConnection(host, port)) { - link = l->second; - pendingLinks.erase(l); - connections[key] = link->getName(); - QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); - break; - } + LinkMap::iterator l = pendingLinks.find(key); + if (l != pendingLinks.end()) { + link = l->second; + pendingLinks.erase(l); + connections[key] = link->getName(); + QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); } } @@ -461,26 +427,4 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) return link->getUsername(); } - -void LinkRegistry::setPassive(bool p) -{ - Mutex::ScopedLock locker(lock); - passive = p; - if (passive) { QPID_LOG(info, "Passivating links"); } - else { QPID_LOG(info, "Activating links"); } - for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { - i->second->setPassive(passive); - } -} - -void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { - Mutex::ScopedLock locker(lock); - for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); -} - -void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { - Mutex::ScopedLock locker(lock); - for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); -} - }} // namespace qpid::broker |