diff options
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 164 |
1 files changed, 129 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 0703c276cf..f32587dd68 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -18,20 +18,49 @@ * under the License. * */ -#include "LinkRegistry.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" #include <iostream> +#include <boost/format.hpp> using namespace qpid::broker; using namespace qpid::sys; using std::pair; using std::stringstream; using boost::intrusive_ptr; +using boost::format; +using boost::str; +namespace _qmf = qmf::org::apache::qpid::broker; #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) +// TODO: This constructor is only used by the store unit tests - +// That probably indicates that LinkRegistry isn't correctly +// factored: The persistence element and maintenance element +// should be factored separately +LinkRegistry::LinkRegistry () : + broker(0), timer(0), + parent(0), store(0), passive(false), passiveChanged(false), + realm("") { - timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); +} + +LinkRegistry::LinkRegistry (Broker* _broker) : + broker(_broker), timer(&broker->getTimer()), + maintenanceTask(new Periodic(*this)), + parent(0), store(0), passive(false), passiveChanged(false), + realm(broker->getOptions().realm) +{ + timer->add(maintenanceTask); +} + +LinkRegistry::~LinkRegistry() +{ + // This test is only necessary if the default constructor above is present + if (maintenanceTask) + maintenanceTask->cancel(); } LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : @@ -40,7 +69,8 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : void LinkRegistry::Periodic::fire () { links.periodicMaintenance (); - links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links))); + setupNextFire(); + links.timer->add(this); } void LinkRegistry::periodicMaintenance () @@ -49,13 +79,53 @@ void LinkRegistry::periodicMaintenance () linksToDestroy.clear(); bridgesToDestroy.clear(); + if (passiveChanged) { + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } + passiveChanged = false; + } for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); + //now process any requests for re-addressing + for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) + updateAddress(i->first, i->second); + reMappings.clear(); +} + +void LinkRegistry::changeAddress(const qpid::TcpAddress& oldAddress, const qpid::TcpAddress& newAddress) +{ + //done on periodic maintenance thread; hold changes in separate + //map to avoid modifying the link map that is iterated over + reMappings[createKey(oldAddress)] = newAddress; +} + +bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::TcpAddress& newAddress) +{ + std::string newKey = createKey(newAddress); + if (links.find(newKey) != links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); + return false; + } else { + LinkMap::iterator i = links.find(oldKey); + if (i == links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); + return false; + } else { + links[newKey] = i->second; + i->second->reconnect(newAddress); + links.erase(oldKey); + QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); + return true; + } + } } pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, uint16_t port, - bool useSsl, + string& transport, bool durable, string& authMechanism, string& username, @@ -72,9 +142,10 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable, + link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, authMechanism, username, password, broker, parent)); + if (passive) link->setPassive(true); links[key] = link; return std::pair<Link::shared_ptr, bool>(link, true); } @@ -90,9 +161,13 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, bool isQueue, bool isLocal, std::string& tag, - std::string& excludes) + std::string& excludes, + bool dynamic, + uint16_t sync) { Mutex::ScopedLock locker(lock); + QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); + stringstream keystream; keystream << host << ":" << port; string linkKey = string(keystream.str()); @@ -107,7 +182,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, BridgeMap::iterator b = bridges.find(bridgeKey); if (b == bridges.end()) { - management::ArgsLinkBridge args; + _qmf::ArgsLinkBridge args; Bridge::shared_ptr bridge; args.i_durable = durable; @@ -118,6 +193,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, args.i_srcIsLocal = isLocal; args.i_tag = tag; args.i_excludes = excludes; + args.i_dynamic = dynamic; + args.i_sync = sync; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), @@ -177,7 +254,6 @@ void LinkRegistry::destroy(const std::string& host, void LinkRegistry::setStore (MessageStore* _store) { - assert (store == 0 && _store != 0); store = _store; } @@ -185,66 +261,84 @@ MessageStore* LinkRegistry::getStore() const { return store; } -void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +Link::shared_ptr LinkRegistry::findLink(const std::string& key) { Mutex::ScopedLock locker(lock); LinkMap::iterator l = links.find(key); - if (l != links.end()) - { - l->second->established(); - l->second->setConnection(c); + if (l != links.end()) return l->second; + else return Link::shared_ptr(); +} + +void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +{ + Link::shared_ptr link = findLink(key); + if (link) { + link->established(); + link->setConnection(c); + c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); } } void LinkRegistry::notifyClosed(const std::string& key) { - Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) - l->second->closed(0, "Closed by peer"); + Link::shared_ptr link = findLink(key); + if (link) { + link->closed(0, "Closed by peer"); + } } void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text) { - Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) - l->second->notifyConnectionForced(text); + Link::shared_ptr link = findLink(key); + if (link) { + link->notifyConnectionForced(text); + } } std::string LinkRegistry::getAuthMechanism(const std::string& key) { - Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) - return l->second->getAuthMechanism(); + Link::shared_ptr link = findLink(key); + if (link) + return link->getAuthMechanism(); return string("ANONYMOUS"); } std::string LinkRegistry::getAuthCredentials(const std::string& key) { - Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l == links.end()) + Link::shared_ptr link = findLink(key); + if (!link) return string(); string result; result += '\0'; - result += l->second->getUsername(); + result += link->getUsername(); result += '\0'; - result += l->second->getPassword(); + result += link->getPassword(); return result; } std::string LinkRegistry::getAuthIdentity(const std::string& key) { - Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l == links.end()) + Link::shared_ptr link = findLink(key); + if (!link) return string(); - return l->second->getUsername(); + return link->getUsername(); } +std::string LinkRegistry::createKey(const qpid::TcpAddress& a) +{ + stringstream keystream; + keystream << a.host << ":" << a.port; + return string(keystream.str()); +} + +void LinkRegistry::setPassive(bool p) +{ + Mutex::ScopedLock locker(lock); + passiveChanged = p != passive; + passive = p; + //will activate or passivate links on maintenance visit +} |