diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 126 |
1 files changed, 49 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index e9885f5462..a4fd90684e 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -35,102 +35,65 @@ using boost::format; using boost::str; namespace _qmf = qmf::org::apache::qpid::broker; -#define LINK_MAINT_INTERVAL 2 - // TODO: This constructor is only used by the store unit tests - // That probably indicates that LinkRegistry isn't correctly -// factored: The persistence element and maintenance element -// should be factored separately +// factored: The persistence element should be factored separately LinkRegistry::LinkRegistry () : - broker(0), timer(0), - parent(0), store(0), passive(false), passiveChanged(false), + broker(0), + parent(0), store(0), passive(false), realm("") { } -LinkRegistry::LinkRegistry (Broker* _broker) : - broker(_broker), timer(&broker->getTimer()), - maintenanceTask(new Periodic(*this)), - parent(0), store(0), passive(false), passiveChanged(false), - realm(broker->getOptions().realm) -{ - timer->add(maintenanceTask); -} - -LinkRegistry::~LinkRegistry() -{ - // This test is only necessary if the default constructor above is present - if (maintenanceTask) - maintenanceTask->cancel(); +namespace { +struct ConnectionObserverImpl : public ConnectionObserver { + LinkRegistry& links; + ConnectionObserverImpl(LinkRegistry& l) : links(l) {} + void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); } + void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); } + void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); } + void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); } +}; } -LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : - TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {} - -void LinkRegistry::Periodic::fire () +LinkRegistry::LinkRegistry (Broker* _broker) : + broker(_broker), + parent(0), store(0), passive(false), + realm(broker->getOptions().realm) { - links.periodicMaintenance (); - setupNextFire(); - links.timer->add(this); + broker->getConnectionObservers().add( + boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this))); } -void LinkRegistry::periodicMaintenance () -{ - Mutex::ScopedLock locker(lock); +LinkRegistry::~LinkRegistry() {} - linksToDestroy.clear(); - bridgesToDestroy.clear(); - if (passiveChanged) { - if (passive) { QPID_LOG(info, "Passivating links"); } - else { QPID_LOG(info, "Activating links"); } - for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { - i->second->setPassive(passive); - } - passiveChanged = false; - } - for (LinkMap::iterator i = links.begin(); i != links.end(); i++) - i->second->maintenanceVisit(); - //now process any requests for re-addressing - for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) - updateAddress(i->first, i->second); - reMappings.clear(); -} void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) { - //done on periodic maintenance thread; hold changes in separate - //map to avoid modifying the link map that is iterated over - reMappings[createKey(oldAddress)] = newAddress; -} - -bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress) -{ + Mutex::ScopedLock locker(lock); + std::string oldKey = createKey(oldAddress); std::string newKey = createKey(newAddress); if (links.find(newKey) != links.end()) { QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - return false; } else { LinkMap::iterator i = links.find(oldKey); if (i == links.end()) { QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - return false; } else { links[newKey] = i->second; - i->second->reconnect(newAddress); links.erase(oldKey); QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - return true; } } } -pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, uint16_t port, - string& transport, + const string& transport, bool durable, - string& authMechanism, - string& username, - string& password) + const string& authMechanism, + const string& username, + const string& password) { Mutex::ScopedLock locker(lock); @@ -151,18 +114,20 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, return std::pair<Link::shared_ptr, bool>(i->second, false); } -pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, uint16_t port, bool durable, - std::string& src, - std::string& dest, - std::string& key, + const std::string& src, + const std::string& dest, + const std::string& key, bool isQueue, bool isLocal, - std::string& tag, - std::string& excludes, + const std::string& tag, + const std::string& excludes, bool dynamic, - uint16_t sync) + uint16_t sync, + Bridge::InitializeCallback init +) { Mutex::ScopedLock locker(lock); QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); @@ -196,7 +161,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), args)); + host, port, src, dest, key), + args, init)); bridges[bridgeKey] = bridge; l->second->add(bridge); return std::pair<Bridge::shared_ptr, bool>(bridge, true); @@ -214,7 +180,6 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) { if (i->second->isDurable() && store) store->destroy(*(i->second)); - linksToDestroy[key] = i->second; links.erase(i); } } @@ -242,7 +207,6 @@ void LinkRegistry::destroy(const std::string& host, l->second->cancel(b->second); if (b->second->isDurable()) store->destroy(*(b->second)); - bridgesToDestroy[bridgeKey] = b->second; bridges.erase(b); } @@ -276,12 +240,17 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { Link::shared_ptr link = findLink(key); if (link) { - link->established(); - link->setConnection(c); + link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); } } +void LinkRegistry::notifyOpened(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) link->opened(); +} + void LinkRegistry::notifyClosed(const std::string& key) { Link::shared_ptr link = findLink(key); @@ -384,9 +353,12 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); - passiveChanged = p != passive; passive = p; - //will activate or passivate links on maintenance visit + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } } void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { |