summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-04 19:45:32 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-04 19:45:32 +0000
commit416a3cc5d7236378fce980a6356ff8e6cc07d691 (patch)
treeb08fe0fb3c8ddfe39243f3dd136b619d8bfb674c
parent5c80b835bf53caf2e0b642788fd0865e040e0975 (diff)
downloadqpid-python-416a3cc5d7236378fce980a6356ff8e6cc07d691.tar.gz
QPID-3767: re-index bridge and link by constant name, not address
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1334138 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp110
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h31
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp185
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp150
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h33
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp248
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h54
-rw-r--r--qpid/cpp/src/qpid/broker/NameGenerator.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h1
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py177
-rwxr-xr-xqpid/cpp/src/tests/federation.py189
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java4
-rw-r--r--qpid/specs/management-schema.xml11
-rwxr-xr-xqpid/tools/src/py/qpid-tool1
21 files changed, 984 insertions, 273 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 5b531e4636..53fe38a504 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -57,22 +57,21 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
conn->received(frame);
}
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args,
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init) :
- link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ link(_link), channel(_id), args(_args), mgmtObject(0),
+ listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0),
initialize(init), detached(false)
{
- std::stringstream title;
- title << id << "_" << name;
- queueName += title.str();
+ queueName += Uuid(true).str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -90,7 +89,7 @@ void Bridge::create(Connection& c)
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- SessionHandler& sessionHandler = c.getChannel(id);
+ SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setDetachedCallback(
boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
@@ -98,15 +97,15 @@ void Bridge::create(Connection& c)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(name, false);
+ session->attach(queueName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(name);
+ sessionHandler.attachAs(queueName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -168,6 +167,7 @@ void Bridge::cancel(Connection&)
QPID_LOG(debug, "Cancelled bridge " << name);
}
+/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
@@ -177,9 +177,10 @@ void Bridge::closed()
QPID_LOG(debug, "Closed bridge " << name);
}
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
{
- listener(this);
+ listener(this); // ask the LinkRegistry to destroy us
}
void Bridge::setPersistenceId(uint64_t pId) const
@@ -187,8 +188,21 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string src;
@@ -196,9 +210,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
string key;
string id;
string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. Try to find a link using those paramters.
+ */
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
- buffer.getShortString(host);
- port = buffer.getShort();
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
@@ -210,15 +248,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
- return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic, sync).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
{
- buffer.putShortString(string("bridge"));
- buffer.putShortString(link->getHost());
- buffer.putShort(link->getPort());
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
@@ -233,9 +277,9 @@ void Bridge::encode(Buffer& buffer) const
uint32_t Bridge::encodedSize() const
{
- return link->getHost().size() + 1 // short-string (host)
- + 7 // short-string ("bridge")
- + 2 // port
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + link->getName().size() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
@@ -259,7 +303,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
- destroy();
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -306,7 +351,7 @@ void Bridge::sendReorigin()
}
bool Bridge::resetProxy()
{
- SessionHandler& sessionHandler = conn->getChannel(id);
+ SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
@@ -318,7 +363,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang
peer->getExchange().bind(queue, exchange, key, args);
} else {
QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
- destroy();
+ close();
}
}
@@ -332,9 +377,18 @@ const string& Bridge::getLocalTag() const
{
return link->getBroker()->getFederationTag();
}
-
void Bridge::sessionDetached() {
detached = true;
}
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 32b9fd1781..2cf07d3a94 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -55,17 +55,18 @@ public:
typedef boost::function<void(Bridge*)> CancellationListener;
typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
- Bridge(Link* link, framing::ChannelId id, CancellationListener l,
+ Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
InitializeCallback init
);
~Bridge();
- void create(Connection& c);
- void cancel(Connection& c);
- void closed();
- void destroy();
+ QPID_BROKER_EXTERN void close();
bool isDurable() { return args.i_durable; }
+ Link *getLink() const { return link; }
+ const std::string getSrc() const { return args.i_src; }
+ const std::string getDest() const { return args.i_dest; }
+ const std::string getKey() const { return args.i_key; }
bool isDetached() const { return detached; }
@@ -80,7 +81,11 @@ public:
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const { return name; }
+
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedBridge(const std::string& key);
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
@@ -93,6 +98,12 @@ public:
std::string getQueueName() const { return queueName; }
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+ /** create a name for a bridge (if none supplied by user config) */
+ static std::string createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
+
private:
// Callback when the bridge's session is detached.
void sessionDetached();
@@ -108,8 +119,8 @@ private:
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
- Link* link;
- framing::ChannelId id;
+ Link* const link;
+ const framing::ChannelId channel;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
@@ -121,6 +132,12 @@ private:
InitializeCallback initialize;
bool detached; // Set when session is detached.
bool resetProxy();
+
+ // connection Management (called by owning Link)
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
+ friend class Link; // to call create, cancel, closed()
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 726b47c268..c13ac19454 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -438,7 +438,7 @@ Manageable* Broker::GetVhostObject(void) const
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& args,
- string&)
+ string& text)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -453,6 +453,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
+ /** Management is creating a Link to a remote broker using the host and port of
+ * the remote. This (old) interface does not allow management to specify a name
+ * for the link, nor does it allow multiple Links to the same remote. Use the
+ * "create()" broker method if these features are needed.
+ * TBD: deprecate this interface.
+ */
+ QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -461,13 +469,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
+ text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
}
- std::pair<Link::shared_ptr, bool> response =
- links.declare (hp.i_host, hp.i_port, transport, hp.i_durable,
- hp.i_authMechanism, hp.i_username, hp.i_password);
- if (hp.i_durable && response.second)
- store->create(*response.first);
+
+ // Does a link to the remote already exist? If so, re-use the existing link
+ // - this behavior is backward compatible with previous releases.
+ if (!links.getLink(hp.i_host, hp.i_port, transport)) {
+ // new link, need to generate a unique name for it
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare(Link::createName(transport, hp.i_host, hp.i_port),
+ hp.i_host, hp.i_port, transport,
+ hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
+ if (!response.first) {
+ text = "Unable to create Link";
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ break;
+ }
+ }
status = Manageable::STATUS_OK;
break;
}
@@ -538,6 +557,8 @@ const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
+const std::string TYPE_LINK("link");
+const std::string TYPE_BRIDGE("bridge");
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
@@ -549,6 +570,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
const std::string _TRUE("true");
const std::string _FALSE("false");
+
+// parameters for creating a Link object, see mgmt schema
+const std::string HOST("host");
+const std::string PORT("port");
+const std::string TRANSPORT("transport");
+const std::string AUTH_MECHANISM("authMechanism");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+
+// parameters for creating a Bridge object, see mgmt schema
+const std::string LINK("link");
+const std::string SRC("src");
+const std::string DEST("dest");
+const std::string KEY("key");
+const std::string TAG("tag");
+const std::string EXCLUDES("excludes");
+const std::string SRC_IS_QUEUE("srcIsQueue");
+const std::string SRC_IS_LOCAL("srcIsLocal");
+const std::string DYNAMIC("dynamic");
+const std::string SYNC("sync");
}
struct InvalidBindingIdentifier : public qpid::Exception
@@ -598,6 +639,25 @@ struct UnknownObjectType : public qpid::Exception
std::string getPrefix() const { return "unknown object type"; }
};
+struct ReservedObjectName : public qpid::Exception
+{
+ ReservedObjectName(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return std::string("names prefixed with '")
+ + QPID_NAME_PREFIX + std::string("' are reserved"); }
+};
+
+struct UnsupportedTransport : public qpid::Exception
+{
+ UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "transport is not supported"; }
+};
+
+struct InvalidParameter : public qpid::Exception
+{
+ InvalidParameter(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "invalid parameter to method call"; }
+};
+
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
{
@@ -669,6 +729,109 @@ void Broker::createObject(const std::string& type, const std::string& name,
amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+
+ } else if (type == TYPE_LINK) {
+
+ QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string host;
+ uint16_t port = 0;
+ std::string transport = TCP_TRANSPORT;
+ bool durable = false;
+ std::string authMech, username, password;
+
+ if (!getProtocolFactory(transport)) {
+ QPID_LOG(error, "Transport '" << transport << "' not supported.");
+ throw UnsupportedTransport(transport);
+ }
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == HOST) host = i->second.asString();
+ else if (i->first == PORT) port = i->second.asUint16();
+ else if (i->first == TRANSPORT) transport = i->second.asString();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
+ else if (i->first == USERNAME) username = i->second.asString();
+ else if (i->first == PASSWORD) password = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ std::pair<boost::shared_ptr<Link>, bool> rc;
+ rc = links.declare(name, host, port, transport, durable, authMech, username, password);
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
+ "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
+
+ } else if (type == TYPE_BRIDGE) {
+
+ QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string linkName;
+ std::string src;
+ std::string dest;
+ std::string key;
+ std::string id;
+ std::string excludes;
+ bool durable = false;
+ bool srcIsQueue = false;
+ bool srcIsLocal = false;
+ bool dynamic = false;
+ uint16_t sync = 0;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+
+ if (i->first == LINK) linkName = i->second.asString();
+ else if (i->first == SRC) src = i->second.asString();
+ else if (i->first == DEST) dest = i->second.asString();
+ else if (i->first == KEY) key = i->second.asString();
+ else if (i->first == TAG) id = i->second.asString();
+ else if (i->first == EXCLUDES) excludes = i->second.asString();
+ else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
+ else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
+ else if (i->first == DYNAMIC) dynamic = bool(i->second);
+ else if (i->first == SYNC) sync = i->second.asUint16();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ boost::shared_ptr<Link> link;
+ if (linkName.empty() || !(link = links.getLink(linkName))) {
+ QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
+ throw InvalidParameter(name);
+ }
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
+ dynamic, sync);
+
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
+ "; src=" << src << "; dest=" << dest << "; key=" << key);
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
} else {
throw UnknownObjectType(type);
}
@@ -691,6 +854,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else if (type == TYPE_LINK) {
+ boost::shared_ptr<Link> link = links.getLink(name);
+ if (link) {
+ link->close();
+ }
+ } else if (type == TYPE_BRIDGE) {
+ boost::shared_ptr<Bridge> bridge = links.getBridge(name);
+ if (bridge) {
+ bridge->close();
+ }
} else {
throw UnknownObjectType(type);
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index f21c861149..b605ca71e5 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -125,18 +125,19 @@ boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name
return Exchange::shared_ptr(new LinkExchange(_name));
}
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
+Link::Link(const string& _name,
+ LinkRegistry* _links,
const string& _host,
uint16_t _port,
const string& _transport,
+ DestroyedListener l,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store),
+ : name(_name), links(_links),
configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
host(_host), port(_port), transport(_transport),
durable(_durable),
@@ -149,6 +150,7 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
+ listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer())),
failoverChannel(0)
{
@@ -157,7 +159,10 @@ Link::Link(LinkRegistry* _links,
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
agent->addObject(mgmtObject, 0, durable);
}
}
@@ -169,9 +174,9 @@ Link::Link(LinkRegistry* _links,
}
broker->getTimer().add(timerTask);
- stringstream _name;
- _name << "qpid.link." << transport << ":" << host << ":" << port;
- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
exchangeTypeName);
failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
assert(failoverExchange);
@@ -245,6 +250,7 @@ void Link::established(Connection* c)
currentInterval = 1;
visitCount = 0;
connection = c;
+
if (closing)
destroy();
else // Process any IO tasks bridges added before established.
@@ -263,7 +269,7 @@ void Link::setUrl(const Url& u) {
namespace {
/** invoked when session used to subscribe to remote's amq.failover exchange detaches */
void sessionDetached(Link *link) {
- QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
+ QPID_LOG(notice, "detached from 'amq.failover' for link: " << link->getName());
}
}
@@ -271,6 +277,11 @@ namespace {
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
+
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ }
+
// Get default URL from known-hosts if not already set
if (url.empty()) {
const std::vector<Url>& known = connection->getKnownHosts();
@@ -346,13 +357,14 @@ void Link::closed(int, std::string text)
if (!hideManagement())
mgmtObject->set_lastError (text);
}
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
}
// Call destroy outside of the lock, don't want to be deleted with lock held.
if (isClosing)
destroy();
}
-// Called in connection IO thread.
+// Called in connection IO thread, cleans up the connection before destroying Link
void Link::destroy ()
{
Bridges toDelete;
@@ -379,9 +391,9 @@ void Link::destroy ()
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
+ (*i)->close();
toDelete.clear();
- links->destroy (configuredHost, configuredPort);
+ listener(this); // notify LinkRegistry that this Link has been destroyed
}
void Link::add(Bridge::shared_ptr bridge)
@@ -485,12 +497,16 @@ void Link::reconnectLH(const Address& a)
host = a.host;
port = a.port;
transport = a.protocol;
- startConnectionLH();
+
if (!hideManagement()) {
stringstream errorString;
- errorString << "Failed over to " << a;
+ errorString << "Failing over to " << a;
mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
}
+ startConnectionLH();
}
bool Link::tryFailoverLH() {
@@ -499,15 +515,14 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
reconnectLH(next);
return true;
}
return false;
}
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
@@ -536,18 +551,34 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return configuredHost;
+ return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
+ string name;
+ if (kind == ENCODED_IDENTIFIER) {
+ // newer version provides a link name.
+ buffer.getShortString(name);
+ }
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -556,12 +587,21 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a name for the new Link.
+ */
+ name = createName(transport, host, port);
+ }
+
+ return links.declare(name, host, port, transport, durable, authMechanism,
+ username, password).first;
}
void Link::encode(Buffer& buffer) const
{
- buffer.putShortString(string("link"));
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
buffer.putShortString(configuredHost);
buffer.putShort(configuredPort);
buffer.putShortString(configuredTransport);
@@ -573,8 +613,9 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return configuredHost.size() + 1 // short-string (host)
- + 5 // short-string ("link")
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + configuredHost.size() + 1 // short-string (host)
+ 2 // port
+ configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
@@ -589,6 +630,7 @@ ManagementObject* Link::GetManagementObject (void) const
}
void Link::close() {
+ QPID_LOG(debug, "Link::close(), link=" << name );
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
@@ -609,36 +651,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
+ /* TBD: deprecate this interface in favor of the Broker::create() method. The
+ * Broker::create() method allows the user to assign a name to the bridge.
+ */
+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
+
+ // Does a bridge already exist that has the src/dest/key? If so, re-use the
+ // existing bridge - this behavior is backward compatible with previous releases.
+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+ if (!bridge) {
+ // need to create a new bridge on this link.
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+ *this, iargs.i_durable,
+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+ if (!rc.first) {
+ text = "invalid parameters";
+ return Manageable::STATUS_PARAMETER_INVALID;
}
}
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
return Manageable::STATUS_OK;
}
@@ -716,6 +753,23 @@ void Link::setState(const framing::FieldTable& state)
}
}
+std::string Link::createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port)
+{
+ stringstream linkName;
+ linkName << QPID_NAME_PREFIX << transport << std::string(":")
+ << host << std::string(":") << port;
+ return linkName.str();
+}
+
+
+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
+{
+ Mutex::ScopedLock mutex(lock);
+ return (isConnecting() && _port == port && _host == host);
+}
+
const std::string Link::exchangeTypeName("qpid.LinkExchange");
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index a97fa48664..5b788bb947 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
@@ -52,8 +51,8 @@ class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
mutable sys::Mutex lock;
+ const std::string name;
LinkRegistry* links;
- MessageStore* store;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
@@ -85,6 +84,7 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
+ boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
uint failoverChannel;
@@ -101,27 +101,32 @@ class Link : public PersistableConfig, public management::Manageable {
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
+ void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void reconnectLH(const Address&); //called by LinkRegistry
- void established(Connection*); // Called when connection is create
+ // connection management (called by LinkRegistry)
+ void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
+ void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
+ bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
+ typedef boost::function<void(Link*)> DestroyedListener;
- Link(LinkRegistry* links,
- MessageStore* store,
+ Link(const std::string& name,
+ LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
+ DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
@@ -148,15 +153,17 @@ class Link : public PersistableConfig, public management::Manageable {
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
- QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+ // Close the link.
+ QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
- void notifyConnectionForced(const std::string text);
void setPassive(bool p);
+ bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
@@ -165,7 +172,10 @@ class Link : public PersistableConfig, public management::Manageable {
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
@@ -178,6 +188,11 @@ class Link : public PersistableConfig, public management::Manageable {
// replicate internal state of this Link for clustering
void getState(framing::FieldTable& state) const;
void setState(const framing::FieldTable& state);
+
+ /** create a name for a link (if none supplied by user config) */
+ static std::string createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index d89f220d1b..3cad2c40c9 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -68,27 +68,34 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
LinkRegistry::~LinkRegistry() {}
+/** find link by the *configured* remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport)
+{
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+ Link::shared_ptr& link = i->second;
+ if (link->getHost() == host &&
+ link->getPort() == port &&
+ (transport.empty() || link->getTransport() == transport))
+ return link;
+ }
+ return boost::shared_ptr<Link>();
+}
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
{
Mutex::ScopedLock locker(lock);
- std::string oldKey = createKey(oldAddress);
- std::string newKey = createKey(newAddress);
- if (links.find(newKey) != links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- } else {
- LinkMap::iterator i = links.find(oldKey);
- if (i == links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- } else {
- links[newKey] = i->second;
- links.erase(oldKey);
- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- }
- }
+ LinkMap::iterator l = links.find(name);
+ if (l != links.end())
+ return l->second;
+ return boost::shared_ptr<Link>();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+ const string& host,
uint16_t port,
const string& transport,
bool durable,
@@ -98,24 +105,53 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
{
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i == links.end())
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
- authMechanism, username, password,
- broker, parent));
- links[key] = link;
+ link = Link::shared_ptr (new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent));
+ if (durable && store) store->create(*link);
+ links[name] = link;
+ QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
- uint16_t port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+ if (i->second->getSrc() == src && i->second->getDest() == dest &&
+ i->second->getKey() == key && i->second->getLink() &&
+ i->second->getLink()->getName() == link.getName()) {
+ return i->second;
+ }
+ }
+ return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+ Mutex::ScopedLock locker(lock);
+ BridgeMap::iterator b = bridges.find(name);
+ if (b != bridges.end())
+ return b->second;
+ return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -130,18 +166,26 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
)
{
Mutex::ScopedLock locker(lock);
- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
+ // Durable bridges are only valid on durable links
+ if (durable && !link.isDurable()) {
+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ if (dynamic) {
+ Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+ if (exchange.get() == 0) {
+ QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ }
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
@@ -159,23 +203,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
args.i_sync = sync;
bridge = Bridge::shared_ptr
- (new Bridge (l->second.get(), l->second->nextChannel(),
- boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key),
- args, init));
- bridges[bridgeKey] = bridge;
- l->second->add(bridge);
+ (new Bridge (name, &link, link.nextChannel(),
+ boost::bind(&LinkRegistry::destroyBridge, this, _1),
+ args, init));
+ bridges[name] = bridge;
+ link.add(bridge);
+ if (durable && store)
+ store->create(*bridge);
+
+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+ "' from " << src << " to " << dest << " (" << key << ")");
+
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(Link *link)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
if (i->second->isDurable() && store)
@@ -184,27 +234,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
-void LinkRegistry::destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(Bridge *bridge)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName());
Mutex::ScopedLock locker(lock);
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
-
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return;
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(bridge->getName());
if (b == bridges.end())
return;
- l->second->cancel(b->second);
+ Link *link = b->second->getLink();
+ if (link) {
+ link->cancel(b->second);
+ }
if (b->second->isDurable())
store->destroy(*(b->second));
bridges.erase(b);
@@ -219,28 +262,73 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
- // Convert keyOrMgmtId to a host:port key.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
- // connection management IDs. Currently sys:: protocol factories
- // and IO plugins construct the IDs and LinkRegistry parses them.
- size_t separator = keyOrMgmtId.find('-');
- if (separator == std::string::npos) separator = 0;
- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+ {
+ // Extract host and port of remote broker from connection id string.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses connection
+ // management IDs. Currently sys:: protocol factories and IO plugins construct the
+ // IDs and LinkRegistry parses them.
+ // KAG: current connection id format assumed:
+ // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
+ // contained within brackets "[...]", example:
+ // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
+ // if this assumption changes!
+ size_t separator = connId.find('-');
+ assert(separator != std::string::npos);
+ std::string remote = connId.substr(separator+1, std::string::npos);
+ separator = remote.rfind(":");
+ assert(separator != std::string::npos);
+ *host = remote.substr(0, separator);
+ // IPv6 - host is bracketed by "[]", strip them
+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
+ *host = host->substr(1, host->length() - 2);
+ }
+ try {
+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+ } catch (const boost::bad_lexical_cast&) {
+ QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
+ assert(false);
+ }
+ }
+}
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end()) return l->second;
- else return Link::shared_ptr();
+ ConnectionMap::iterator c = connections.find(connId);
+ if (c != connections.end()) {
+ LinkMap::iterator l = links.find(c->second);
+ if (l != links.end())
+ return l->second;
+ }
+ return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
- Link::shared_ptr link = findLink(key);
+ // find a link that is attempting to connect to the remote, and
+ // create a mapping from connection id to link
+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+ std::string host;
+ uint16_t port;
+ extractHostPort( key, &host, &port );
+ Link::shared_ptr link;
+ {
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ if (l->second->pendingConnection(host, port)) {
+ link = l->second;
+ connections[key] = link->getName();
+ link->established(c);
+ break;
+ }
+ }
+ }
+
if (link) {
- link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
@@ -343,20 +431,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
-std::string LinkRegistry::createKey(const qpid::Address& a) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- stringstream keystream;
- keystream << host << ":" << port;
- return keystream.str();
-}
-
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
@@ -369,10 +443,12 @@ void LinkRegistry::setPassive(bool p)
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ Mutex::ScopedLock locker(lock);
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ Mutex::ScopedLock locker(lock);
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 8e9d2f4b0d..5f79d9bb52 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -42,9 +42,11 @@ namespace broker {
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+ typedef std::map<std::string, std::string> ConnectionMap;
- LinkMap links;
- BridgeMap bridges;
+ LinkMap links; /** indexed by name of Link */
+ BridgeMap bridges; /** indexed by name of Bridge */
+ ConnectionMap connections; /** indexed by connection identifier, gives link name */
qpid::sys::Mutex lock;
Broker* broker;
@@ -54,15 +56,18 @@ namespace broker {
std::string realm;
boost::shared_ptr<Link> findLink(const std::string& key);
- static std::string createKey(const Address& address);
- static std::string createKey(const std::string& host, uint16_t port);
- // Methods called by the connection observer.
+ // Methods called by the connection observer, key is connection identifier
void notifyConnection (const std::string& key, Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
- friend class LinkRegistryConnectionObserver;
+ friend class LinkRegistryConnectionObserver;
+
+ /** Notify the registry that a Link has been destroyed */
+ void linkDestroyed(Link*);
+ /** Request to destroy a Bridge */
+ void destroyBridge(Bridge*);
public:
QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
@@ -70,17 +75,27 @@ namespace broker {
QPID_BROKER_EXTERN ~LinkRegistry();
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
- declare(const std::string& host,
+ declare(const std::string& name,
+ const std::string& host,
uint16_t port,
const std::string& transport,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password);
+ /** determine if Link exists */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& name);
+ /** host,port,transport will be matched against the configured values, which may
+ be different from the current values due to failover */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& configHost,
+ uint16_t configPort,
+ const std::string& configTransport = std::string());
QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
- declare(const std::string& host,
- uint16_t port,
+ declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -93,14 +108,14 @@ namespace broker {
uint16_t sync,
Bridge::InitializeCallback=0
);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key);
+ /** determine if Bridge exists */
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const std::string& name);
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
@@ -126,11 +141,6 @@ namespace broker {
QPID_BROKER_EXTERN uint16_t getPort (const std::string& key);
/**
- * Called by links failing over to new address
- */
- void changeAddress(const Address& oldAddress, const Address& newAddress);
-
- /**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
* updated but links won't actually establish connections and
diff --git a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h
index 6ea25c9797..2e9f7febe2 100644
--- a/qpid/cpp/src/qpid/broker/NameGenerator.h
+++ b/qpid/cpp/src/qpid/broker/NameGenerator.h
@@ -32,6 +32,7 @@ namespace qpid {
NameGenerator(const std::string& base);
std::string generate();
};
+ const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names
}
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index d08409695e..858535637a 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
{
string kind;
-
+ uint32_t p = buffer.getPosition();
buffer.getShortString (kind);
- if (kind == "link")
+ buffer.setPosition(p);
+
+ if (Link::isEncodedLink(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- else if (kind == "bridge")
+ else if (Bridge::isEncodedBridge(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 512e0f03cb..63001a3cb9 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -782,16 +782,18 @@ void Connection::managementSetupState(
void Connection::config(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
string kind;
+ uint32_t p = buf.getPosition();
buf.getShortString (kind);
- if (kind == "link") {
+ buf.setPosition(p);
+ if (broker::Link::isEncodedLink(kind)) {
broker::Link::shared_ptr link =
- broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated link "
<< link->getHost() << ":" << link->getPort());
}
- else if (kind == "bridge") {
+ else if (broker::Bridge::isEncodedBridge(kind)) {
broker::Bridge::shared_ptr bridge =
- broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
}
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 3f3fa87a01..42cb2dbbce 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -57,8 +57,10 @@ void Backup::initialize(const Url& url) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(notice, "HA: Backup initialized: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ framing::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index d0c99cbdb6..690337831c 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -190,8 +190,11 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
: Exchange(QPID_CONFIGURATION_REPLICATOR),
haBroker(hb), broker(hb.getBroker()), link(l)
{
+ framing::Uuid uuid(true);
+ const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
broker.getLinks().declare(
- link->getHost(), link->getPort(),
+ name, // name for bridge
+ *link, // parent
false, // durable
QPID_CONFIGURATION_REPLICATOR, // src
QPID_CONFIGURATION_REPLICATOR, // dest
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 7d82fb63bd..589d7ee6aa 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -119,7 +119,9 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ framing::Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 633619be13..5ab09d3213 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -60,9 +60,11 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- // Note this may create a new bridge or use an existing one.
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
- link->getHost(), link->getPort(),
+ bridgeName,
+ *link,
false, // durable
queue->getName(), // src
getName(), // dest
@@ -77,21 +79,24 @@ void QueueReplicator::activate() {
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
);
+ bridge = result.first;
}
QueueReplicator::~QueueReplicator() {}
void QueueReplicator::deactivate() {
+ // destroy the route
sys::Mutex::ScopedLock l(lock);
- queue->getBroker()->getLinks().destroy(
- link->getHost(), link->getPort(), queue->getName(), getName(), string());
- QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+ if (bridge) {
+ bridge->close();
+ bridge.reset();
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+ }
}
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
- bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index bcbac988fa..26fb9456d1 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -78,6 +78,7 @@ class QueueReplicator : public broker::Exchange,
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<broker::Bridge> bridge;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8952f5de7b..09eebc5ec9 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -768,6 +768,35 @@ acl deny all all
fetch(cluster[2])
+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+ """ Prove that traffic can pass between two federated brokers.
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ return active
+
def test_federation_failover(self):
"""
Verify that federation operates across failures occuring in a cluster.
@@ -778,38 +807,6 @@ acl deny all all
cluster to newly-added members
"""
- TIMEOUT = 30
- def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
- """ Prove that traffic can pass from source fed broker to
- destination fed broker
- """
- tot_time = 0
- active = False
- send_session = src_broker.connect().session()
- sender = send_session.sender(src)
- receive_session = dst_broker.connect().session()
- receiver = receive_session.receiver(dst)
- while not active and tot_time < timeout:
- sender.send(Message("Hello from Source!"))
- try:
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- # Get this far without Empty exception, and the link is good!
- active = True
- while True:
- # Keep receiving msgs, as several may have accumulated
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- except Empty:
- if not active:
- tot_time += 1
- receiver.close()
- receive_session.close()
- sender.close()
- send_session.close()
- self.assertTrue(active, "Bridge failed to become active")
-
-
# 2 node cluster source, 2 node cluster destination
src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
src_cluster.ready();
@@ -848,43 +845,145 @@ acl deny all all
self.assertEqual(result.status, 0, result)
# check that traffic passes
- verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
# add src[2] broker to source cluster
src_cluster.start(expect=EXPECT_EXIT_FAIL);
src_cluster.ready();
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill src[0]. dst[0] should fail over to src[1]
src_cluster[0].kill()
for b in src_cluster[1:]: b.ready()
- verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
# Kill src[1], dst[0] should fail over to src[2]
src_cluster[1].kill()
for b in src_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill dest[0], force failover to dest[1]
dst_cluster[0].kill()
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Add dest[2]
# dest[1] syncs dest[2] to current remote state
dst_cluster.start(expect=EXPECT_EXIT_FAIL);
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Kill dest[1], force failover to dest[2]
dst_cluster[1].kill()
for b in dst_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
for i in range(2, len(src_cluster)): src_cluster[i].kill()
for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
+ def test_federation_multilink_failover(self):
+ """
+ Verify that multi-link federation operates across failures occuring in
+ a cluster.
+ """
+
+ # 1 node cluster source, 1 node cluster destination
+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ # federate a direct binding across two separate links
+
+ # first, create a direct exchange bound to two queues using different
+ # bindings
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ1"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ1", "one"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ2"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ2", "two"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # Create two separate links between the dst and source brokers, bind
+ # each to different keys
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+ for _l in [("link1", "bridge1", "one"),
+ ("link2", "bridge2", "two")]:
+ result = dst_broker.create("link", _l[0],
+ {"host":src_cluster[0].host(),
+ "port":src_cluster[0].port()},
+ False)
+ self.assertEqual(result.status, 0, result);
+ result = dst_broker.create("bridge", _l[1],
+ {"link":_l[0],
+ "src":"FedX",
+ "dest":"FedX",
+ "key":_l[2]}, False)
+ self.assertEqual(result.status, 0);
+
+ # check that traffic passes
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ # add new member, verify traffic
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ dst_cluster.ready();
+
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+ for i in range(1, len(src_cluster)): src_cluster[i].kill()
+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 7d613b98ce..5bcf67d152 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from qpid.util import URL
+import qpid.messaging
from time import sleep, time
@@ -94,6 +95,11 @@ class FederationTests(TestBase010):
break
self._brokers.append(_b)
+ # add a new-style messaging connection to each broker
+ for _b in self._brokers:
+ _b.connection = qpid.messaging.Connection(_b.url)
+ _b.connection.open()
+
def _teardown_brokers(self):
""" Un-does _setup_brokers()
"""
@@ -103,7 +109,7 @@ class FederationTests(TestBase010):
if not _b.client_session.error():
_b.client_session.close(timeout=10)
_b.client_conn.close(timeout=10)
-
+ _b.connection.close()
def test_bridge_create_and_close(self):
self.startQmf();
@@ -127,18 +133,28 @@ class FederationTests(TestBase010):
self.verify_cleanup()
def test_pull_from_exchange(self):
+ """ This test uses an alternative method to manage links and bridges
+ via the broker object.
+ """
session = self.session
-
+
self.startQmf()
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
- result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0, result)
- link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
+ # create link
+ link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False,
+ "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+ "transport":"tcp"}
+ result = broker.create("link", "test-link-1", link_args, False)
self.assertEqual(result.status, 0, result)
+ link = qmf.getObjects(_class="link")[0]
+ # create bridge
+ bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout",
+ "key":"my-key"}
+ result = broker.create("bridge", "test-bridge-1", bridge_args, False);
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
@@ -164,9 +180,11 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- result = bridge.close()
+
+ result = broker.delete("bridge", "test-bridge-1", {})
self.assertEqual(result.status, 0, result)
- result = link.close()
+
+ result = broker.delete("link", "test-link-1", {})
self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -2153,3 +2171,158 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_multilink_direct(self):
+ """ Verify that two distinct links can be created between federated
+ brokers.
+ """
+ self.startQmf()
+ qmf = self.qmf
+ self._setup_brokers()
+ src_broker = self._brokers[0]
+ dst_broker = self._brokers[1]
+
+ # create a direct exchange on each broker
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+
+ # create destination queues
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True)
+ dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+
+ # create two connections, one for high priority traffic
+ for _q in ["HiPri", "Traffic"]:
+ result = dst_broker.qmf_object.create("link", _q,
+ {"host":src_broker.host,
+ "port":src_broker.port},
+ False)
+ self.assertEqual(result.status, 0);
+
+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+ for _l in links:
+ if _l.name == "HiPri":
+ hi_link = _l
+ elif _l.name == "Traffic":
+ data_link = _l
+ else:
+ self.fail("Unexpected Link found: " + _l.name)
+
+ # now create a route for messages sent with key "high" to use the
+ # hi_link
+ result = dst_broker.qmf_object.create("bridge", "HiPriBridge",
+ {"link":hi_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":"high"}, False)
+ self.assertEqual(result.status, 0);
+
+
+ # create routes for the "medium" and "low" links to use the normal
+ # data_link
+ for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]:
+ result = dst_broker.qmf_object.create("bridge", _b[0],
+ {"link":data_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":_b[1]}, False)
+ self.assertEqual(result.status, 0);
+
+ # now wait for the links to become operational
+ for _l in [hi_link, data_link]:
+ expire_time = time() + 30
+ while _l.state != "Operational" and time() < expire_time:
+ _l.update()
+ self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+ # verify each link uses a different connection
+ self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef,
+ "Different links using the same connection")
+
+ hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=hi_link.connectionRef)[0]
+ data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=data_link.connectionRef)[0]
+
+
+ # send hi data, verify only goes over hi link
+
+ r_ssn = dst_broker.connection.session()
+ hi_receiver = r_ssn.receiver("HiQ");
+ med_receiver = r_ssn.receiver("MedQ");
+ low_receiver = r_ssn.receiver("LoQ");
+
+ for _c in [hi_conn, data_conn]:
+ _c.update()
+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+ s_ssn = src_broker.connection.session()
+ hi_sender = s_ssn.sender("fedX.direct/high")
+ med_sender = s_ssn.sender("fedX.direct/medium")
+ low_sender = s_ssn.sender("fedX.direct/low")
+
+ try:
+ hi_sender.send(qpid.messaging.Message(content="hi priority"))
+ msg = hi_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "hi priority");
+ except:
+ self.fail("Hi Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages")
+
+ # send low and medium, verify it does not go over hi link
+
+ try:
+ med_sender.send(qpid.messaging.Message(content="medium priority"))
+ msg = med_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "medium priority");
+ except:
+ self.fail("Medium Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message")
+
+ try:
+ low_sender.send(qpid.messaging.Message(content="low priority"))
+ msg = low_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "low priority");
+ except:
+ self.fail("Low Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message")
+
+ # cleanup
+
+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+ dst_broker.client_session.queue_delete(queue=_q[0])
+
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 900b722886..fbaba7afed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1955,6 +1955,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return _obj.getAckBatching();
}
+ /* support TBD */
+ public String getName()
+ {
+ return null;
+ }
+
public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
{
return null;
@@ -2020,6 +2026,18 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return _obj.getLastError();
}
+ /* support TBD */
+ public String getName()
+ {
+ return null;
+ }
+
+ /* support TBD */
+ public BrokerSchema.ConnectionObject getConnectionRef()
+ {
+ return (BrokerSchema.ConnectionObject) null;
+ }
+
public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
{
_obj.close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java
index ea4f723dda..847cae87f5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java
@@ -76,7 +76,7 @@ public final class LinkConfigType extends ConfigObjectType<LinkConfigType, LinkC
}
};
- public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("host")
+ public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("port")
{
public Integer getValue(LinkConfig object)
{
@@ -134,4 +134,4 @@ public final class LinkConfigType extends ConfigObjectType<LinkConfigType, LinkC
-} \ No newline at end of file
+}
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 66e122b049..06e0b99af0 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -379,10 +379,12 @@
This class represents an inter-broker connection.
<property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
- <property name="host" type="sstr" access="RC" index="y"/>
- <property name="port" type="uint16" access="RC" index="y"/>
- <property name="transport" type="sstr" access="RC"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="host" type="sstr" access="RO"/>
+ <property name="port" type="uint16" access="RO"/>
+ <property name="transport" type="sstr" access="RO"/>
<property name="durable" type="bool" access="RC"/>
+ <property name="connectionRef" type="objId" references="Connection" access="RO"/>
<statistic name="state" type="sstr" desc="Operational state of the link"/>
<statistic name="lastError" type="lstr" desc="Reason link is not operational"/>
@@ -411,7 +413,8 @@
-->
<class name="Bridge">
<property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/>
- <property name="channelId" type="uint16" access="RC" index="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="channelId" type="uint16" access="RO"/>
<property name="durable" type="bool" access="RC"/>
<property name="src" type="sstr" access="RC"/>
<property name="dest" type="sstr" access="RC"/>
diff --git a/qpid/tools/src/py/qpid-tool b/qpid/tools/src/py/qpid-tool
index af948b13a9..b31d93594c 100755
--- a/qpid/tools/src/py/qpid-tool
+++ b/qpid/tools/src/py/qpid-tool
@@ -455,6 +455,7 @@ class QmfData(Console):
rows.append(row)
else:
print "No object found with ID %d" % dispId
+ return
finally:
self.lock.release()
self.disp.table(caption, heads, rows)