summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/LinkRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp94
1 files changed, 19 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index a79081b8ed..d048b9c05f 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -19,8 +19,10 @@
*
*/
#include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/Link.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/Link.h"
#include "qpid/log/Statement.h"
#include <iostream>
#include <boost/format.hpp>
@@ -42,8 +44,8 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// factored: The persistence element should be factored separately
LinkRegistry::LinkRegistry () :
broker(0),
-// parent(0), store(0), passive(false),
- parent(0), asyncStore(0), passive(false),
+// parent(0), store(0),
+ parent(0), asyncStore(0),
realm("")
{
}
@@ -60,7 +62,8 @@ class LinkRegistryConnectionObserver : public ConnectionObserver {
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker),
- parent(0), asyncStore(0), passive(false),
+// parent(0), store(0),
+ parent(0), asyncStore(0),
realm(broker->getOptions().realm)
{
broker->getConnectionObservers().add(
@@ -118,10 +121,9 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
boost::bind(&LinkRegistry::linkDestroyed, this, _1),
durable, authMechanism, username, password, broker,
parent, failover));
-// if (durable && store) store->create(*link);
- if (durable && asyncStore) {
-// store->create(*link);
- // TODO: kpvdr: async create config (link)
+ if (durable && asyncStore && !broker->inRecovery()) {
+ //store->create(*link);
+ // TODO: kpvdr: async create config (link)
}
links[name] = link;
pendingLinks[name] = link;
@@ -218,9 +220,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
args, init, queueName, altExchange));
bridges[name] = bridge;
link.add(bridge);
-// if (durable && store)
- if (durable && asyncStore) {
-// store->create(*bridge);
+ if (durable && asyncStore && !broker->inRecovery()) {
+ //store->create(*bridge);
// TODO: kpvdr: Async create config (bridge)
}
@@ -264,6 +265,7 @@ void LinkRegistry::destroyBridge(Bridge *bridge)
Link *link = b->second->getLink();
if (link) {
link->cancel(b->second);
+ link->returnChannel( bridge->getChannel() );
}
// if (b->second->isDurable())
if (b->second->isDurable()) {
@@ -283,38 +285,6 @@ AsyncStore* LinkRegistry::getStore() const {
return asyncStore;
}
-namespace {
- void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
- {
- // Extract host and port of remote broker from connection id string.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses connection
- // management IDs. Currently sys:: protocol factories and IO plugins construct the
- // IDs and LinkRegistry parses them.
- // KAG: current connection id format assumed:
- // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
- // contained within brackets "[...]", example:
- // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
- // if this assumption changes!
- size_t separator = connId.find('-');
- assert(separator != std::string::npos);
- std::string remote = connId.substr(separator+1, std::string::npos);
- separator = remote.rfind(":");
- assert(separator != std::string::npos);
- *host = remote.substr(0, separator);
- // IPv6 - host is bracketed by "[]", strip them
- if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
- *host = host->substr(1, host->length() - 2);
- }
- try {
- *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
- } catch (const boost::bad_lexical_cast&) {
- QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
- assert(false);
- }
- }
-}
-
/** find the Link that corresponds to the given connection */
Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
{
@@ -334,19 +304,15 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
// create a mapping from connection id to link
QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
std::string host;
- uint16_t port = 0;
- extractHostPort( key, &host, &port );
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
- if (l->second->pendingConnection(host, port)) {
- link = l->second;
- pendingLinks.erase(l);
- connections[key] = link->getName();
- QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
- break;
- }
+ LinkMap::iterator l = pendingLinks.find(key);
+ if (l != pendingLinks.end()) {
+ link = l->second;
+ pendingLinks.erase(l);
+ connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
}
}
@@ -461,26 +427,4 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
return link->getUsername();
}
-
-void LinkRegistry::setPassive(bool p)
-{
- Mutex::ScopedLock locker(lock);
- passive = p;
- if (passive) { QPID_LOG(info, "Passivating links"); }
- else { QPID_LOG(info, "Activating links"); }
- for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
- i->second->setPassive(passive);
- }
-}
-
-void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
- Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
-}
-
-void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
- Mutex::ScopedLock locker(lock);
- for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
-}
-
}} // namespace qpid::broker