summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp273
1 files changed, 180 insertions, 93 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index c6c5a1ac05..75c311c917 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
LinkRegistry::~LinkRegistry() {}
+/** find link by the *configured* remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport)
+{
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+ Link::shared_ptr& link = i->second;
+ if (link->getHost() == host &&
+ link->getPort() == port &&
+ (transport.empty() || link->getTransport() == transport))
+ return link;
+ }
+ return boost::shared_ptr<Link>();
+}
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
{
Mutex::ScopedLock locker(lock);
- std::string oldKey = createKey(oldAddress);
- std::string newKey = createKey(newAddress);
- if (links.find(newKey) != links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- } else {
- LinkMap::iterator i = links.find(oldKey);
- if (i == links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- } else {
- links[newKey] = i->second;
- links.erase(oldKey);
- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- }
- }
+ LinkMap::iterator l = links.find(name);
+ if (l != links.end())
+ return l->second;
+ return boost::shared_ptr<Link>();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+ const string& host,
uint16_t port,
const string& transport,
bool durable,
const string& authMechanism,
const string& username,
- const string& password)
+ const string& password,
+ bool failover)
{
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i == links.end())
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
- authMechanism, username, password,
- broker, parent));
- links[key] = link;
+ link = Link::shared_ptr (
+ new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent, failover));
+ if (durable && store) store->create(*link);
+ links[name] = link;
+ QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
- uint16_t port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+ if (i->second->getSrc() == src && i->second->getDest() == dest &&
+ i->second->getKey() == key && i->second->getLink() &&
+ i->second->getLink()->getName() == link.getName()) {
+ return i->second;
+ }
+ }
+ return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+ Mutex::ScopedLock locker(lock);
+ BridgeMap::iterator b = bridges.find(name);
+ if (b != bridges.end())
+ return b->second;
+ return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
const std::string& excludes,
bool dynamic,
uint16_t sync,
- Bridge::InitializeCallback init
+ Bridge::InitializeCallback init,
+ const std::string& queueName,
+ const std::string& altExchange
)
{
Mutex::ScopedLock locker(lock);
- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
+ // Durable bridges are only valid on durable links
+ if (durable && !link.isDurable()) {
+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ if (dynamic) {
+ Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+ if (exchange.get() == 0) {
+ QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ }
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
@@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
args.i_sync = sync;
bridge = Bridge::shared_ptr
- (new Bridge (l->second.get(), l->second->nextChannel(),
- boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key),
- args, init));
- bridges[bridgeKey] = bridge;
- l->second->add(bridge);
+ (new Bridge (name, &link, link.nextChannel(),
+ boost::bind(&LinkRegistry::destroyBridge, this, _1),
+ args, init, queueName, altExchange));
+ bridges[name] = bridge;
+ link.add(bridge);
+ if (durable && store)
+ store->create(*bridge);
+
+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+ "' from " << src << " to " << dest << " (" << key << ")");
+
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(Link *link)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
if (i->second->isDurable() && store)
@@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
-void LinkRegistry::destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(Bridge *bridge)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName());
Mutex::ScopedLock locker(lock);
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return;
-
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(bridge->getName());
if (b == bridges.end())
return;
- l->second->cancel(b->second);
+ Link *link = b->second->getLink();
+ if (link) {
+ link->cancel(b->second);
+ }
if (b->second->isDurable())
store->destroy(*(b->second));
bridges.erase(b);
@@ -219,28 +266,73 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
- // Convert keyOrMgmtId to a host:port key.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
- // connection management IDs. Currently sys:: protocol factories
- // and IO plugins construct the IDs and LinkRegistry parses them.
- size_t separator = keyOrMgmtId.find('-');
- if (separator == std::string::npos) separator = 0;
- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+ {
+ // Extract host and port of remote broker from connection id string.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses connection
+ // management IDs. Currently sys:: protocol factories and IO plugins construct the
+ // IDs and LinkRegistry parses them.
+ // KAG: current connection id format assumed:
+ // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
+ // contained within brackets "[...]", example:
+ // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
+ // if this assumption changes!
+ size_t separator = connId.find('-');
+ assert(separator != std::string::npos);
+ std::string remote = connId.substr(separator+1, std::string::npos);
+ separator = remote.rfind(":");
+ assert(separator != std::string::npos);
+ *host = remote.substr(0, separator);
+ // IPv6 - host is bracketed by "[]", strip them
+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
+ *host = host->substr(1, host->length() - 2);
+ }
+ try {
+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+ } catch (const boost::bad_lexical_cast&) {
+ QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
+ assert(false);
+ }
+ }
+}
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end()) return l->second;
- else return Link::shared_ptr();
+ ConnectionMap::iterator c = connections.find(connId);
+ if (c != connections.end()) {
+ LinkMap::iterator l = links.find(c->second);
+ if (l != links.end())
+ return l->second;
+ }
+ return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
- Link::shared_ptr link = findLink(key);
+ // find a link that is attempting to connect to the remote, and
+ // create a mapping from connection id to link
+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+ std::string host;
+ uint16_t port;
+ extractHostPort( key, &host, &port );
+ Link::shared_ptr link;
+ {
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ if (l->second->pendingConnection(host, port)) {
+ link = l->second;
+ connections[key] = link->getName();
+ link->established(c);
+ break;
+ }
+ }
+ }
+
if (link) {
- link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
@@ -299,22 +391,29 @@ std::string LinkRegistry::getUsername(const std::string& key)
return link->getUsername();
}
+/** note: returns the current remote host (may be different from the host originally
+ configured for the Link due to failover) */
std::string LinkRegistry::getHost(const std::string& key)
{
- Link::shared_ptr link = findLink(key);
- if (!link)
- return string();
+ Link::shared_ptr link = findLink(key);
+ if (!link)
+ return string();
- return link->getHost();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.host;
}
+/** returns the current remote port (ditto above) */
uint16_t LinkRegistry::getPort(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return 0;
- return link->getPort();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.port;
}
std::string LinkRegistry::getPassword(const std::string& key)
@@ -336,20 +435,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
-std::string LinkRegistry::createKey(const qpid::Address& a) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- stringstream keystream;
- keystream << host << ":" << port;
- return keystream.str();
-}
-
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
@@ -362,10 +447,12 @@ void LinkRegistry::setPassive(bool p)
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ Mutex::ScopedLock locker(lock);
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ Mutex::ScopedLock locker(lock);
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}