summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/LinkRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp95
1 files changed, 90 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 6e20a3f7ce..be3c67077e 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -46,15 +46,21 @@ void LinkRegistry::Periodic::fire ()
void LinkRegistry::periodicMaintenance ()
{
Mutex::ScopedLock locker(lock);
+
linksToDestroy.clear();
+ bridgesToDestroy.clear();
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
- uint16_t port,
- bool useSsl,
- bool durable)
+pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ string& authMechanism,
+ string& username,
+ string& password)
+
{
Mutex::ScopedLock locker(lock);
stringstream keystream;
@@ -66,13 +72,64 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+ link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable,
+ authMechanism, username, password,
+ broker, parent));
links[key] = link;
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+ uint16_t port,
+ bool durable,
+ std::string& src,
+ std::string& dest,
+ std::string& key,
+ bool is_queue,
+ bool is_local,
+ std::string& tag,
+ std::string& excludes)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string linkKey = string(keystream.str());
+
+ keystream << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = string(keystream.str());
+
+ LinkMap::iterator l = links.find(linkKey);
+ if (l == links.end())
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+
+ BridgeMap::iterator b = bridges.find(bridgeKey);
+ if (b == bridges.end())
+ {
+ management::ArgsLinkBridge args;
+ Bridge::shared_ptr bridge;
+
+ args.i_durable = durable;
+ args.i_src = src;
+ args.i_dest = dest;
+ args.i_key = key;
+ args.i_src_is_queue = is_queue;
+ args.i_src_is_local = is_local;
+ args.i_tag = tag;
+ args.i_excludes = excludes;
+
+ bridge = Bridge::shared_ptr
+ (new Bridge (l->second.get(), l->second->nextChannel(),
+ boost::bind(&LinkRegistry::destroy, this,
+ host, port, src, dest, key), args));
+ bridges[bridgeKey] = bridge;
+ l->second->add(bridge);
+ return std::pair<Bridge::shared_ptr, bool>(bridge, true);
+ }
+ return std::pair<Bridge::shared_ptr, bool>(b->second, false);
+}
+
void LinkRegistry::destroy(const string& host, const uint16_t port)
{
Mutex::ScopedLock locker(lock);
@@ -90,6 +147,34 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
+void LinkRegistry::destroy(const std::string& host,
+ const uint16_t port,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string linkKey = string(keystream.str());
+
+ LinkMap::iterator l = links.find(linkKey);
+ if (l == links.end())
+ return;
+
+ keystream << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = string(keystream.str());
+ BridgeMap::iterator b = bridges.find(bridgeKey);
+ if (b == bridges.end())
+ return;
+
+ l->second->cancel(b->second);
+ if (b->second->isDurable())
+ store->destroy(*(b->second));
+ bridgesToDestroy[bridgeKey] = b->second;
+ bridges.erase(b);
+}
+
void LinkRegistry::setStore (MessageStore* _store)
{
assert (store == 0 && _store != 0);