diff options
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 95 |
1 files changed, 90 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 6e20a3f7ce..be3c67077e 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -46,15 +46,21 @@ void LinkRegistry::Periodic::fire () void LinkRegistry::periodicMaintenance () { Mutex::ScopedLock locker(lock); + linksToDestroy.clear(); + bridgesToDestroy.clear(); for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, - uint16_t port, - bool useSsl, - bool durable) +pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, + uint16_t port, + bool useSsl, + bool durable, + string& authMechanism, + string& username, + string& password) + { Mutex::ScopedLock locker(lock); stringstream keystream; @@ -66,13 +72,64 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host, { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent)); + link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable, + authMechanism, username, password, + broker, parent)); links[key] = link; return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, + uint16_t port, + bool durable, + std::string& src, + std::string& dest, + std::string& key, + bool is_queue, + bool is_local, + std::string& tag, + std::string& excludes) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + { + management::ArgsLinkBridge args; + Bridge::shared_ptr bridge; + + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_src_is_queue = is_queue; + args.i_src_is_local = is_local; + args.i_tag = tag; + args.i_excludes = excludes; + + bridge = Bridge::shared_ptr + (new Bridge (l->second.get(), l->second->nextChannel(), + boost::bind(&LinkRegistry::destroy, this, + host, port, src, dest, key), args)); + bridges[bridgeKey] = bridge; + l->second->add(bridge); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); + } + return std::pair<Bridge::shared_ptr, bool>(b->second, false); +} + void LinkRegistry::destroy(const string& host, const uint16_t port) { Mutex::ScopedLock locker(lock); @@ -90,6 +147,34 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } +void LinkRegistry::destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return; + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + return; + + l->second->cancel(b->second); + if (b->second->isDurable()) + store->destroy(*(b->second)); + bridgesToDestroy[bridgeKey] = b->second; + bridges.erase(b); +} + void LinkRegistry::setStore (MessageStore* _store) { assert (store == 0 && _store != 0); |