diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp new file mode 100644 index 0000000000..e9885f5462 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -0,0 +1,399 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" +#include <iostream> +#include <boost/format.hpp> + +using namespace qpid::broker; +using namespace qpid::sys; +using std::string; +using std::pair; +using std::stringstream; +using boost::intrusive_ptr; +using boost::format; +using boost::str; +namespace _qmf = qmf::org::apache::qpid::broker; + +#define LINK_MAINT_INTERVAL 2 + +// TODO: This constructor is only used by the store unit tests - +// That probably indicates that LinkRegistry isn't correctly +// factored: The persistence element and maintenance element +// should be factored separately +LinkRegistry::LinkRegistry () : + broker(0), timer(0), + parent(0), store(0), passive(false), passiveChanged(false), + realm("") +{ +} + +LinkRegistry::LinkRegistry (Broker* _broker) : + broker(_broker), timer(&broker->getTimer()), + maintenanceTask(new Periodic(*this)), + parent(0), store(0), passive(false), passiveChanged(false), + realm(broker->getOptions().realm) +{ + timer->add(maintenanceTask); +} + +LinkRegistry::~LinkRegistry() +{ + // This test is only necessary if the default constructor above is present + if (maintenanceTask) + maintenanceTask->cancel(); +} + +LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : + TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {} + +void LinkRegistry::Periodic::fire () +{ + links.periodicMaintenance (); + setupNextFire(); + links.timer->add(this); +} + +void LinkRegistry::periodicMaintenance () +{ + Mutex::ScopedLock locker(lock); + + linksToDestroy.clear(); + bridgesToDestroy.clear(); + if (passiveChanged) { + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } + passiveChanged = false; + } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) + i->second->maintenanceVisit(); + //now process any requests for re-addressing + for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) + updateAddress(i->first, i->second); + reMappings.clear(); +} + +void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) +{ + //done on periodic maintenance thread; hold changes in separate + //map to avoid modifying the link map that is iterated over + reMappings[createKey(oldAddress)] = newAddress; +} + +bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress) +{ + std::string newKey = createKey(newAddress); + if (links.find(newKey) != links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); + return false; + } else { + LinkMap::iterator i = links.find(oldKey); + if (i == links.end()) { + QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); + return false; + } else { + links[newKey] = i->second; + i->second->reconnect(newAddress); + links.erase(oldKey); + QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); + return true; + } + } +} + +pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, + uint16_t port, + string& transport, + bool durable, + string& authMechanism, + string& username, + string& password) + +{ + Mutex::ScopedLock locker(lock); + string key = createKey(host, port); + + LinkMap::iterator i = links.find(key); + if (i == links.end()) + { + Link::shared_ptr link; + + link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, + authMechanism, username, password, + broker, parent)); + if (passive) link->setPassive(true); + links[key] = link; + return std::pair<Link::shared_ptr, bool>(link, true); + } + return std::pair<Link::shared_ptr, bool>(i->second, false); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, + uint16_t port, + bool durable, + std::string& src, + std::string& dest, + std::string& key, + bool isQueue, + bool isLocal, + std::string& tag, + std::string& excludes, + bool dynamic, + uint16_t sync) +{ + Mutex::ScopedLock locker(lock); + QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); + + string linkKey = createKey(host, port); + stringstream keystream; + keystream << linkKey << "!" << src << "!" << dest << "!" << key; + string bridgeKey = keystream.str(); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + { + _qmf::ArgsLinkBridge args; + Bridge::shared_ptr bridge; + + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_srcIsQueue = isQueue; + args.i_srcIsLocal = isLocal; + args.i_tag = tag; + args.i_excludes = excludes; + args.i_dynamic = dynamic; + args.i_sync = sync; + + bridge = Bridge::shared_ptr + (new Bridge (l->second.get(), l->second->nextChannel(), + boost::bind(&LinkRegistry::destroy, this, + host, port, src, dest, key), args)); + bridges[bridgeKey] = bridge; + l->second->add(bridge); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); + } + return std::pair<Bridge::shared_ptr, bool>(b->second, false); +} + +void LinkRegistry::destroy(const string& host, const uint16_t port) +{ + Mutex::ScopedLock locker(lock); + string key = createKey(host, port); + + LinkMap::iterator i = links.find(key); + if (i != links.end()) + { + if (i->second->isDurable() && store) + store->destroy(*(i->second)); + linksToDestroy[key] = i->second; + links.erase(i); + } +} + +void LinkRegistry::destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + string linkKey = createKey(host, port); + stringstream keystream; + keystream << linkKey << "!" << src << "!" << dest << "!" << key; + string bridgeKey = keystream.str(); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return; + + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + return; + + l->second->cancel(b->second); + if (b->second->isDurable()) + store->destroy(*(b->second)); + bridgesToDestroy[bridgeKey] = b->second; + bridges.erase(b); +} + +void LinkRegistry::setStore (MessageStore* _store) +{ + store = _store; +} + +MessageStore* LinkRegistry::getStore() const { + return store; +} + +Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) +{ + // Convert keyOrMgmtId to a host:port key. + // + // TODO aconway 2011-02-01: centralize code that constructs/parses + // connection management IDs. Currently sys:: protocol factories + // and IO plugins construct the IDs and LinkRegistry parses them. + size_t separator = keyOrMgmtId.find('-'); + if (separator == std::string::npos) separator = 0; + std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); + + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) return l->second; + else return Link::shared_ptr(); +} + +void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +{ + Link::shared_ptr link = findLink(key); + if (link) { + link->established(); + link->setConnection(c); + c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); + } +} + +void LinkRegistry::notifyClosed(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) { + link->closed(0, "Closed by peer"); + } +} + +void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text) +{ + Link::shared_ptr link = findLink(key); + if (link) { + link->notifyConnectionForced(text); + } +} + +std::string LinkRegistry::getAuthMechanism(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (link) + return link->getAuthMechanism(); + return string("ANONYMOUS"); +} + +std::string LinkRegistry::getAuthCredentials(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + string result; + result += '\0'; + result += link->getUsername(); + result += '\0'; + result += link->getPassword(); + + return result; +} + +std::string LinkRegistry::getUsername(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getUsername(); +} + +std::string LinkRegistry::getHost(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getHost(); +} + +uint16_t LinkRegistry::getPort(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return 0; + + return link->getPort(); +} + +std::string LinkRegistry::getPassword(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getPassword(); +} + +std::string LinkRegistry::getAuthIdentity(const std::string& key) +{ + Link::shared_ptr link = findLink(key); + if (!link) + return string(); + + return link->getUsername(); +} + + +std::string LinkRegistry::createKey(const qpid::Address& a) { + // TODO aconway 2010-05-11: key should also include protocol/transport to + // be unique. Requires refactor of LinkRegistry interface. + return createKey(a.host, a.port); +} + +std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { + // TODO aconway 2010-05-11: key should also include protocol/transport to + // be unique. Requires refactor of LinkRegistry interface. + stringstream keystream; + keystream << host << ":" << port; + return keystream.str(); +} + +void LinkRegistry::setPassive(bool p) +{ + Mutex::ScopedLock locker(lock); + passiveChanged = p != passive; + passive = p; + //will activate or passivate links on maintenance visit +} + +void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); +} + +void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); +} + |