summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp126
1 files changed, 49 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index e9885f5462..a4fd90684e 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -35,102 +35,65 @@ using boost::format;
using boost::str;
namespace _qmf = qmf::org::apache::qpid::broker;
-#define LINK_MAINT_INTERVAL 2
-
// TODO: This constructor is only used by the store unit tests -
// That probably indicates that LinkRegistry isn't correctly
-// factored: The persistence element and maintenance element
-// should be factored separately
+// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
- broker(0), timer(0),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(0),
+ parent(0), store(0), passive(false),
realm("")
{
}
-LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker), timer(&broker->getTimer()),
- maintenanceTask(new Periodic(*this)),
- parent(0), store(0), passive(false), passiveChanged(false),
- realm(broker->getOptions().realm)
-{
- timer->add(maintenanceTask);
-}
-
-LinkRegistry::~LinkRegistry()
-{
- // This test is only necessary if the default constructor above is present
- if (maintenanceTask)
- maintenanceTask->cancel();
+namespace {
+struct ConnectionObserverImpl : public ConnectionObserver {
+ LinkRegistry& links;
+ ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+ void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
+ void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
+ void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
+ void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+};
}
-LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
- TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {}
-
-void LinkRegistry::Periodic::fire ()
+LinkRegistry::LinkRegistry (Broker* _broker) :
+ broker(_broker),
+ parent(0), store(0), passive(false),
+ realm(broker->getOptions().realm)
{
- links.periodicMaintenance ();
- setupNextFire();
- links.timer->add(this);
+ broker->getConnectionObservers().add(
+ boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this)));
}
-void LinkRegistry::periodicMaintenance ()
-{
- Mutex::ScopedLock locker(lock);
+LinkRegistry::~LinkRegistry() {}
- linksToDestroy.clear();
- bridgesToDestroy.clear();
- if (passiveChanged) {
- if (passive) { QPID_LOG(info, "Passivating links"); }
- else { QPID_LOG(info, "Activating links"); }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
- i->second->setPassive(passive);
- }
- passiveChanged = false;
- }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
- i->second->maintenanceVisit();
- //now process any requests for re-addressing
- for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
- updateAddress(i->first, i->second);
- reMappings.clear();
-}
void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
{
- //done on periodic maintenance thread; hold changes in separate
- //map to avoid modifying the link map that is iterated over
- reMappings[createKey(oldAddress)] = newAddress;
-}
-
-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress)
-{
+ Mutex::ScopedLock locker(lock);
+ std::string oldKey = createKey(oldAddress);
std::string newKey = createKey(newAddress);
if (links.find(newKey) != links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- return false;
} else {
LinkMap::iterator i = links.find(oldKey);
if (i == links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- return false;
} else {
links[newKey] = i->second;
- i->second->reconnect(newAddress);
links.erase(oldKey);
QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- return true;
}
}
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
uint16_t port,
- string& transport,
+ const string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password)
+ const string& authMechanism,
+ const string& username,
+ const string& password)
{
Mutex::ScopedLock locker(lock);
@@ -151,18 +114,20 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& tag,
- std::string& excludes,
+ const std::string& tag,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+ Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +161,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
@@ -214,7 +180,6 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
{
if (i->second->isDurable() && store)
store->destroy(*(i->second));
- linksToDestroy[key] = i->second;
links.erase(i);
}
}
@@ -242,7 +207,6 @@ void LinkRegistry::destroy(const std::string& host,
l->second->cancel(b->second);
if (b->second->isDurable())
store->destroy(*(b->second));
- bridgesToDestroy[bridgeKey] = b->second;
bridges.erase(b);
}
@@ -276,12 +240,17 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
Link::shared_ptr link = findLink(key);
if (link) {
- link->established();
- link->setConnection(c);
+ link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
+void LinkRegistry::notifyOpened(const std::string& key)
+{
+ Link::shared_ptr link = findLink(key);
+ if (link) link->opened();
+}
+
void LinkRegistry::notifyClosed(const std::string& key)
{
Link::shared_ptr link = findLink(key);
@@ -384,9 +353,12 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
- passiveChanged = p != passive;
passive = p;
- //will activate or passivate links on maintenance visit
+ if (passive) { QPID_LOG(info, "Passivating links"); }
+ else { QPID_LOG(info, "Activating links"); }
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+ i->second->setPassive(passive);
+ }
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {