summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/LinkRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp')
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp164
1 files changed, 129 insertions, 35 deletions
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 0703c276cf..f32587dd68 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -18,20 +18,49 @@
* under the License.
*
*/
-#include "LinkRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
#include <iostream>
+#include <boost/format.hpp>
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
+using boost::format;
+using boost::str;
+namespace _qmf = qmf::org::apache::qpid::broker;
#define LINK_MAINT_INTERVAL 2
-LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+// TODO: This constructor is only used by the store unit tests -
+// That probably indicates that LinkRegistry isn't correctly
+// factored: The persistence element and maintenance element
+// should be factored separately
+LinkRegistry::LinkRegistry () :
+ broker(0), timer(0),
+ parent(0), store(0), passive(false), passiveChanged(false),
+ realm("")
{
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+}
+
+LinkRegistry::LinkRegistry (Broker* _broker) :
+ broker(_broker), timer(&broker->getTimer()),
+ maintenanceTask(new Periodic(*this)),
+ parent(0), store(0), passive(false), passiveChanged(false),
+ realm(broker->getOptions().realm)
+{
+ timer->add(maintenanceTask);
+}
+
+LinkRegistry::~LinkRegistry()
+{
+ // This test is only necessary if the default constructor above is present
+ if (maintenanceTask)
+ maintenanceTask->cancel();
}
LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
@@ -40,7 +69,8 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
void LinkRegistry::Periodic::fire ()
{
links.periodicMaintenance ();
- links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+ setupNextFire();
+ links.timer->add(this);
}
void LinkRegistry::periodicMaintenance ()
@@ -49,13 +79,53 @@ void LinkRegistry::periodicMaintenance ()
linksToDestroy.clear();
bridgesToDestroy.clear();
+ if (passiveChanged) {
+ if (passive) { QPID_LOG(info, "Passivating links"); }
+ else { QPID_LOG(info, "Activating links"); }
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+ i->second->setPassive(passive);
+ }
+ passiveChanged = false;
+ }
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
+ //now process any requests for re-addressing
+ for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
+ updateAddress(i->first, i->second);
+ reMappings.clear();
+}
+
+void LinkRegistry::changeAddress(const qpid::TcpAddress& oldAddress, const qpid::TcpAddress& newAddress)
+{
+ //done on periodic maintenance thread; hold changes in separate
+ //map to avoid modifying the link map that is iterated over
+ reMappings[createKey(oldAddress)] = newAddress;
+}
+
+bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::TcpAddress& newAddress)
+{
+ std::string newKey = createKey(newAddress);
+ if (links.find(newKey) != links.end()) {
+ QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
+ return false;
+ } else {
+ LinkMap::iterator i = links.find(oldKey);
+ if (i == links.end()) {
+ QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
+ return false;
+ } else {
+ links[newKey] = i->second;
+ i->second->reconnect(newAddress);
+ links.erase(oldKey);
+ QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
+ return true;
+ }
+ }
}
pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
uint16_t port,
- bool useSsl,
+ string& transport,
bool durable,
string& authMechanism,
string& username,
@@ -72,9 +142,10 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable,
+ link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
authMechanism, username, password,
broker, parent));
+ if (passive) link->setPassive(true);
links[key] = link;
return std::pair<Link::shared_ptr, bool>(link, true);
}
@@ -90,9 +161,13 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
bool isQueue,
bool isLocal,
std::string& tag,
- std::string& excludes)
+ std::string& excludes,
+ bool dynamic,
+ uint16_t sync)
{
Mutex::ScopedLock locker(lock);
+ QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
+
stringstream keystream;
keystream << host << ":" << port;
string linkKey = string(keystream.str());
@@ -107,7 +182,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
BridgeMap::iterator b = bridges.find(bridgeKey);
if (b == bridges.end())
{
- management::ArgsLinkBridge args;
+ _qmf::ArgsLinkBridge args;
Bridge::shared_ptr bridge;
args.i_durable = durable;
@@ -118,6 +193,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
args.i_srcIsLocal = isLocal;
args.i_tag = tag;
args.i_excludes = excludes;
+ args.i_dynamic = dynamic;
+ args.i_sync = sync;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
@@ -177,7 +254,6 @@ void LinkRegistry::destroy(const std::string& host,
void LinkRegistry::setStore (MessageStore* _store)
{
- assert (store == 0 && _store != 0);
store = _store;
}
@@ -185,66 +261,84 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+Link::shared_ptr LinkRegistry::findLink(const std::string& key)
{
Mutex::ScopedLock locker(lock);
LinkMap::iterator l = links.find(key);
- if (l != links.end())
- {
- l->second->established();
- l->second->setConnection(c);
+ if (l != links.end()) return l->second;
+ else return Link::shared_ptr();
+}
+
+void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+{
+ Link::shared_ptr link = findLink(key);
+ if (link) {
+ link->established();
+ link->setConnection(c);
+ c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
void LinkRegistry::notifyClosed(const std::string& key)
{
- Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end())
- l->second->closed(0, "Closed by peer");
+ Link::shared_ptr link = findLink(key);
+ if (link) {
+ link->closed(0, "Closed by peer");
+ }
}
void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text)
{
- Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end())
- l->second->notifyConnectionForced(text);
+ Link::shared_ptr link = findLink(key);
+ if (link) {
+ link->notifyConnectionForced(text);
+ }
}
std::string LinkRegistry::getAuthMechanism(const std::string& key)
{
- Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end())
- return l->second->getAuthMechanism();
+ Link::shared_ptr link = findLink(key);
+ if (link)
+ return link->getAuthMechanism();
return string("ANONYMOUS");
}
std::string LinkRegistry::getAuthCredentials(const std::string& key)
{
- Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l == links.end())
+ Link::shared_ptr link = findLink(key);
+ if (!link)
return string();
string result;
result += '\0';
- result += l->second->getUsername();
+ result += link->getUsername();
result += '\0';
- result += l->second->getPassword();
+ result += link->getPassword();
return result;
}
std::string LinkRegistry::getAuthIdentity(const std::string& key)
{
- Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l == links.end())
+ Link::shared_ptr link = findLink(key);
+ if (!link)
return string();
- return l->second->getUsername();
+ return link->getUsername();
}
+std::string LinkRegistry::createKey(const qpid::TcpAddress& a)
+{
+ stringstream keystream;
+ keystream << a.host << ":" << a.port;
+ return string(keystream.str());
+}
+
+void LinkRegistry::setPassive(bool p)
+{
+ Mutex::ScopedLock locker(lock);
+ passiveChanged = p != passive;
+ passive = p;
+ //will activate or passivate links on maintenance visit
+}