summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-04-04 19:51:02 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-04-04 19:51:02 +0000
commit43c80059b846b446280ff10fb6523436326f69bb (patch)
tree2d21c0a0a811d89495bb3b41d6ec6bfafe6c4594
parent41ccf8ed8e38beee6ced8234c344086e3e640dea (diff)
downloadqpid-python-43c80059b846b446280ff10fb6523436326f69bb.tar.gz
QPID-3767: merge old fix onto branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1309568 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp82
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h25
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp185
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp123
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h30
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp239
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h52
-rw-r--r--qpid/cpp/src/qpid/broker/NameGenerator.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h1
-rwxr-xr-xqpid/cpp/src/tests/federation.py26
-rw-r--r--qpid/specs/management-schema.xml11
17 files changed, 600 insertions, 219 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 9a1f4be468..238d1ead6a 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -57,11 +57,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
conn->received(frame);
}
-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args,
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
+ listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0),
initialize(init)
{
std::stringstream title;
@@ -70,9 +70,10 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ mgmtObject->set_channelId(id);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
@@ -165,6 +166,7 @@ void Bridge::cancel(Connection&)
QPID_LOG(debug, "Cancelled bridge " << name);
}
+/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
@@ -174,9 +176,10 @@ void Bridge::closed()
QPID_LOG(debug, "Closed bridge " << name);
}
-void Bridge::destroy()
+/** Shut down the bridge */
+void Bridge::close()
{
- listener(this);
+ listener(name); // ask the LinkRegistry to destroy us
}
bool Bridge::isSessionReady() const
@@ -190,8 +193,21 @@ void Bridge::setPersistenceId(uint64_t pId) const
persistenceId = pId;
}
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string src;
@@ -199,9 +215,37 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
string key;
string id;
string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. So create a unique name for the new bridge.
+ */
+
+ framing::Uuid uuid(true);
+ name = QPID_NAME_PREFIX + uuid.str();
+
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
- buffer.getShortString(host);
- port = buffer.getShort();
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
@@ -213,15 +257,15 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
- return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic, sync).first;
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
{
- buffer.putShortString(string("bridge"));
- buffer.putShortString(link->getHost());
- buffer.putShort(link->getPort());
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
@@ -236,9 +280,9 @@ void Bridge::encode(Buffer& buffer) const
uint32_t Bridge::encodedSize() const
{
- return link->getHost().size() + 1 // short-string (host)
- + 7 // short-string ("bridge")
- + 2 // port
+ return ENCODED_IDENTIFIER.length() + 1 // +1 byte length
+ + name.length() + 1
+ + link->getName().length() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
@@ -262,7 +306,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
- destroy();
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
@@ -321,7 +366,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang
peer->getExchange().bind(queue, exchange, key, args);
} else {
QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge");
- destroy();
+ close();
}
}
@@ -335,5 +380,4 @@ const string& Bridge::getLocalTag() const
{
return link->getBroker()->getFederationTag();
}
-
}}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index b849b11ba8..3ece4ba47c 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -48,20 +48,21 @@ class Bridge : public PersistableConfig, public management::Manageable, public E
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
- typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(const std::string&)> CancellationListener;
typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
- Bridge(Link* link, framing::ChannelId id, CancellationListener l,
+ Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
InitializeCallback init
);
~Bridge();
- void create(Connection& c);
- void cancel(Connection& c);
- void closed();
- void destroy();
+ void close();
bool isDurable() { return args.i_durable; }
+ Link *getLink() const { return link; }
+ const std::string getSrc() const { return args.i_src; }
+ const std::string getDest() const { return args.i_dest; }
+ const std::string getKey() const { return args.i_key; }
bool isSessionReady() const;
@@ -76,7 +77,11 @@ public:
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const { return name; }
+
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedBridge(const std::string& key);
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
@@ -101,7 +106,7 @@ private:
std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
std::auto_ptr<framing::AMQP_ServerProxy> peer;
- Link* link;
+ Link* const link;
framing::ChannelId id;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
@@ -114,6 +119,12 @@ private:
InitializeCallback initialize;
bool resetProxy();
+
+ // connection Management (called by owning Link)
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
+ friend class Link; // to call create, cancel, closed()
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 02111b4387..6b607f6d3d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -438,7 +438,7 @@ Manageable* Broker::GetVhostObject(void) const
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& args,
- string&)
+ string& text)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -453,6 +453,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
+ /** Management is creating a Link to a remote broker using the host and port of
+ * the remote. This (old) interface does not allow management to specify a name
+ * for the link, nor does it allow multiple Links to the same remote. Use the
+ * "create()" broker method if these features are needed.
+ * TBD: deprecate this interface.
+ */
+ QPID_LOG(warning, "The Broker::connect() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
@@ -461,13 +469,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
+ text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
}
- std::pair<Link::shared_ptr, bool> response =
- links.declare (hp.i_host, hp.i_port, transport, hp.i_durable,
- hp.i_authMechanism, hp.i_username, hp.i_password);
- if (hp.i_durable && response.second)
- store->create(*response.first);
+
+ // Does a link to the remote already exist? If so, re-use the existing link
+ // - this behavior is backward compatible with previous releases.
+ if (!links.getLink(hp.i_host, hp.i_port, transport)) {
+ // new link, need to generate a unique name for it
+ framing::Uuid uuid(true);
+ std::pair<Link::shared_ptr, bool> response =
+ links.declare(QPID_NAME_PREFIX + uuid.str(), hp.i_host, hp.i_port, transport,
+ hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
+ if (!response.first) {
+ text = "Unable to create Link";
+ status = Manageable::STATUS_PARAMETER_INVALID;
+ break;
+ }
+ }
status = Manageable::STATUS_OK;
break;
}
@@ -538,6 +557,8 @@ const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
+const std::string TYPE_LINK("link");
+const std::string TYPE_BRIDGE("bridge");
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
@@ -549,6 +570,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
const std::string _TRUE("true");
const std::string _FALSE("false");
+
+// parameters for creating a Link object, see mgmt schema
+const std::string HOST("host");
+const std::string PORT("port");
+const std::string TRANSPORT("transport");
+const std::string AUTH_MECHANISM("authMechanism");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+
+// parameters for creating a Bridge object, see mgmt schema
+const std::string LINK("link");
+const std::string SRC("src");
+const std::string DEST("dest");
+const std::string KEY("key");
+const std::string TAG("tag");
+const std::string EXCLUDES("excludes");
+const std::string SRC_IS_QUEUE("srcIsQueue");
+const std::string SRC_IS_LOCAL("srcIsLocal");
+const std::string DYNAMIC("dynamic");
+const std::string SYNC("sync");
}
struct InvalidBindingIdentifier : public qpid::Exception
@@ -598,6 +639,25 @@ struct UnknownObjectType : public qpid::Exception
std::string getPrefix() const { return "unknown object type"; }
};
+struct ReservedObjectName : public qpid::Exception
+{
+ ReservedObjectName(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return std::string("names prefixed with '")
+ + QPID_NAME_PREFIX + std::string("' are reserved"); }
+};
+
+struct UnsupportedTransport : public qpid::Exception
+{
+ UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "transport is not supported"; }
+};
+
+struct InvalidParameter : public qpid::Exception
+{
+ InvalidParameter(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "invalid parameter to method call"; }
+};
+
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
{
@@ -669,6 +729,109 @@ void Broker::createObject(const std::string& type, const std::string& name,
amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+
+ } else if (type == TYPE_LINK) {
+
+ QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string host;
+ uint16_t port = 0;
+ std::string transport = TCP_TRANSPORT;
+ bool durable = false;
+ std::string authMech, username, password;
+
+ if (!getProtocolFactory(transport)) {
+ QPID_LOG(error, "Transport '" << transport << "' not supported.");
+ throw UnsupportedTransport(transport);
+ }
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == HOST) host = i->second.asString();
+ else if (i->first == PORT) port = i->second.asUint16();
+ else if (i->first == TRANSPORT) transport = i->second.asString();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
+ else if (i->first == USERNAME) username = i->second.asString();
+ else if (i->first == PASSWORD) password = i->second.asString();
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ std::pair<boost::shared_ptr<Link>, bool> rc;
+ rc = links.declare(name, host, port, transport, durable, authMech, username, password);
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
+ "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
+
+ } else if (type == TYPE_BRIDGE) {
+
+ QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
+
+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
+ QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
+ throw ReservedObjectName(name);
+ }
+
+ std::string linkName;
+ std::string src;
+ std::string dest;
+ std::string key;
+ std::string id;
+ std::string excludes;
+ bool durable = false;
+ bool srcIsQueue = false;
+ bool srcIsLocal = false;
+ bool dynamic = false;
+ uint16_t sync = 0;
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+
+ if (i->first == LINK) linkName = i->second.asString();
+ else if (i->first == SRC) src = i->second.asString();
+ else if (i->first == DEST) dest = i->second.asString();
+ else if (i->first == KEY) key = i->second.asString();
+ else if (i->first == TAG) id = i->second.asString();
+ else if (i->first == EXCLUDES) excludes = i->second.asString();
+ else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
+ else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
+ else if (i->first == DYNAMIC) dynamic = bool(i->second);
+ else if (i->first == SYNC) sync = i->second.asUint16();
+ else if (i->first == DURABLE) durable = bool(i->second);
+ else {
+ // TODO: strict checking here
+ }
+ }
+
+ boost::shared_ptr<Link> link;
+ if (linkName.empty() || !(link = links.getLink(linkName))) {
+ QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
+ throw InvalidParameter(name);
+ }
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
+ dynamic, sync);
+
+ if (!rc.first) {
+ QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
+ "; src=" << src << "; dest=" << dest << "; key=" << key);
+ throw InvalidParameter(name);
+ }
+ if (!rc.second) {
+ QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
+ throw ObjectAlreadyExists(name);
+ }
} else {
throw UnknownObjectType(type);
}
@@ -691,6 +854,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else if (type == TYPE_LINK) {
+ boost::shared_ptr<Link> link = links.getLink(name);
+ if (link) {
+ link->close();
+ }
+ } else if (type == TYPE_BRIDGE) {
+ boost::shared_ptr<Bridge> bridge = links.getBridge(name);
+ if (bridge) {
+ bridge->close();
+ }
} else {
throw UnknownObjectType(type);
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 56a90e7fb7..911ec0ac0c 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -65,18 +65,19 @@ struct LinkTimerTask : public sys::TimerTask {
sys::Timer& timer;
};
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
+Link::Link(const string& _name,
+ LinkRegistry* _links,
const string& _host,
uint16_t _port,
const string& _transport,
+ DestroyedListener l,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
+ : name(_name), links(_links), host(_host), port(_port),
transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
@@ -88,6 +89,7 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
+ listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
@@ -95,7 +97,10 @@ Link::Link(LinkRegistry* _links,
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
agent->addObject(mgmtObject, 0, durable);
}
}
@@ -172,6 +177,10 @@ void Link::established(Connection* c)
currentInterval = 1;
visitCount = 0;
connection = c;
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ }
+
if (closing)
destroy();
else // Process any IO tasks bridges added before established.
@@ -224,13 +233,14 @@ void Link::closed(int, std::string text)
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (text);
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
}
if (closing)
destroy();
}
-// Called in connection IO thread.
+// Called in connection IO thread, cleans up the connection before destroying Link
void Link::destroy ()
{
Bridges toDelete;
@@ -259,9 +269,9 @@ void Link::destroy ()
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
+ (*i)->close();
toDelete.clear();
- links->destroy (host, port);
+ listener(name); // notify LinkRegistry that this Link has been destroyed
}
void Link::add(Bridge::shared_ptr bridge)
@@ -365,12 +375,16 @@ void Link::reconnectLH(const Address& a)
host = a.host;
port = a.port;
transport = a.protocol;
- startConnectionLH();
+
if (!hideManagement()) {
stringstream errorString;
- errorString << "Failed over to " << a;
+ errorString << "Failing over to '" << transport << ":" << host << ":" << port <<"'";
mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
}
+ startConnectionLH();
}
bool Link::tryFailoverLH() {
@@ -379,15 +393,14 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next.host << ":" << next.port);
reconnectLH(next);
return true;
}
return false;
}
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
@@ -415,18 +428,39 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return host;
+ return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
-
+ string name;
+
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a unique name for the new Link.
+ */
+ framing::Uuid uuid(true);
+ name = QPID_NAME_PREFIX + uuid.str();
+ } else {
+ buffer.getShortString(name);
+ }
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -435,12 +469,14 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+ return links.declare(name, host, port, transport, durable, authMechanism,
+ username, password).first;
}
void Link::encode(Buffer& buffer) const
{
- buffer.putShortString(string("link"));
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
buffer.putShortString(host);
buffer.putShort(port);
buffer.putShortString(transport);
@@ -452,7 +488,9 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return host.size() + 1 // short-string (host)
+ return ENCODED_IDENTIFIER.length() + 1 // +1 byte length
+ + name.length() + 1
+ + host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
+ transport.size() + 1 // short-string(transport)
@@ -468,6 +506,7 @@ ManagementObject* Link::GetManagementObject (void) const
}
void Link::close() {
+ QPID_LOG(debug, "Link::close(), link=" << name );
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
@@ -488,36 +527,32 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
+ /* TBD: deprecate this interface in favor of the Broker::create() method. The
+ * Broker::create() method allows the user to assign a name to the bridge.
+ */
+ QPID_LOG(warning, "The Link::bridge() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
+
+ // Does a bridge already exist that has the src/dest/key? If so, re-use the
+ // existing bridge - this behavior is backward compatible with previous releases.
+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+ if (!bridge) {
+ // need to create a new bridge on this link
+ framing::Uuid uuid(true);
+ const std::string name(QPID_NAME_PREFIX + uuid.str());
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links->declare( name, *this, iargs.i_durable,
+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+ if (!rc.first) {
+ text = "invalid parameters";
+ return Manageable::STATUS_PARAMETER_INVALID;
}
}
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
return Manageable::STATUS_OK;
}
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index c7c8209db3..7907460721 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
@@ -51,8 +50,8 @@ class Connection;
class Link : public PersistableConfig, public management::Manageable {
private:
sys::Mutex lock;
+ const std::string name;
LinkRegistry* links;
- MessageStore* store;
std::string host;
uint16_t port;
std::string transport;
@@ -77,6 +76,7 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
+ boost::function<void(const std::string&)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
@@ -91,26 +91,29 @@ class Link : public PersistableConfig, public management::Manageable {
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void destroy(); // Called when mgmt deletes this link
+ void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
+ void reconnectLH(const Address&); //called by LinkRegistry
- void established(Connection*); // Called when connection is create
+ // connection management (called by LinkRegistry)
+ void established(Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void reconnectLH(const Address&); //called by LinkRegistry
-
- friend class LinkRegistry; // to call established, opened, closed
+ void notifyConnectionForced(const std::string text);
+ friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
+ typedef boost::function<void(const std::string&)> DestroyedListener;
- Link(LinkRegistry* links,
- MessageStore* store,
+ Link(const std::string& name,
+ LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
+ DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
@@ -130,15 +133,17 @@ class Link : public PersistableConfig, public management::Manageable {
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
- QPID_BROKER_EXTERN void close(); // Close the link from within the broker.
+
+ // Close the link.
+ QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
- void notifyConnectionForced(const std::string text);
void setPassive(bool p);
+ bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
@@ -147,7 +152,10 @@ class Link : public PersistableConfig, public management::Manageable {
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
+ static const std::string ENCODED_IDENTIFIER;
+ static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index c6c5a1ac05..a0cf3555c3 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -68,27 +68,34 @@ LinkRegistry::LinkRegistry (Broker* _broker) :
LinkRegistry::~LinkRegistry() {}
+/** find link by current remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport)
+{
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+ Link::shared_ptr& link = i->second;
+ if (link->getHost() == host &&
+ link->getPort() == port &&
+ (transport.empty() || link->getTransport() == transport))
+ return link;
+ }
+ return boost::shared_ptr<Link>();
+}
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
{
Mutex::ScopedLock locker(lock);
- std::string oldKey = createKey(oldAddress);
- std::string newKey = createKey(newAddress);
- if (links.find(newKey) != links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- } else {
- LinkMap::iterator i = links.find(oldKey);
- if (i == links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- } else {
- links[newKey] = i->second;
- links.erase(oldKey);
- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- }
- }
+ LinkMap::iterator l = links.find(name);
+ if (l != links.end())
+ return l->second;
+ return boost::shared_ptr<Link>();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+ const string& host,
uint16_t port,
const string& transport,
bool durable,
@@ -98,24 +105,53 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
{
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i == links.end())
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
- authMechanism, username, password,
- broker, parent));
- links[key] = link;
+ link = Link::shared_ptr (new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent));
+ if (durable && store) store->create(*link);
+ links[name] = link;
+ QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
- uint16_t port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+ if (i->second->getSrc() == src && i->second->getDest() == dest &&
+ i->second->getKey() == key && i->second->getLink() &&
+ i->second->getLink()->getName() == link.getName()) {
+ return i->second;
+ }
+ }
+ return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+ Mutex::ScopedLock locker(lock);
+ BridgeMap::iterator b = bridges.find(name);
+ if (b != bridges.end())
+ return b->second;
+ return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -130,18 +166,26 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
)
{
Mutex::ScopedLock locker(lock);
- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
+ // Durable bridges are only valid on durable links
+ if (durable && !link.isDurable()) {
+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ if (dynamic) {
+ Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+ if (exchange.get() == 0) {
+ QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ }
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
@@ -159,23 +203,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
args.i_sync = sync;
bridge = Bridge::shared_ptr
- (new Bridge (l->second.get(), l->second->nextChannel(),
- boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key),
- args, init));
- bridges[bridgeKey] = bridge;
- l->second->add(bridge);
+ (new Bridge (name, &link, link.nextChannel(),
+ boost::bind(&LinkRegistry::destroyBridge, this, _1),
+ args, init));
+ bridges[name] = bridge;
+ link.add(bridge);
+ if (durable && store)
+ store->create(*bridge);
+
+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+ "' from " << src << " to " << dest << " (" << key << ")");
+
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(const std::string& name)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << name);
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i != links.end())
{
if (i->second->isDurable() && store)
@@ -184,27 +234,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port)
}
}
-void LinkRegistry::destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(const std::string& name)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << name);
Mutex::ScopedLock locker(lock);
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return;
-
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
return;
- l->second->cancel(b->second);
+ Link *link = b->second->getLink();
+ if (link) {
+ link->cancel(b->second);
+ }
if (b->second->isDurable())
store->destroy(*(b->second));
bridges.erase(b);
@@ -219,26 +262,64 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
- // Convert keyOrMgmtId to a host:port key.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
- // connection management IDs. Currently sys:: protocol factories
- // and IO plugins construct the IDs and LinkRegistry parses them.
- size_t separator = keyOrMgmtId.find('-');
- if (separator == std::string::npos) separator = 0;
- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+ {
+ // Extract host and port of remote broker from connection id string.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses
+ // connection management IDs. Currently sys:: protocol factories
+ // and IO plugins construct the IDs and LinkRegistry parses them.
+ // current format assumed: "localhost:port-remotehost:port", asserts
+ // provided to alert us if this assumption changes!
+ size_t separator = connId.find('-');
+ assert(separator != std::string::npos);
+ std::string remote = connId.substr(separator+1, std::string::npos);
+ separator = remote.find(":");
+ assert(separator != std::string::npos);
+ *host = remote.substr(0, separator);
+ // IPv6 - host is bracketed by "[]", strip them
+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']')
+ *host = host->substr(1, host->length() - 1);
+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+ }
+}
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end()) return l->second;
- else return Link::shared_ptr();
+ ConnectionMap::iterator c = connections.find(connId);
+ if (c != connections.end()) {
+ LinkMap::iterator l = links.find(c->second);
+ if (l != links.end())
+ return l->second;
+ }
+ return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
- Link::shared_ptr link = findLink(key);
+ // find a link that is attempting to connect to the remote, and
+ // create a mapping from connection id to link
+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+ std::string host;
+ uint16_t port;
+ extractHostPort( key, &host, &port );
+ Link::shared_ptr link;
+ {
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ if (l->second->isConnecting() &&
+ l->second->getHost() == host &&
+ l->second->getPort() == port) {
+ link = l->second;
+ connections[key] = link->getName();
+ break;
+ }
+ }
+ }
+
if (link) {
link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
@@ -336,20 +417,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
-std::string LinkRegistry::createKey(const qpid::Address& a) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- stringstream keystream;
- keystream << host << ":" << port;
- return keystream.str();
-}
-
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
@@ -362,10 +429,12 @@ void LinkRegistry::setPassive(bool p)
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ Mutex::ScopedLock locker(lock);
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ Mutex::ScopedLock locker(lock);
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 8e9d2f4b0d..614fb1ab3e 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -42,9 +42,11 @@ namespace broker {
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+ typedef std::map<std::string, std::string> ConnectionMap;
- LinkMap links;
- BridgeMap bridges;
+ LinkMap links; /** indexed by name of Link */
+ BridgeMap bridges; /** indexed by name of Bridge */
+ ConnectionMap connections; /** indexed by connection identifier, gives link name */
qpid::sys::Mutex lock;
Broker* broker;
@@ -54,15 +56,18 @@ namespace broker {
std::string realm;
boost::shared_ptr<Link> findLink(const std::string& key);
- static std::string createKey(const Address& address);
- static std::string createKey(const std::string& host, uint16_t port);
- // Methods called by the connection observer.
+ // Methods called by the connection observer, key is connection identifier
void notifyConnection (const std::string& key, Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
- friend class LinkRegistryConnectionObserver;
+ friend class LinkRegistryConnectionObserver;
+
+ /** Notify the registry that a Link has been destroyed */
+ void linkDestroyed(const std::string& name);
+ /** Request to destroy a Bridge */
+ void destroyBridge(const std::string& name);
public:
QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
@@ -70,17 +75,25 @@ namespace broker {
QPID_BROKER_EXTERN ~LinkRegistry();
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
- declare(const std::string& host,
+ declare(const std::string& name,
+ const std::string& host,
uint16_t port,
const std::string& transport,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password);
+ /** determine if Link exists */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& name);
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport = std::string());
QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
- declare(const std::string& host,
- uint16_t port,
+ declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -93,14 +106,14 @@ namespace broker {
uint16_t sync,
Bridge::InitializeCallback=0
);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key);
+ /** determine if Bridge exists */
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const std::string& name);
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
@@ -126,11 +139,6 @@ namespace broker {
QPID_BROKER_EXTERN uint16_t getPort (const std::string& key);
/**
- * Called by links failing over to new address
- */
- void changeAddress(const Address& oldAddress, const Address& newAddress);
-
- /**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
* updated but links won't actually establish connections and
diff --git a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h
index 6ea25c9797..2e9f7febe2 100644
--- a/qpid/cpp/src/qpid/broker/NameGenerator.h
+++ b/qpid/cpp/src/qpid/broker/NameGenerator.h
@@ -32,6 +32,7 @@ namespace qpid {
NameGenerator(const std::string& base);
std::string generate();
};
+ const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names
}
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index d08409695e..858535637a 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const
RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
{
string kind;
-
+ uint32_t p = buffer.getPosition();
buffer.getShortString (kind);
- if (kind == "link")
+ buffer.setPosition(p);
+
+ if (Link::isEncodedLink(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- else if (kind == "bridge")
+ else if (Bridge::isEncodedBridge(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index fc6ada096f..ce4c618885 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -781,16 +781,18 @@ void Connection::managementSetupState(
void Connection::config(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
string kind;
+ uint32_t p = buf.getPosition();
buf.getShortString (kind);
- if (kind == "link") {
+ buf.setPosition(p);
+ if (broker::Link::isEncodedLink(kind)) {
broker::Link::shared_ptr link =
- broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated link "
<< link->getHost() << ":" << link->getPort());
}
- else if (kind == "bridge") {
+ else if (broker::Bridge::isEncodedBridge(kind)) {
broker::Bridge::shared_ptr bridge =
- broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
}
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 3d65e07202..8cd82a85f6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -54,8 +54,10 @@ void Backup::initialize(const Url& url) {
assert(!url.empty());
QPID_LOG(notice, "HA: Backup started: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ framing::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 609a3378ad..915cee9e58 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -189,7 +189,8 @@ BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
- link->getHost(), link->getPort(),
+ QPID_CONFIGURATION_REPLICATOR + ".bridge", // name for bridge
+ *link,
false, // durable
QPID_CONFIGURATION_REPLICATOR, // src
QPID_CONFIGURATION_REPLICATOR, // dest
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index ad6719f207..5c1dfec2f6 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -116,7 +116,9 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ framing::Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 6aff4879e3..13e9079c7f 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -53,15 +53,19 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
+ bridgeName = replicatorName(q->getName());
logPrefix = "HA: Backup " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- // Note this may create a new bridge or use an existing one.
+ // Create a new route over the link
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
- link->getHost(), link->getPort(),
+ bridgeName,
+ *link,
false, // durable
queue->getName(), // src
getName(), // dest
@@ -76,15 +80,19 @@ void QueueReplicator::activate() {
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
);
+ bridge = result.first;
}
QueueReplicator::~QueueReplicator() {}
void QueueReplicator::deactivate() {
+ // destroy the route
sys::Mutex::ScopedLock l(lock);
- queue->getBroker()->getLinks().destroy(
- link->getHost(), link->getPort(), queue->getName(), getName(), string());
- QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+ if (bridge) {
+ bridge->close();
+ bridge.reset();
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+ }
}
// Called in a broker connection thread when the bridge is created.
@@ -92,7 +100,6 @@ void QueueReplicator::deactivate() {
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
boost::shared_ptr<QueueReplicator> /*self*/) {
sys::Mutex::ScopedLock l(lock);
- bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index a1ebbd788a..1fc43f3a45 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -79,6 +79,7 @@ class QueueReplicator : public broker::Exchange,
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<broker::Bridge> bridge;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 7d613b98ce..5f483ad258 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -127,18 +127,28 @@ class FederationTests(TestBase010):
self.verify_cleanup()
def test_pull_from_exchange(self):
+ """ This test uses an alternative method to manage links and bridges
+ via the broker object.
+ """
session = self.session
-
+
self.startQmf()
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
- result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0, result)
- link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
+ # create link
+ link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False,
+ "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+ "transport":"tcp"}
+ result = broker.create("link", "test-link-1", link_args, False)
self.assertEqual(result.status, 0, result)
+ link = qmf.getObjects(_class="link")[0]
+ # create bridge
+ bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout",
+ "key":"my-key"}
+ result = broker.create("bridge", "test-bridge-1", bridge_args, False);
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
@@ -164,9 +174,11 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- result = bridge.close()
+
+ result = broker.delete("bridge", "test-bridge-1", {})
self.assertEqual(result.status, 0, result)
- result = link.close()
+
+ result = broker.delete("link", "test-link-1", {})
self.assertEqual(result.status, 0, result)
self.verify_cleanup()
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 9eafbc52fa..f0f5c0f9d2 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -379,10 +379,12 @@
This class represents an inter-broker connection.
<property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
- <property name="host" type="sstr" access="RC" index="y"/>
- <property name="port" type="uint16" access="RC" index="y"/>
- <property name="transport" type="sstr" access="RC"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="host" type="sstr" access="RO"/>
+ <property name="port" type="uint16" access="RO"/>
+ <property name="transport" type="sstr" access="RO"/>
<property name="durable" type="bool" access="RC"/>
+ <property name="connectionRef" type="objId" references="Connection" access="RO"/>
<statistic name="state" type="sstr" desc="Operational state of the link"/>
<statistic name="lastError" type="lstr" desc="Reason link is not operational"/>
@@ -411,7 +413,8 @@
-->
<class name="Bridge">
<property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/>
- <property name="channelId" type="uint16" access="RC" index="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="channelId" type="uint16" access="RO"/>
<property name="durable" type="bool" access="RC"/>
<property name="src" type="sstr" access="RC"/>
<property name="dest" type="sstr" access="RC"/>