summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-13 21:38:28 +0000
committerGordon Sim <gsim@apache.org>2008-05-13 21:38:28 +0000
commit7e6635d7e297f6ecf2199028da573bc10437f3ae (patch)
tree7e6ffe5b0d89cd1d496578a6f13a8184849abd33
parentc36a80f0b03bea42bca570e994d08613b08fd6dd (diff)
downloadqpid-python-7e6635d7e297f6ecf2199028da573bc10437f3ae.tar.gz
QPID-990: Patch from Ted Ross to enable persisting of inter-broker routing entities
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@656023 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp101
-rw-r--r--cpp/src/qpid/broker/Bridge.h18
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Link.cpp135
-rw-r--r--cpp/src/qpid/broker/Link.h41
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp95
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h39
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp3
-rwxr-xr-xcpp/src/tests/federation.py5
-rwxr-xr-xpython/commands/qpid-route204
-rw-r--r--specs/management-schema.xml5
12 files changed, 514 insertions, 152 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index a8e7b3c368..337992992f 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -20,29 +20,35 @@
*/
#include "Bridge.h"
#include "ConnectionState.h"
+#include "LinkRegistry.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
+using qpid::framing::Buffer;
+using qpid::management::ManagementAgent;
namespace qpid {
namespace broker {
-Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const management::ArgsLinkBridge& _args) :
- id(_id), args(_args),
- mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
- args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- listener(l), name(Uuid(true).str())
+ link(_link), id(_id), args(_args),
+ mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
+ args.i_key, args.i_src_is_queue, args.i_src_is_local,
+ args.i_tag, args.i_excludes)),
+ listener(l), name(Uuid(true).str()), persistenceId(0)
{
- management::ManagementAgent::getAgent()->addObject(mgmtObject);
+ if (!args.i_durable)
+ management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
Bridge::~Bridge()
-{
+{
mgmtObject->resourceDestroy();
}
@@ -65,8 +71,8 @@ void Bridge::create(ConnectionState& c)
string queue = "bridge_queue_";
queue += Uuid(true).str();
FieldTable queueSettings;
- if (args.i_id.size()) {
- queueSettings.setString("qpid.trace.id", args.i_id);
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
}
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
@@ -89,6 +95,81 @@ void Bridge::cancel()
peer->getSession().detach(name);
}
+void Bridge::destroy()
+{
+ listener(this);
+}
+
+void Bridge::setPersistenceId(uint64_t id) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject, id);
+ }
+ persistenceId = id;
+}
+
+const string& Bridge::getName() const
+{
+ return name;
+}
+
+Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string host;
+ uint16_t port;
+ string src;
+ string dest;
+ string key;
+ string id;
+ string excludes;
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+ bool durable(buffer.getOctet());
+ buffer.getShortString(src);
+ buffer.getShortString(dest);
+ buffer.getShortString(key);
+ bool is_queue(buffer.getOctet());
+ bool is_local(buffer.getOctet());
+ buffer.getShortString(id);
+ buffer.getShortString(excludes);
+
+ return links.declare(host, port, durable, src, dest, key,
+ is_queue, is_local, id, excludes).first;
+}
+
+void Bridge::encode(Buffer& buffer) const
+{
+ buffer.putShortString(string("bridge"));
+ buffer.putShortString(link->getHost());
+ buffer.putShort(link->getPort());
+ buffer.putOctet(args.i_durable ? 1 : 0);
+ buffer.putShortString(args.i_src);
+ buffer.putShortString(args.i_dest);
+ buffer.putShortString(args.i_key);
+ buffer.putOctet(args.i_src_is_queue ? 1 : 0);
+ buffer.putOctet(args.i_src_is_local ? 1 : 0);
+ buffer.putShortString(args.i_tag);
+ buffer.putShortString(args.i_excludes);
+}
+
+uint32_t Bridge::encodedSize() const
+{
+ return link->getHost().size() + 1 // short-string (host)
+ + 7 // short-string ("bridge")
+ + 2 // port
+ + 1 // durable
+ + args.i_src.size() + 1
+ + args.i_dest.size() + 1
+ + args.i_key.size() + 1
+ + 1 // src_is_queue
+ + 1 // src_is_local
+ + args.i_tag.size() + 1
+ + args.i_excludes.size() + 1;
+}
+
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
{
return dynamic_pointer_cast<management::ManagementObject>(mgmtObject);
@@ -98,7 +179,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man
{
if (methodId == management::Bridge::METHOD_CLOSE) {
//notify that we are closed
- listener(this);
+ destroy();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 15efcc6482..594a0ef508 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -21,8 +21,10 @@
#ifndef _Bridge_
#define _Bridge_
+#include "PersistableConfig.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ArgsLinkBridge.h"
#include "qpid/management/Bridge.h"
@@ -35,10 +37,12 @@ namespace broker {
class ConnectionState;
class Link;
+class LinkRegistry;
-class Bridge : public management::Manageable
+class Bridge : public PersistableConfig, public management::Manageable
{
public:
+ typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
@@ -46,20 +50,32 @@ public:
void create(ConnectionState& c);
void cancel();
+ void destroy();
+ bool isDurable() { return args.i_durable; }
management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
+ // PersistableConfig:
+ void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const;
+ static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
private:
std::auto_ptr<framing::ChannelHandler> channelHandler;
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
+ Link* link;
framing::ChannelId id;
management::ArgsLinkBridge args;
management::Bridge::shared_ptr mgmtObject;
CancellationListener listener;
std::string name;
+ mutable uint64_t persistenceId;
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d80c13f12a..bc862214b7 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -329,7 +329,8 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
std::pair<Link::shared_ptr, bool> response =
- links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+ links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable,
+ hp.i_authMechanism, hp.i_username, hp.i_password);
if (hp.i_durable && response.second)
store->create(*response.first);
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 83c9a2a62e..cd032495e2 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -37,14 +37,19 @@ using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
-Link::Link(LinkRegistry* _links,
- string& _host,
- uint16_t _port,
- bool _useSsl,
- bool _durable,
- Broker* _broker,
+Link::Link(LinkRegistry* _links,
+ MessageStore* _store,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ string& _authMechanism,
+ string& _username,
+ string& _password,
+ Broker* _broker,
management::Manageable* parent)
- : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), broker(_broker), state(0),
access(boost::bind(&Link::established, this),
boost::bind(&Link::closed, this, _1, _2),
@@ -65,7 +70,7 @@ Link::Link(LinkRegistry* _links,
agent->addObject(mgmtObject);
}
}
- setState(STATE_WAITING);
+ setStateLH(STATE_WAITING);
}
Link::~Link ()
@@ -76,7 +81,7 @@ Link::~Link ()
mgmtObject->resourceDestroy ();
}
-void Link::setState (int newState)
+void Link::setStateLH (int newState)
{
if (newState == state)
return;
@@ -93,13 +98,13 @@ void Link::setState (int newState)
}
}
-void Link::startConnection ()
+void Link::startConnectionLH ()
{
try {
broker->connect (host, port, useSsl, 0, &access);
- setState(STATE_CONNECTING);
+ setStateLH(STATE_CONNECTING);
} catch(std::exception& e) {
- setState(STATE_WAITING);
+ setStateLH(STATE_WAITING);
mgmtObject->set_lastError (e.what());
}
}
@@ -109,7 +114,7 @@ void Link::established ()
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
- setState(STATE_OPERATIONAL);
+ setStateLH(STATE_OPERATIONAL);
currentInterval = 1;
visitCount = 0;
if (closing)
@@ -124,8 +129,11 @@ void Link::closed (int, std::string text)
QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
connection.reset();
- created.transfer(created.end(), active.begin(), active.end(), active);
- setState(STATE_WAITING);
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ created.push_back(*i);
+ active.clear();
+
+ setStateLH(STATE_WAITING);
mgmtObject->set_lastError (text);
if (closing)
destroy();
@@ -133,25 +141,56 @@ void Link::closed (int, std::string text)
void Link::destroy ()
{
+ Mutex::ScopedLock mutex(lock);
+ Bridges toDelete;
+
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
connection.reset();
+
+ // Move the bridges to be deleted into a local vector so there is no
+ // corruption of the iterator caused by bridge deletion.
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ toDelete.push_back(*i);
+ active.clear();
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++)
+ toDelete.push_back(*i);
+ created.clear();
+
+ // Now delete all bridges on this link.
+ for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+ (*i)->destroy();
+ toDelete.clear();
+
links->destroy (host, port);
}
-void Link::cancel(Bridge* bridge)
+void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
- //need to take this out of the active map and add it to the cancelled map
+ created.push_back (bridge);
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::cancel(Bridge::shared_ptr bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
+ }
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == bridge) {
- cancelled.transfer(cancelled.end(), i, active);
+ if ((*i).get() == bridge.get()) {
+ bridge->cancel();
+ active.erase(i);
break;
}
}
-
- if (connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::ioThreadProcessing()
@@ -161,21 +200,17 @@ void Link::ioThreadProcessing()
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create(*connection);
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
+ active.push_back(*i);
+ (*i)->create(*connection);
}
- cancelled.clear();
+ created.clear();
}
}
void Link::setConnection(Connection::shared_ptr c)
{
+ Mutex::ScopedLock mutex(lock);
+
connection = c;
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -193,11 +228,18 @@ void Link::maintenanceVisit ()
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
- startConnection();
+ startConnectionLH();
}
}
}
+uint Link::nextChannel()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ return channelCounter++;
+}
+
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0)
@@ -217,13 +259,19 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
uint16_t port;
+ string authMechanism;
+ string username;
+ string password;
buffer.getShortString(host);
port = buffer.getShort();
bool useSsl(buffer.getOctet());
bool durable(buffer.getOctet());
+ buffer.getShortString(authMechanism);
+ buffer.getShortString(username);
+ buffer.getShortString(password);
- return links.declare(host, port, useSsl, durable).first;
+ return links.declare(host, port, useSsl, durable, authMechanism, username, password).first;
}
void Link::encode(Buffer& buffer) const
@@ -233,6 +281,9 @@ void Link::encode(Buffer& buffer) const
buffer.putShort(port);
buffer.putOctet(useSsl ? 1 : 0);
buffer.putOctet(durable ? 1 : 0);
+ buffer.putShortString(authMechanism);
+ buffer.putShortString(username);
+ buffer.putShortString(password);
}
uint32_t Link::encodedSize() const
@@ -241,7 +292,10 @@ uint32_t Link::encodedSize() const
+ 5 // short-string ("link")
+ 2 // port
+ 1 // useSsl
- + 1; // durable
+ + 1 // durable
+ + authMechanism.size() + 1
+ + username.size() + 1
+ + password.size() + 1;
}
ManagementObject::shared_ptr Link::GetManagementObject (void) const
@@ -251,8 +305,6 @@ ManagementObject::shared_ptr Link::GetManagementObject (void) const
Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
{
- Mutex::ScopedLock mutex(lock);
-
switch (op)
{
case management::Link::METHOD_CLOSE :
@@ -269,11 +321,14 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
if (iargs.i_durable && !durable)
return Manageable::STATUS_INVALID_PARAMETER;
- created.push_back(new Bridge(this, channelCounter++,
- boost::bind(&Link::cancel, this, _1), iargs));
+ std::pair<Bridge::shared_ptr, bool> result =
+ links->declare (host, port, iargs.i_durable, iargs.i_src,
+ iargs.i_dest, iargs.i_key, iargs.i_src_is_queue,
+ iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes);
+
+ if (result.second && iargs.i_durable)
+ store->create(*result.first);
- if (state == STATE_OPERATIONAL && connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
return Manageable::STATUS_OK;
}
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 838c3bf696..c4eca86c19 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -45,10 +45,14 @@ namespace qpid {
private:
sys::Mutex lock;
LinkRegistry* links;
- const string host;
- const uint16_t port;
- const bool useSsl;
- const bool durable;
+ MessageStore* store;
+ string host;
+ uint16_t port;
+ bool useSsl;
+ bool durable;
+ string authMechanism;
+ string username;
+ string password;
mutable uint64_t persistenceId;
management::Link::shared_ptr mgmtObject;
Broker* broker;
@@ -58,10 +62,9 @@ namespace qpid {
uint32_t currentInterval;
bool closing;
- typedef boost::ptr_vector<Bridge> Bridges;
+ typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
- Bridges cancelled; // Bridges pending deletion
uint channelCounter;
boost::shared_ptr<Connection> connection;
@@ -71,29 +74,37 @@ namespace qpid {
static const uint32_t MAX_INTERVAL = 16;
- void setState (int newState);
- void startConnection(); // Start the IO Connection
+ void setStateLH (int newState);
+ void startConnectionLH(); // Start the IO Connection
void established(); // Called when connection is created
void closed(int, std::string); // Called when connection goes away
void destroy(); // Called when mgmt deletes this link
- void cancel(Bridge*); // Called by self-cancelling bridge
void ioThreadProcessing(); // Called on connection's IO thread by request
void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
public:
typedef boost::shared_ptr<Link> shared_ptr;
- Link(LinkRegistry* links,
- string& host,
- uint16_t port,
- bool useSsl,
- bool durable,
- Broker* broker,
+ Link(LinkRegistry* links,
+ MessageStore* store,
+ string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ string& authMechanism,
+ string& username,
+ string& password,
+ Broker* broker,
management::Manageable* parent = 0);
virtual ~Link();
+ std::string getHost() { return host; }
+ uint16_t getPort() { return port; }
bool isDurable() { return durable; }
void maintenanceVisit ();
+ uint nextChannel();
+ void add(Bridge::shared_ptr);
+ void cancel(Bridge::shared_ptr);
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 6e20a3f7ce..be3c67077e 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -46,15 +46,21 @@ void LinkRegistry::Periodic::fire ()
void LinkRegistry::periodicMaintenance ()
{
Mutex::ScopedLock locker(lock);
+
linksToDestroy.clear();
+ bridgesToDestroy.clear();
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
- uint16_t port,
- bool useSsl,
- bool durable)
+pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ string& authMechanism,
+ string& username,
+ string& password)
+
{
Mutex::ScopedLock locker(lock);
stringstream keystream;
@@ -66,13 +72,64 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+ link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable,
+ authMechanism, username, password,
+ broker, parent));
links[key] = link;
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+ uint16_t port,
+ bool durable,
+ std::string& src,
+ std::string& dest,
+ std::string& key,
+ bool is_queue,
+ bool is_local,
+ std::string& tag,
+ std::string& excludes)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string linkKey = string(keystream.str());
+
+ keystream << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = string(keystream.str());
+
+ LinkMap::iterator l = links.find(linkKey);
+ if (l == links.end())
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+
+ BridgeMap::iterator b = bridges.find(bridgeKey);
+ if (b == bridges.end())
+ {
+ management::ArgsLinkBridge args;
+ Bridge::shared_ptr bridge;
+
+ args.i_durable = durable;
+ args.i_src = src;
+ args.i_dest = dest;
+ args.i_key = key;
+ args.i_src_is_queue = is_queue;
+ args.i_src_is_local = is_local;
+ args.i_tag = tag;
+ args.i_excludes = excludes;
+
+ bridge = Bridge::shared_ptr
+ (new Bridge (l->second.get(), l->second->nextChannel(),
+ boost::bind(&LinkRegistry::destroy, this,
+ host, port, src, dest, key), args));
+ bridges[bridgeKey] = bridge;
+ l->second->add(bridge);
+ return std::pair<Bridge::shared_ptr, bool>(bridge, true);
+ }
+ return std::pair<Bridge::shared_ptr, bool>(b->second, false);
+}
+
void LinkRegistry::destroy(const string& host, const uint16_t port)
{
Mutex::ScopedLock locker(lock);
@@ -90,6 +147,34 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
+void LinkRegistry::destroy(const std::string& host,
+ const uint16_t port,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ stringstream keystream;
+ keystream << host << ":" << port;
+ string linkKey = string(keystream.str());
+
+ LinkMap::iterator l = links.find(linkKey);
+ if (l == links.end())
+ return;
+
+ keystream << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = string(keystream.str());
+ BridgeMap::iterator b = bridges.find(bridgeKey);
+ if (b == bridges.end())
+ return;
+
+ l->second->cancel(b->second);
+ if (b->second->isDurable())
+ store->destroy(*(b->second));
+ bridgesToDestroy[bridgeKey] = b->second;
+ bridges.erase(b);
+}
+
void LinkRegistry::setStore (MessageStore* _store)
{
assert (store == 0 && _store != 0);
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 86d8c3d2f9..3c47954141 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -24,6 +24,7 @@
#include <map>
#include "Link.h"
+#include "Bridge.h"
#include "MessageStore.h"
#include "Timer.h"
#include "qpid/sys/Mutex.h"
@@ -47,8 +48,13 @@ namespace broker {
};
typedef std::map<std::string, Link::shared_ptr> LinkMap;
- LinkMap links;
- LinkMap linksToDestroy;
+ typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+
+ LinkMap links;
+ LinkMap linksToDestroy;
+ BridgeMap bridges;
+ BridgeMap bridgesToDestroy;
+
qpid::sys::Mutex lock;
Broker* broker;
Timer timer;
@@ -59,11 +65,32 @@ namespace broker {
public:
LinkRegistry (Broker* _broker);
- std::pair<Link::shared_ptr, bool> declare(std::string& host,
- uint16_t port,
- bool useSsl,
- bool durable);
+ std::pair<Link::shared_ptr, bool>
+ declare(std::string& host,
+ uint16_t port,
+ bool useSsl,
+ bool durable,
+ std::string& authMechanism,
+ std::string& username,
+ std::string& password);
+ std::pair<Bridge::shared_ptr, bool>
+ declare(std::string& host,
+ uint16_t port,
+ bool durable,
+ std::string& src,
+ std::string& dest,
+ std::string& key,
+ bool is_queue,
+ bool is_local,
+ std::string& id,
+ std::string& excludes);
+
void destroy(const std::string& host, const uint16_t port);
+ void destroy(const std::string& host,
+ const uint16_t port,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 0f8c29f3b9..7fc2b6c4f3 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -23,6 +23,7 @@
#include "Message.h"
#include "Queue.h"
#include "Link.h"
+#include "Bridge.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -85,10 +86,11 @@ public:
class RecoverableConfigImpl : public RecoverableConfig
{
- // TODO: Add links for other config types, consider using super class (PersistableConfig?)
- Link::shared_ptr link;
+ Link::shared_ptr link;
+ Bridge::shared_ptr bridge;
public:
- RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+ RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {}
void setPersistenceId(uint64_t id);
};
@@ -140,10 +142,10 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer
string kind;
buffer.getShortString (kind);
- if (kind == "link")
- {
+ if (kind == "link")
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- }
+ else if (kind == "bridge")
+ return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
}
@@ -212,7 +214,8 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id)
{
if (link.get())
link->setPersistenceId(id);
- // TODO: add calls to other types. Consider using a parent class.
+ else if (bridge.get())
+ bridge->setPersistenceId(id);
}
void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index e59a79f711..2ec0988fc0 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -65,6 +65,9 @@ void SessionHandler::handleIn(AMQFrame& f) {
} else if (session.get()) {
//we are attached and frame was not a session control so it is for upper layers
session->handle(f);
+ } else if (m && m->isA<SessionDetachedBody>()) {
+ handleDetach();
+ connection.closeChannel(channel.get());
} else {
throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
}
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 33da19b1b8..98e34be0e9 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -171,7 +171,8 @@ class FederationTests(TestBase010):
mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
link = mgmt.get_object("link")
- mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
+ "key":"", "tag":"", "excludes":"", "src_is_queue":1})
sleep(6)
bridge = mgmt.get_object("bridge")
@@ -210,7 +211,7 @@ class FederationTests(TestBase010):
link = mgmt.get_object("link")
mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
- "id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
+ "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
sleep(6)
bridge = mgmt.get_object("bridge")
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index e59f89480e..e839e36821 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -31,30 +31,32 @@ from qpid.connection import Connection
from qpid.util import connect
def Usage ():
- print "Usage: qpid-route [OPTIONS] add <dest-broker> <src-broker> <exchange> <routing-key> [id] [exclude-list]"
- print " qpid-route [OPTIONS] del <dest-broker> <src-broker> <exchange> <routing-key>"
- print " qpid-route [OPTIONS] list <dest-broker>"
- #print " qpid-route [OPTIONS] load <filename>"
- print " qpid-route [OPTIONS] flush <dest-broker>"
+ print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link list <dest-broker>"
+ print
+ print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [id] [exclude-list]"
+ print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
+ print " qpid-route [OPTIONS] route list <dest-broker>"
+ print " qpid-route [OPTIONS] route flush <dest-broker>"
print
print "Options:"
print " -s [ --spec-file ] PATH (/usr/share/amqp/amqp.0-10.xml)"
- print " -v [ --verbose ] Verbose output"
- print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
+ print " -v [ --verbose ] Verbose output"
+ print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
+ print " -d [ --durable ] Added configuration shall be durable"
+ print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
print
print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
- #print " If loading the route configuration from a file, the input file has one line per route"
- #print " in the form:"
- #print
- #print " <dest-broker> <src-broker> <exchange> <routing-key>"
- #print
sys.exit (1)
_specpath = "/usr/share/amqp/amqp.0-10.xml"
_verbose = False
_quiet = False
+_durable = False
+_dellink = False
class RouteManager:
def __init__ (self, destBroker):
@@ -87,6 +89,57 @@ class RouteManager:
return link
return None
+ def AddLink (self, srcBroker):
+ self.src = Broker (srcBroker)
+ mc = self.mclient
+
+ brokers = mc.syncGetObjects (self.mch, "broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link != None:
+ print "Link already exists"
+ sys.exit(1)
+
+ connectArgs = {}
+ connectArgs["host"] = self.src.host
+ connectArgs["port"] = self.src.port
+ connectArgs["useSsl"] = False
+ connectArgs["durable"] = _durable
+ connectArgs["authMechanism"] = "PLAIN"
+ connectArgs["username"] = self.src.username
+ connectArgs["password"] = self.src.password
+ res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ if _verbose:
+ print "Connect method returned:", res.status, res.statusText
+ link = self.getLink ()
+
+ def DelLink (self, srcBroker):
+ self.src = Broker (srcBroker)
+ mc = self.mclient
+
+ brokers = mc.syncGetObjects (self.mch, "broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ print "Link not found"
+ sys.exit(1)
+
+ res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ if _verbose:
+ print "Close method returned:", res.status, res.statusText
+
+ def ListLinks (self):
+ mc = self.mclient
+ links = mc.syncGetObjects (self.mch, "link")
+ if len(links) == 0:
+ print "No Links Found"
+ else:
+ print
+ print "Host Port Durable State Last Error"
+ print "==================================================================="
+ for link in links:
+ print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError)
+
def AddRoute (self, srcBroker, exchange, routingKey, id, excludes):
self.src = Broker (srcBroker)
mc = self.mclient
@@ -103,10 +156,10 @@ class RouteManager:
connectArgs["host"] = self.src.host
connectArgs["port"] = self.src.port
connectArgs["useSsl"] = False
- connectArgs["durable"] = False
- connectArgs["authMechanism"] = "ANONYMOUS"
- connectArgs["username"] = ""
- connectArgs["password"] = ""
+ connectArgs["durable"] = _durable
+ connectArgs["authMechanism"] = "PLAIN"
+ connectArgs["username"] = self.src.username
+ connectArgs["password"] = self.src.password
res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
if _verbose:
print "Connect method returned:", res.status, res.statusText
@@ -127,15 +180,18 @@ class RouteManager:
if _verbose:
print "Creating inter-broker binding..."
bridgeArgs = {}
- bridgeArgs["durable"] = 0
+ bridgeArgs["durable"] = _durable
bridgeArgs["src"] = exchange
bridgeArgs["dest"] = exchange
bridgeArgs["key"] = routingKey
- bridgeArgs["id"] = id
+ bridgeArgs["tag"] = id
bridgeArgs["excludes"] = excludes
bridgeArgs["src_is_queue"] = 0
bridgeArgs["src_is_local"] = 0
res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+ if res.status == 4:
+ print "Can't create a durable route on a non-durable link"
+ sys.exit(1)
if _verbose:
print "Bridge method returned:", res.status, res.statusText
@@ -159,7 +215,7 @@ class RouteManager:
if res.status != 0:
print "Error closing bridge: %d - %s" % (res.status, res.statusText)
sys.exit (1)
- if len (bridges) == 1:
+ if len (bridges) == 1 and _dellink:
link = self.getLink ()
if link == None:
sys.exit (0)
@@ -188,9 +244,6 @@ class RouteManager:
if myLink != None:
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
- def LoadRoutes (self, inFile):
- pass
-
def ClearAllRoutes (self):
mc = self.mclient
links = mc.syncGetObjects (self.mch, "link")
@@ -211,23 +264,29 @@ class RouteManager:
elif _verbose:
print "Ok"
- links = mc.syncGetObjects (self.mch, "link")
- for link in links:
- if _verbose:
- print "Deleting Link: %s:%d... " % (link.host, link.port),
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
- if res.status != 0:
- print "Error: %d - %s" % (res.status, res.statusText)
- elif _verbose:
- print "Ok"
+ if _dellink:
+ links = mc.syncGetObjects (self.mch, "link")
+ for link in links:
+ if _verbose:
+ print "Deleting Link: %s:%d... " % (link.host, link.port),
+ res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.statusText)
+ elif _verbose:
+ print "Ok"
+
+def YN(val):
+ if val == 1:
+ return 'Y'
+ return 'N'
##
## Main Program
##
try:
- longOpts = ("verbose", "quiet", "spec-file=")
- (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vq", longOpts)
+ longOpts = ("verbose", "quiet", "spec-file=", "durable", "del-empty-link")
+ (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vqde", longOpts)
except:
Usage ()
@@ -238,40 +297,57 @@ for opt in optlist:
_verbose = True
if opt[0] == "-q" or opt[0] == "--quiet":
_quiet = True
+ if opt[0] == "-d" or opt[0] == "--durable":
+ _durable = True
+ if opt[0] == "-e" or opt[0] == "--del-empty-link":
+ _dellink = True
nargs = len (cargs)
-if nargs < 2:
+if nargs < 3:
Usage ()
-cmd = cargs[0]
-if cmd != "load":
- rm = RouteManager (cargs[1])
- rm.ConnectToBroker ()
-
-if cmd == "add":
- if nargs < 5 or nargs > 7:
- Usage ()
-
- id = ""
- excludes = ""
- if nargs > 5: id = cargs[5]
- if nargs > 6: excludes = cargs[6]
- rm.AddRoute (cargs[2], cargs[3], cargs[4], id, excludes)
-elif cmd == "del":
- if nargs != 5:
- Usage ()
- else:
- rm.DelRoute (cargs[2], cargs[3], cargs[4])
-else:
- if nargs != 2:
- Usage ()
-
- if cmd == "list":
- rm.ListRoutes ()
- #elif cmd == "load":
- # rm.LoadRoutes (cargs[1])
- elif cmd == "flush":
- rm.ClearAllRoutes ()
+group = cargs[0]
+cmd = cargs[1]
+rm = RouteManager (cargs[2])
+rm.ConnectToBroker ()
+
+if group == "link":
+ if cmd == "add":
+ if nargs != 4:
+ Usage()
+ rm.AddLink (cargs[3])
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ rm.DelLink (cargs[3])
+ elif cmd == "list":
+ if nargs != 3:
+ Usage()
+ rm.ListLinks ()
+
+elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 8:
+ Usage ()
+
+ id = ""
+ excludes = ""
+ if nargs > 6: id = cargs[6]
+ if nargs > 7: excludes = cargs[7]
+ rm.AddRoute (cargs[3], cargs[4], cargs[5], id, excludes)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage ()
+ else:
+ rm.DelRoute (cargs[3], cargs[4], cargs[5])
else:
- Usage ()
+ if nargs != 3:
+ Usage ()
+
+ if cmd == "list":
+ rm.ListRoutes ()
+ elif cmd == "flush":
+ rm.ClearAllRoutes ()
+ else:
+ Usage ()
rm.Disconnect ()
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 1583968b01..c3fbee615f 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -243,7 +243,7 @@
<arg name="src" dir="I" type="sstr"/>
<arg name="dest" dir="I" type="sstr"/>
<arg name="key" dir="I" type="sstr" default=""/>
- <arg name="id" dir="I" type="sstr" default=""/>
+ <arg name="tag" dir="I" type="sstr" default=""/>
<arg name="excludes" dir="I" type="sstr" default=""/>
<arg name="src_is_queue" dir="I" type="bool" default="0"/>
<arg name="src_is_local" dir="I" type="bool" default="0"/>
@@ -259,11 +259,14 @@
<class name="bridge">
<configElement name="linkRef" type="objId" access="RC" index="y" parentRef="y"/>
<configElement name="channelId" type="uint16" access="RC" index="y"/>
+ <configElement name="durable" type="bool" access="RC"/>
<configElement name="src" type="sstr" access="RC"/>
<configElement name="dest" type="sstr" access="RC"/>
<configElement name="key" type="sstr" access="RC"/>
<configElement name="src_is_queue" type="bool" access="RC"/>
<configElement name="src_is_local" type="bool" access="RC"/>
+ <configElement name="tag" type="sstr" access="RC"/>
+ <configElement name="excludes" type="sstr" access="RC"/>
<method name="close"/>
</class>