diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-04 19:45:32 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-04 19:45:32 +0000 |
commit | 416a3cc5d7236378fce980a6356ff8e6cc07d691 (patch) | |
tree | b08fe0fb3c8ddfe39243f3dd136b619d8bfb674c | |
parent | 5c80b835bf53caf2e0b642788fd0865e040e0975 (diff) | |
download | qpid-python-416a3cc5d7236378fce980a6356ff8e6cc07d691.tar.gz |
QPID-3767: re-index bridge and link by constant name, not address
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1334138 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 984 insertions, 273 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5b531e4636..53fe38a504 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -57,22 +57,21 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) conn->received(frame); } -Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args, +Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, + CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init) : - link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), + link(_link), channel(_id), args(_args), mgmtObject(0), + listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0), initialize(init), detached(false) { - std::stringstream title; - title << id << "_" << name; - queueName += title.str(); + queueName += Uuid(true).str(); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge - (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, + (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); @@ -90,7 +89,7 @@ void Bridge::create(Connection& c) conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - SessionHandler& sessionHandler = c.getChannel(id); + SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setDetachedCallback( boost::bind(&Bridge::sessionDetached, shared_from_this())); if (args.i_srcIsLocal) { @@ -98,15 +97,15 @@ void Bridge::create(Connection& c) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler pushHandler.reset(new PushHandler(&c)); - channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - session->attach(name, false); + session->attach(queueName, false); session->commandPoint(0,0); } else { - sessionHandler.attachAs(name); + sessionHandler.attachAs(queueName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } @@ -168,6 +167,7 @@ void Bridge::cancel(Connection&) QPID_LOG(debug, "Cancelled bridge " << name); } +/** Notify the bridge that the connection has closed */ void Bridge::closed() { if (args.i_dynamic) { @@ -177,9 +177,10 @@ void Bridge::closed() QPID_LOG(debug, "Closed bridge " << name); } -void Bridge::destroy() +/** Shut down the bridge */ +void Bridge::close() { - listener(this); + listener(this); // ask the LinkRegistry to destroy us } void Bridge::setPersistenceId(uint64_t pId) const @@ -187,8 +188,21 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } + +const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); +const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); + +bool Bridge::isEncodedBridge(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; +} + + Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string src; @@ -196,9 +210,33 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) string key; string id; string excludes; + string name; + + Link::shared_ptr link; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the bridge by host:port, not by name, and + * transport wasn't provided. Try to find a link using those paramters. + */ + buffer.getShortString(host); + port = buffer.getShort(); + + link = links.getLink(host, port); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); + return Bridge::shared_ptr(); + } + } else { + string linkName; + + buffer.getShortString(name); + buffer.getShortString(linkName); + link = links.getLink(linkName); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); + return Bridge::shared_ptr(); + } + } - buffer.getShortString(host); - port = buffer.getShort(); bool durable(buffer.getOctet()); buffer.getShortString(src); buffer.getShortString(dest); @@ -210,15 +248,21 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); - return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic, sync).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions did not provide a name for the bridge, so create one + */ + name = createName(link->getName(), src, dest, key); + } + + return links.declare(name, *link, durable, src, dest, key, is_queue, + is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const { - buffer.putShortString(string("bridge")); - buffer.putShortString(link->getHost()); - buffer.putShort(link->getPort()); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(link->getName()); buffer.putOctet(args.i_durable ? 1 : 0); buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); @@ -233,9 +277,9 @@ void Bridge::encode(Buffer& buffer) const uint32_t Bridge::encodedSize() const { - return link->getHost().size() + 1 // short-string (host) - + 7 // short-string ("bridge") - + 2 // port + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + link->getName().size() + 1 + 1 // durable + args.i_src.size() + 1 + args.i_dest.size() + 1 @@ -259,7 +303,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, { if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed - destroy(); + QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); + close(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; @@ -306,7 +351,7 @@ void Bridge::sendReorigin() } bool Bridge::resetProxy() { - SessionHandler& sessionHandler = conn->getChannel(id); + SessionHandler& sessionHandler = conn->getChannel(channel); if (!sessionHandler.getSession()) peer.reset(); else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); return peer.get(); @@ -318,7 +363,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang peer->getExchange().bind(queue, exchange, key, args); } else { QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); - destroy(); + close(); } } @@ -332,9 +377,18 @@ const string& Bridge::getLocalTag() const { return link->getBroker()->getFederationTag(); } - void Bridge::sessionDetached() { detached = true; } +std::string Bridge::createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + std::stringstream keystream; + keystream << linkName << "!" << src << "!" << dest << "!" << key; + return keystream.str(); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 32b9fd1781..2cf07d3a94 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -55,17 +55,18 @@ public: typedef boost::function<void(Bridge*)> CancellationListener; typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback; - Bridge(Link* link, framing::ChannelId id, CancellationListener l, + Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l, const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, InitializeCallback init ); ~Bridge(); - void create(Connection& c); - void cancel(Connection& c); - void closed(); - void destroy(); + QPID_BROKER_EXTERN void close(); bool isDurable() { return args.i_durable; } + Link *getLink() const { return link; } + const std::string getSrc() const { return args.i_src; } + const std::string getDest() const { return args.i_dest; } + const std::string getKey() const { return args.i_key; } bool isDetached() const { return detached; } @@ -80,7 +81,11 @@ public: uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; const std::string& getName() const { return name; } + + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedBridge(const std::string& key); // Exchange::DynamicBridge methods void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); @@ -93,6 +98,12 @@ public: std::string getQueueName() const { return queueName; } const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } + /** create a name for a bridge (if none supplied by user config) */ + static std::string createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key); + private: // Callback when the bridge's session is detached. void sessionDetached(); @@ -108,8 +119,8 @@ private: std::auto_ptr<framing::AMQP_ServerProxy::Session> session; std::auto_ptr<framing::AMQP_ServerProxy> peer; - Link* link; - framing::ChannelId id; + Link* const link; + const framing::ChannelId channel; qmf::org::apache::qpid::broker::ArgsLinkBridge args; qmf::org::apache::qpid::broker::Bridge* mgmtObject; CancellationListener listener; @@ -121,6 +132,12 @@ private: InitializeCallback initialize; bool detached; // Set when session is detached. bool resetProxy(); + + // connection Management (called by owning Link) + void create(Connection& c); + void cancel(Connection& c); + void closed(); + friend class Link; // to call create, cancel, closed() }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 726b47c268..c13ac19454 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -438,7 +438,7 @@ Manageable* Broker::GetVhostObject(void) const Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& args, - string&) + string& text) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -453,6 +453,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { + /** Management is creating a Link to a remote broker using the host and port of + * the remote. This (old) interface does not allow management to specify a name + * for the link, nor does it allow multiple Links to the same remote. Use the + * "create()" broker method if these features are needed. + * TBD: deprecate this interface. + */ + QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='link' instead."); _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); @@ -461,13 +469,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); + text = "transport type not supported"; return Manageable::STATUS_NOT_IMPLEMENTED; } - std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, - hp.i_authMechanism, hp.i_username, hp.i_password); - if (hp.i_durable && response.second) - store->create(*response.first); + + // Does a link to the remote already exist? If so, re-use the existing link + // - this behavior is backward compatible with previous releases. + if (!links.getLink(hp.i_host, hp.i_port, transport)) { + // new link, need to generate a unique name for it + std::pair<Link::shared_ptr, bool> response = + links.declare(Link::createName(transport, hp.i_host, hp.i_port), + hp.i_host, hp.i_port, transport, + hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); + if (!response.first) { + text = "Unable to create Link"; + status = Manageable::STATUS_PARAMETER_INVALID; + break; + } + } status = Manageable::STATUS_OK; break; } @@ -538,6 +557,8 @@ const std::string TYPE_QUEUE("queue"); const std::string TYPE_EXCHANGE("exchange"); const std::string TYPE_TOPIC("topic"); const std::string TYPE_BINDING("binding"); +const std::string TYPE_LINK("link"); +const std::string TYPE_BRIDGE("bridge"); const std::string DURABLE("durable"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); @@ -549,6 +570,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); const std::string _TRUE("true"); const std::string _FALSE("false"); + +// parameters for creating a Link object, see mgmt schema +const std::string HOST("host"); +const std::string PORT("port"); +const std::string TRANSPORT("transport"); +const std::string AUTH_MECHANISM("authMechanism"); +const std::string USERNAME("username"); +const std::string PASSWORD("password"); + +// parameters for creating a Bridge object, see mgmt schema +const std::string LINK("link"); +const std::string SRC("src"); +const std::string DEST("dest"); +const std::string KEY("key"); +const std::string TAG("tag"); +const std::string EXCLUDES("excludes"); +const std::string SRC_IS_QUEUE("srcIsQueue"); +const std::string SRC_IS_LOCAL("srcIsLocal"); +const std::string DYNAMIC("dynamic"); +const std::string SYNC("sync"); } struct InvalidBindingIdentifier : public qpid::Exception @@ -598,6 +639,25 @@ struct UnknownObjectType : public qpid::Exception std::string getPrefix() const { return "unknown object type"; } }; +struct ReservedObjectName : public qpid::Exception +{ + ReservedObjectName(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return std::string("names prefixed with '") + + QPID_NAME_PREFIX + std::string("' are reserved"); } +}; + +struct UnsupportedTransport : public qpid::Exception +{ + UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "transport is not supported"; } +}; + +struct InvalidParameter : public qpid::Exception +{ + InvalidParameter(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "invalid parameter to method call"; } +}; + void Broker::createObject(const std::string& type, const std::string& name, const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) { @@ -669,6 +729,109 @@ void Broker::createObject(const std::string& type, const std::string& name, amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + + } else if (type == TYPE_LINK) { + + QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string host; + uint16_t port = 0; + std::string transport = TCP_TRANSPORT; + bool durable = false; + std::string authMech, username, password; + + if (!getProtocolFactory(transport)) { + QPID_LOG(error, "Transport '" << transport << "' not supported."); + throw UnsupportedTransport(transport); + } + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == HOST) host = i->second.asString(); + else if (i->first == PORT) port = i->second.asUint16(); + else if (i->first == TRANSPORT) transport = i->second.asString(); + else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == AUTH_MECHANISM) authMech = i->second.asString(); + else if (i->first == USERNAME) username = i->second.asString(); + else if (i->first == PASSWORD) password = i->second.asString(); + else { + // TODO: strict checking here + } + } + + std::pair<boost::shared_ptr<Link>, bool> rc; + rc = links.declare(name, host, port, transport, durable, authMech, username, password); + if (!rc.first) { + QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port << + "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\""); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } + + } else if (type == TYPE_BRIDGE) { + + QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string linkName; + std::string src; + std::string dest; + std::string key; + std::string id; + std::string excludes; + bool durable = false; + bool srcIsQueue = false; + bool srcIsLocal = false; + bool dynamic = false; + uint16_t sync = 0; + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + + if (i->first == LINK) linkName = i->second.asString(); + else if (i->first == SRC) src = i->second.asString(); + else if (i->first == DEST) dest = i->second.asString(); + else if (i->first == KEY) key = i->second.asString(); + else if (i->first == TAG) id = i->second.asString(); + else if (i->first == EXCLUDES) excludes = i->second.asString(); + else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second); + else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); + else if (i->first == DYNAMIC) dynamic = bool(i->second); + else if (i->first == SYNC) sync = i->second.asUint16(); + else if (i->first == DURABLE) durable = bool(i->second); + else { + // TODO: strict checking here + } + } + + boost::shared_ptr<Link> link; + if (linkName.empty() || !(link = links.getLink(linkName))) { + QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed."); + throw InvalidParameter(name); + } + std::pair<Bridge::shared_ptr, bool> rc = + links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, + dynamic, sync); + + if (!rc.first) { + QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << + "; src=" << src << "; dest=" << dest << "; key=" << key); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } } else { throw UnknownObjectType(type); } @@ -691,6 +854,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } else if (type == TYPE_BINDING) { BindingIdentifier binding(name); unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else if (type == TYPE_LINK) { + boost::shared_ptr<Link> link = links.getLink(name); + if (link) { + link->close(); + } + } else if (type == TYPE_BRIDGE) { + boost::shared_ptr<Bridge> bridge = links.getBridge(name); + if (bridge) { + bridge->close(); + } } else { throw UnknownObjectType(type); } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index f21c861149..b605ca71e5 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -125,18 +125,19 @@ boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name return Exchange::shared_ptr(new LinkExchange(_name)); } -Link::Link(LinkRegistry* _links, - MessageStore* _store, +Link::Link(const string& _name, + LinkRegistry* _links, const string& _host, uint16_t _port, const string& _transport, + DestroyedListener l, bool _durable, const string& _authMechanism, const string& _username, const string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), + : name(_name), links(_links), configuredTransport(_transport), configuredHost(_host), configuredPort(_port), host(_host), port(_port), transport(_transport), durable(_durable), @@ -149,6 +150,7 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), + listener(l), timerTask(new LinkTimerTask(*this, broker->getTimer())), failoverChannel(0) { @@ -157,7 +159,10 @@ Link::Link(LinkRegistry* _links, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); + mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); agent->addObject(mgmtObject, 0, durable); } } @@ -169,9 +174,9 @@ Link::Link(LinkRegistry* _links, } broker->getTimer().add(timerTask); - stringstream _name; - _name << "qpid.link." << transport << ":" << host << ":" << port; - std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(), + stringstream exchangeName; + exchangeName << "qpid.link." << name; + std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); assert(failoverExchange); @@ -245,6 +250,7 @@ void Link::established(Connection* c) currentInterval = 1; visitCount = 0; connection = c; + if (closing) destroy(); else // Process any IO tasks bridges added before established. @@ -263,7 +269,7 @@ void Link::setUrl(const Url& u) { namespace { /** invoked when session used to subscribe to remote's amq.failover exchange detaches */ void sessionDetached(Link *link) { - QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName()); + QPID_LOG(notice, "detached from 'amq.failover' for link: " << link->getName()); } } @@ -271,6 +277,11 @@ namespace { void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; + + if (!hideManagement() && connection->GetManagementObject()) { + mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); + } + // Get default URL from known-hosts if not already set if (url.empty()) { const std::vector<Url>& known = connection->getKnownHosts(); @@ -346,13 +357,14 @@ void Link::closed(int, std::string text) if (!hideManagement()) mgmtObject->set_lastError (text); } + mgmtObject->set_connectionRef(qpid::management::ObjectId()); } // Call destroy outside of the lock, don't want to be deleted with lock held. if (isClosing) destroy(); } -// Called in connection IO thread. +// Called in connection IO thread, cleans up the connection before destroying Link void Link::destroy () { Bridges toDelete; @@ -379,9 +391,9 @@ void Link::destroy () } // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) - (*i)->destroy(); + (*i)->close(); toDelete.clear(); - links->destroy (configuredHost, configuredPort); + listener(this); // notify LinkRegistry that this Link has been destroyed } void Link::add(Bridge::shared_ptr bridge) @@ -485,12 +497,16 @@ void Link::reconnectLH(const Address& a) host = a.host; port = a.port; transport = a.protocol; - startConnectionLH(); + if (!hideManagement()) { stringstream errorString; - errorString << "Failed over to " << a; + errorString << "Failing over to " << a; mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); } + startConnectionLH(); } bool Link::tryFailoverLH() { @@ -499,15 +515,14 @@ bool Link::tryFailoverLH() { if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { - links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next); reconnectLH(next); return true; } return false; } -// Management updates for a linke are inconsistent in a cluster, so they are +// Management updates for a link are inconsistent in a cluster, so they are // suppressed. bool Link::hideManagement() const { return !mgmtObject || ( broker && broker->isInCluster()); @@ -536,18 +551,34 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return configuredHost; + return name; +} + +const std::string Link::ENCODED_IDENTIFIER("link.v2"); +const std::string Link::ENCODED_IDENTIFIER_V1("link"); + +bool Link::isEncodedLink(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string transport; string authMechanism; string username; string password; + string name; + if (kind == ENCODED_IDENTIFIER) { + // newer version provides a link name. + buffer.getShortString(name); + } buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -556,12 +587,21 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(username); buffer.getShortString(password); - return links.declare(host, port, transport, durable, authMechanism, username, password).first; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the Link by host:port, there was no name + * assigned. So create a name for the new Link. + */ + name = createName(transport, host, port); + } + + return links.declare(name, host, port, transport, durable, authMechanism, + username, password).first; } void Link::encode(Buffer& buffer) const { - buffer.putShortString(string("link")); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); buffer.putShortString(configuredHost); buffer.putShort(configuredPort); buffer.putShortString(configuredTransport); @@ -573,8 +613,9 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return configuredHost.size() + 1 // short-string (host) - + 5 // short-string ("link") + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + configuredHost.size() + 1 // short-string (host) + 2 // port + configuredTransport.size() + 1 // short-string(transport) + 1 // durable @@ -589,6 +630,7 @@ ManagementObject* Link::GetManagementObject (void) const } void Link::close() { + QPID_LOG(debug, "Link::close(), link=" << name ); Mutex::ScopedLock mutex(lock); if (!closing) { closing = true; @@ -609,36 +651,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_OK; case _qmf::Link::METHOD_BRIDGE : + /* TBD: deprecate this interface in favor of the Broker::create() method. The + * Broker::create() method allows the user to assign a name to the bridge. + */ + QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='bridge' instead."); _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; - QPID_LOG(debug, "Link::bridge() request received"); - - // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) { - text = "Can't create a durable route on a non-durable link"; - return Manageable::STATUS_USER; - } - - if (iargs.i_dynamic) { - Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); - if (exchange.get() == 0) { - text = "Exchange not found"; - return Manageable::STATUS_USER; - } - if (!exchange->supportsDynamicBinding()) { - text = "Exchange type does not support dynamic routing"; - return Manageable::STATUS_USER; + QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << + "; dest=" << iargs.i_dest << "; key=" << iargs.i_key); + + // Does a bridge already exist that has the src/dest/key? If so, re-use the + // existing bridge - this behavior is backward compatible with previous releases. + Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); + if (!bridge) { + // need to create a new bridge on this link. + std::pair<Bridge::shared_ptr, bool> rc = + links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key), + *this, iargs.i_durable, + iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); + if (!rc.first) { + text = "invalid parameters"; + return Manageable::STATUS_PARAMETER_INVALID; } } - - std::pair<Bridge::shared_ptr, bool> result = - links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic, iargs.i_sync); - - if (result.second && iargs.i_durable) - store->create(*result.first); - return Manageable::STATUS_OK; } @@ -716,6 +753,23 @@ void Link::setState(const framing::FieldTable& state) } } +std::string Link::createName(const std::string& transport, + const std::string& host, + uint16_t port) +{ + stringstream linkName; + linkName << QPID_NAME_PREFIX << transport << std::string(":") + << host << std::string(":") << port; + return linkName.str(); +} + + +bool Link::pendingConnection(const std::string& _host, uint16_t _port) const +{ + Mutex::ScopedLock mutex(lock); + return (isConnecting() && _port == port && _host == host); +} + const std::string Link::exchangeTypeName("qpid.LinkExchange"); diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index a97fa48664..5b788bb947 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -25,7 +25,6 @@ #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/BrokerImportExport.h" @@ -52,8 +51,8 @@ class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: mutable sys::Mutex lock; + const std::string name; LinkRegistry* links; - MessageStore* store; // these remain constant across failover - used to identify this link const std::string configuredTransport; @@ -85,6 +84,7 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; + boost::function<void(Link*)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange uint failoverChannel; @@ -101,27 +101,32 @@ class Link : public PersistableConfig, public management::Manageable { void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void destroy(); // Called when mgmt deletes this link + void destroy(); // Cleanup connection before link goes away void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailoverLH(); // Called during maintenance visit bool hideManagement() const; + void reconnectLH(const Address&); //called by LinkRegistry - void established(Connection*); // Called when connection is create + // connection management (called by LinkRegistry) + void established(Connection*); // Called when connection is created void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away - void reconnectLH(const Address&); //called by LinkRegistry + void notifyConnectionForced(const std::string text); void closeConnection(const std::string& reason); + bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote? friend class LinkRegistry; // to call established, opened, closed public: typedef boost::shared_ptr<Link> shared_ptr; + typedef boost::function<void(Link*)> DestroyedListener; - Link(LinkRegistry* links, - MessageStore* store, + Link(const std::string& name, + LinkRegistry* links, const std::string& host, uint16_t port, const std::string& transport, + DestroyedListener l, bool durable, const std::string& authMechanism, const std::string& username, @@ -148,15 +153,17 @@ class Link : public PersistableConfig, public management::Manageable { void cancel(Bridge::shared_ptr); QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. - QPID_BROKER_EXTERN void close(); // Close the link from within the broker. + + // Close the link. + QPID_BROKER_EXTERN void close(); std::string getAuthMechanism() { return authMechanism; } std::string getUsername() { return username; } std::string getPassword() { return password; } Broker* getBroker() { return broker; } - void notifyConnectionForced(const std::string text); void setPassive(bool p); + bool isConnecting() const { return state == STATE_CONNECTING; } // PersistableConfig: void setPersistenceId(uint64_t id) const; @@ -165,7 +172,10 @@ class Link : public PersistableConfig, public management::Manageable { void encode(framing::Buffer& buffer) const; const std::string& getName() const; + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedLink(const std::string& key); // Manageable entry points management::ManagementObject* GetManagementObject(void) const; @@ -178,6 +188,11 @@ class Link : public PersistableConfig, public management::Manageable { // replicate internal state of this Link for clustering void getState(framing::FieldTable& state) const; void setState(const framing::FieldTable& state); + + /** create a name for a link (if none supplied by user config) */ + static std::string createName(const std::string& transport, + const std::string& host, + uint16_t port); }; } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index d89f220d1b..3cad2c40c9 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -68,27 +68,34 @@ LinkRegistry::LinkRegistry (Broker* _broker) : LinkRegistry::~LinkRegistry() {} +/** find link by the *configured* remote address */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, + uint16_t port, + const std::string& transport) +{ + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { + Link::shared_ptr& link = i->second; + if (link->getHost() == host && + link->getPort() == port && + (transport.empty() || link->getTransport() == transport)) + return link; + } + return boost::shared_ptr<Link>(); +} -void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) +/** find link by name */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) { Mutex::ScopedLock locker(lock); - std::string oldKey = createKey(oldAddress); - std::string newKey = createKey(newAddress); - if (links.find(newKey) != links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - } else { - LinkMap::iterator i = links.find(oldKey); - if (i == links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - } else { - links[newKey] = i->second; - links.erase(oldKey); - QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - } - } + LinkMap::iterator l = links.find(name); + if (l != links.end()) + return l->second; + return boost::shared_ptr<Link>(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, + const string& host, uint16_t port, const string& transport, bool durable, @@ -98,24 +105,53 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, { Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(name); if (i == links.end()) { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, - authMechanism, username, password, - broker, parent)); - links[key] = link; + link = Link::shared_ptr (new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent)); + if (durable && store) store->create(*link); + links[name] = link; + QPID_LOG(debug, "Creating new link; name=" << name ); return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } -pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, - uint16_t port, +/** find bridge by link & route info */ +Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { + if (i->second->getSrc() == src && i->second->getDest() == dest && + i->second->getKey() == key && i->second->getLink() && + i->second->getLink()->getName() == link.getName()) { + return i->second; + } + } + return Bridge::shared_ptr(); +} + +/** find bridge by name */ +Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + BridgeMap::iterator b = bridges.find(name); + if (b != bridges.end()) + return b->second; + return Bridge::shared_ptr(); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -130,18 +166,26 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, ) { Mutex::ScopedLock locker(lock); - QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); + // Durable bridges are only valid on durable links + if (durable && !link.isDurable()) { + QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + if (dynamic) { + Exchange::shared_ptr exchange = broker->getExchanges().get(src); + if (exchange.get() == 0) { + QPID_LOG(error, "Exchange not found, name='" << src << "'" ); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + if (!exchange->supportsDynamicBinding()) { + QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + } - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(name); if (b == bridges.end()) { _qmf::ArgsLinkBridge args; @@ -159,23 +203,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, args.i_sync = sync; bridge = Bridge::shared_ptr - (new Bridge (l->second.get(), l->second->nextChannel(), - boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), - args, init)); - bridges[bridgeKey] = bridge; - l->second->add(bridge); + (new Bridge (name, &link, link.nextChannel(), + boost::bind(&LinkRegistry::destroyBridge, this, _1), + args, init)); + bridges[name] = bridge; + link.add(bridge); + if (durable && store) + store->create(*bridge); + + QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << + "' from " << src << " to " << dest << " (" << key << ")"); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); } return std::pair<Bridge::shared_ptr, bool>(b->second, false); } -void LinkRegistry::destroy(const string& host, const uint16_t port) +/** called back by the link when it has completed its cleanup and can be removed. */ +void LinkRegistry::linkDestroyed(Link *link) { + QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(link->getName()); if (i != links.end()) { if (i->second->isDurable() && store) @@ -184,27 +234,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } -void LinkRegistry::destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key) +/** called back by bridge when its destruction has been requested */ +void LinkRegistry::destroyBridge(Bridge *bridge) { + QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName()); Mutex::ScopedLock locker(lock); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); - - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return; - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(bridge->getName()); if (b == bridges.end()) return; - l->second->cancel(b->second); + Link *link = b->second->getLink(); + if (link) { + link->cancel(b->second); + } if (b->second->isDurable()) store->destroy(*(b->second)); bridges.erase(b); @@ -219,28 +262,73 @@ MessageStore* LinkRegistry::getStore() const { return store; } -Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) -{ - // Convert keyOrMgmtId to a host:port key. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses - // connection management IDs. Currently sys:: protocol factories - // and IO plugins construct the IDs and LinkRegistry parses them. - size_t separator = keyOrMgmtId.find('-'); - if (separator == std::string::npos) separator = 0; - std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); +namespace { + void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) + { + // Extract host and port of remote broker from connection id string. + // + // TODO aconway 2011-02-01: centralize code that constructs/parses connection + // management IDs. Currently sys:: protocol factories and IO plugins construct the + // IDs and LinkRegistry parses them. + // KAG: current connection id format assumed: + // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are + // contained within brackets "[...]", example: + // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us + // if this assumption changes! + size_t separator = connId.find('-'); + assert(separator != std::string::npos); + std::string remote = connId.substr(separator+1, std::string::npos); + separator = remote.rfind(":"); + assert(separator != std::string::npos); + *host = remote.substr(0, separator); + // IPv6 - host is bracketed by "[]", strip them + if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { + *host = host->substr(1, host->length() - 2); + } + try { + *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); + } catch (const boost::bad_lexical_cast&) { + QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); + assert(false); + } + } +} +/** find the Link that corresponds to the given connection */ +Link::shared_ptr LinkRegistry::findLink(const std::string& connId) +{ Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) return l->second; - else return Link::shared_ptr(); + ConnectionMap::iterator c = connections.find(connId); + if (c != connections.end()) { + LinkMap::iterator l = links.find(c->second); + if (l != links.end()) + return l->second; + } + return Link::shared_ptr(); } void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { - Link::shared_ptr link = findLink(key); + // find a link that is attempting to connect to the remote, and + // create a mapping from connection id to link + QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); + std::string host; + uint16_t port; + extractHostPort( key, &host, &port ); + Link::shared_ptr link; + { + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) { + if (l->second->pendingConnection(host, port)) { + link = l->second; + connections[key] = link->getName(); + link->established(c); + break; + } + } + } + if (link) { - link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); } } @@ -343,20 +431,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } -std::string LinkRegistry::createKey(const qpid::Address& a) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - return createKey(a.host, a.port); -} - -std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - stringstream keystream; - keystream << host << ":" << port; - return keystream.str(); -} - void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); @@ -369,10 +443,12 @@ void LinkRegistry::setPassive(bool p) } void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + Mutex::ScopedLock locker(lock); for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); } void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + Mutex::ScopedLock locker(lock); for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 8e9d2f4b0d..5f79d9bb52 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -42,9 +42,11 @@ namespace broker { class LinkRegistry { typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; + typedef std::map<std::string, std::string> ConnectionMap; - LinkMap links; - BridgeMap bridges; + LinkMap links; /** indexed by name of Link */ + BridgeMap bridges; /** indexed by name of Bridge */ + ConnectionMap connections; /** indexed by connection identifier, gives link name */ qpid::sys::Mutex lock; Broker* broker; @@ -54,15 +56,18 @@ namespace broker { std::string realm; boost::shared_ptr<Link> findLink(const std::string& key); - static std::string createKey(const Address& address); - static std::string createKey(const std::string& host, uint16_t port); - // Methods called by the connection observer. + // Methods called by the connection observer, key is connection identifier void notifyConnection (const std::string& key, Connection* c); void notifyOpened (const std::string& key); void notifyClosed (const std::string& key); void notifyConnectionForced (const std::string& key, const std::string& text); - friend class LinkRegistryConnectionObserver; + friend class LinkRegistryConnectionObserver; + + /** Notify the registry that a Link has been destroyed */ + void linkDestroyed(Link*); + /** Request to destroy a Bridge */ + void destroyBridge(Bridge*); public: QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests @@ -70,17 +75,27 @@ namespace broker { QPID_BROKER_EXTERN ~LinkRegistry(); QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool> - declare(const std::string& host, + declare(const std::string& name, + const std::string& host, uint16_t port, const std::string& transport, bool durable, const std::string& authMechanism, const std::string& username, const std::string& password); + /** determine if Link exists */ + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& name); + /** host,port,transport will be matched against the configured values, which may + be different from the current values due to failover */ + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& configHost, + uint16_t configPort, + const std::string& configTransport = std::string()); QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool> - declare(const std::string& host, - uint16_t port, + declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -93,14 +108,14 @@ namespace broker { uint16_t sync, Bridge::InitializeCallback=0 ); - - QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port); - - QPID_BROKER_EXTERN void destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key); + /** determine if Bridge exists */ + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const std::string& name); + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key); /** * Register the manageable parent for declared queues @@ -126,11 +141,6 @@ namespace broker { QPID_BROKER_EXTERN uint16_t getPort (const std::string& key); /** - * Called by links failing over to new address - */ - void changeAddress(const Address& oldAddress, const Address& newAddress); - - /** * Called to alter passive state. In passive state the links * and bridges managed by a link registry will be recorded and * updated but links won't actually establish connections and diff --git a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h index 6ea25c9797..2e9f7febe2 100644 --- a/qpid/cpp/src/qpid/broker/NameGenerator.h +++ b/qpid/cpp/src/qpid/broker/NameGenerator.h @@ -32,6 +32,7 @@ namespace qpid { NameGenerator(const std::string& base); std::string generate(); }; + const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names } } diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index d08409695e..858535637a 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) { string kind; - + uint32_t p = buffer.getPosition(); buffer.getShortString (kind); - if (kind == "link") + buffer.setPosition(p); + + if (Link::isEncodedLink(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); - else if (kind == "bridge") + else if (Bridge::isEncodedBridge(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 512e0f03cb..63001a3cb9 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -782,16 +782,18 @@ void Connection::managementSetupState( void Connection::config(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); string kind; + uint32_t p = buf.getPosition(); buf.getShortString (kind); - if (kind == "link") { + buf.setPosition(p); + if (broker::Link::isEncodedLink(kind)) { broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); + broker::Link::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated link " << link->getHost() << ":" << link->getPort()); } - else if (kind == "bridge") { + else if (broker::Bridge::isEncodedBridge(kind)) { broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); } else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 3f3fa87a01..42cb2dbbce 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -57,8 +57,10 @@ void Backup::initialize(const Url& url) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(notice, "HA: Backup initialized: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + framing::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index d0c99cbdb6..690337831c 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -190,8 +190,11 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& : Exchange(QPID_CONFIGURATION_REPLICATOR), haBroker(hb), broker(hb.getBroker()), link(l) { + framing::Uuid uuid(true); + const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); broker.getLinks().declare( - link->getHost(), link->getPort(), + name, // name for bridge + *link, // parent false, // durable QPID_CONFIGURATION_REPLICATOR, // src QPID_CONFIGURATION_REPLICATOR, // dest diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 7d82fb63bd..589d7ee6aa 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -119,7 +119,9 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + framing::Uuid uuid(true); std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 633619be13..5ab09d3213 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -60,9 +60,11 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - // Note this may create a new bridge or use an existing one. + sys::Mutex::ScopedLock l(lock); + std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( - link->getHost(), link->getPort(), + bridgeName, + *link, false, // durable queue->getName(), // src getName(), // dest @@ -77,21 +79,24 @@ void QueueReplicator::activate() { // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); + bridge = result.first; } QueueReplicator::~QueueReplicator() {} void QueueReplicator::deactivate() { + // destroy the route sys::Mutex::ScopedLock l(lock); - queue->getBroker()->getLinks().destroy( - link->getHost(), link->getPort(), queue->getName(), getName(), string()); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); + if (bridge) { + bridge->close(); + bridge.reset(); + QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); + } } // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); - bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index bcbac988fa..26fb9456d1 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -78,6 +78,7 @@ class QueueReplicator : public broker::Exchange, sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; + boost::shared_ptr<broker::Bridge> bridge; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8952f5de7b..09eebc5ec9 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -768,6 +768,35 @@ acl deny all all fetch(cluster[2]) + def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): + """ Prove that traffic can pass between two federated brokers. + """ + tot_time = 0 + active = False + send_session = src_broker.connect().session() + sender = send_session.sender(src) + receive_session = dst_broker.connect().session() + receiver = receive_session.receiver(dst) + while not active and tot_time < timeout: + sender.send(Message("Hello from Source!")) + try: + receiver.fetch(timeout = 1) + receive_session.acknowledge() + # Get this far without Empty exception, and the link is good! + active = True + while True: + # Keep receiving msgs, as several may have accumulated + receiver.fetch(timeout = 1) + receive_session.acknowledge() + except Empty: + if not active: + tot_time += 1 + receiver.close() + receive_session.close() + sender.close() + send_session.close() + return active + def test_federation_failover(self): """ Verify that federation operates across failures occuring in a cluster. @@ -778,38 +807,6 @@ acl deny all all cluster to newly-added members """ - TIMEOUT = 30 - def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT): - """ Prove that traffic can pass from source fed broker to - destination fed broker - """ - tot_time = 0 - active = False - send_session = src_broker.connect().session() - sender = send_session.sender(src) - receive_session = dst_broker.connect().session() - receiver = receive_session.receiver(dst) - while not active and tot_time < timeout: - sender.send(Message("Hello from Source!")) - try: - receiver.fetch(timeout = 1) - receive_session.acknowledge() - # Get this far without Empty exception, and the link is good! - active = True - while True: - # Keep receiving msgs, as several may have accumulated - receiver.fetch(timeout = 1) - receive_session.acknowledge() - except Empty: - if not active: - tot_time += 1 - receiver.close() - receive_session.close() - sender.close() - send_session.close() - self.assertTrue(active, "Bridge failed to become active") - - # 2 node cluster source, 2 node cluster destination src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) src_cluster.ready(); @@ -848,43 +845,145 @@ acl deny all all self.assertEqual(result.status, 0, result) # check that traffic passes - verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") # add src[2] broker to source cluster src_cluster.start(expect=EXPECT_EXIT_FAIL); src_cluster.ready(); - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill src[0]. dst[0] should fail over to src[1] src_cluster[0].kill() for b in src_cluster[1:]: b.ready() - verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") # Kill src[1], dst[0] should fail over to src[2] src_cluster[1].kill() for b in src_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill dest[0], force failover to dest[1] dst_cluster[0].kill() for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Add dest[2] # dest[1] syncs dest[2] to current remote state dst_cluster.start(expect=EXPECT_EXIT_FAIL); for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Kill dest[1], force failover to dest[2] dst_cluster[1].kill() for b in dst_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") for i in range(2, len(src_cluster)): src_cluster[i].kill() for i in range(2, len(dst_cluster)): dst_cluster[i].kill() + def test_federation_multilink_failover(self): + """ + Verify that multi-link federation operates across failures occuring in + a cluster. + """ + + # 1 node cluster source, 1 node cluster destination + src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + src_cluster.ready(); + dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + dst_cluster.ready(); + + # federate a direct binding across two separate links + + # first, create a direct exchange bound to two queues using different + # bindings + cmd = self.popen(["qpid-config", + "--broker", src_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ1"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ1", "one"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ2"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ2", "two"], + EXPECT_EXIT_OK) + cmd.wait() + + # Create two separate links between the dst and source brokers, bind + # each to different keys + dst_cluster[0].startQmf() + dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] + + for _l in [("link1", "bridge1", "one"), + ("link2", "bridge2", "two")]: + result = dst_broker.create("link", _l[0], + {"host":src_cluster[0].host(), + "port":src_cluster[0].port()}, + False) + self.assertEqual(result.status, 0, result); + result = dst_broker.create("bridge", _l[1], + {"link":_l[0], + "src":"FedX", + "dest":"FedX", + "key":_l[2]}, False) + self.assertEqual(result.status, 0); + + # check that traffic passes + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + # add new member, verify traffic + src_cluster.start(expect=EXPECT_EXIT_FAIL); + src_cluster.ready(); + + dst_cluster.start(expect=EXPECT_EXIT_FAIL); + dst_cluster.ready(); + + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + src_cluster[0].kill() + for b in src_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") + + dst_cluster[0].kill() + for b in dst_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") + + for i in range(1, len(src_cluster)): src_cluster[i].kill() + for i in range(1, len(dst_cluster)): dst_cluster[i].kill() + + + # Some utility code for transaction tests XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 7d613b98ce..5bcf67d152 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -23,6 +23,7 @@ from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty from qpid.util import URL +import qpid.messaging from time import sleep, time @@ -94,6 +95,11 @@ class FederationTests(TestBase010): break self._brokers.append(_b) + # add a new-style messaging connection to each broker + for _b in self._brokers: + _b.connection = qpid.messaging.Connection(_b.url) + _b.connection.open() + def _teardown_brokers(self): """ Un-does _setup_brokers() """ @@ -103,7 +109,7 @@ class FederationTests(TestBase010): if not _b.client_session.error(): _b.client_session.close(timeout=10) _b.client_conn.close(timeout=10) - + _b.connection.close() def test_bridge_create_and_close(self): self.startQmf(); @@ -127,18 +133,28 @@ class FederationTests(TestBase010): self.verify_cleanup() def test_pull_from_exchange(self): + """ This test uses an alternative method to manage links and bridges + via the broker object. + """ session = self.session - + self.startQmf() qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] - result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0, result) - link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) + # create link + link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = broker.create("link", "test-link-1", link_args, False) self.assertEqual(result.status, 0, result) + link = qmf.getObjects(_class="link")[0] + # create bridge + bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key"} + result = broker.create("bridge", "test-bridge-1", bridge_args, False); + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker @@ -164,9 +180,11 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - result = bridge.close() + + result = broker.delete("bridge", "test-bridge-1", {}) self.assertEqual(result.status, 0, result) - result = link.close() + + result = broker.delete("link", "test-link-1", {}) self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -2153,3 +2171,158 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_direct(self): + """ Verify that two distinct links can be created between federated + brokers. + """ + self.startQmf() + qmf = self.qmf + self._setup_brokers() + src_broker = self._brokers[0] + dst_broker = self._brokers[1] + + # create a direct exchange on each broker + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + + # create destination queues + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True) + dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + + # create two connections, one for high priority traffic + for _q in ["HiPri", "Traffic"]: + result = dst_broker.qmf_object.create("link", _q, + {"host":src_broker.host, + "port":src_broker.port}, + False) + self.assertEqual(result.status, 0); + + links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") + for _l in links: + if _l.name == "HiPri": + hi_link = _l + elif _l.name == "Traffic": + data_link = _l + else: + self.fail("Unexpected Link found: " + _l.name) + + # now create a route for messages sent with key "high" to use the + # hi_link + result = dst_broker.qmf_object.create("bridge", "HiPriBridge", + {"link":hi_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":"high"}, False) + self.assertEqual(result.status, 0); + + + # create routes for the "medium" and "low" links to use the normal + # data_link + for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]: + result = dst_broker.qmf_object.create("bridge", _b[0], + {"link":data_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":_b[1]}, False) + self.assertEqual(result.status, 0); + + # now wait for the links to become operational + for _l in [hi_link, data_link]: + expire_time = time() + 30 + while _l.state != "Operational" and time() < expire_time: + _l.update() + self.assertEqual(_l.state, "Operational", "Link failed to become operational") + + # verify each link uses a different connection + self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef, + "Different links using the same connection") + + hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=hi_link.connectionRef)[0] + data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=data_link.connectionRef)[0] + + + # send hi data, verify only goes over hi link + + r_ssn = dst_broker.connection.session() + hi_receiver = r_ssn.receiver("HiQ"); + med_receiver = r_ssn.receiver("MedQ"); + low_receiver = r_ssn.receiver("LoQ"); + + for _c in [hi_conn, data_conn]: + _c.update() + self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") + + s_ssn = src_broker.connection.session() + hi_sender = s_ssn.sender("fedX.direct/high") + med_sender = s_ssn.sender("fedX.direct/medium") + low_sender = s_ssn.sender("fedX.direct/low") + + try: + hi_sender.send(qpid.messaging.Message(content="hi priority")) + msg = hi_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "hi priority"); + except: + self.fail("Hi Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages") + + # send low and medium, verify it does not go over hi link + + try: + med_sender.send(qpid.messaging.Message(content="medium priority")) + msg = med_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "medium priority"); + except: + self.fail("Medium Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message") + + try: + low_sender.send(qpid.messaging.Message(content="low priority")) + msg = low_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "low priority"); + except: + self.fail("Low Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message") + + # cleanup + + for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + dst_broker.client_session.queue_delete(queue=_q[0]) + + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + + + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 900b722886..fbaba7afed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1955,6 +1955,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return _obj.getAckBatching(); } + /* support TBD */ + public String getName() + { + return null; + } + public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory) { return null; @@ -2020,6 +2026,18 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return _obj.getLastError(); } + /* support TBD */ + public String getName() + { + return null; + } + + /* support TBD */ + public BrokerSchema.ConnectionObject getConnectionRef() + { + return (BrokerSchema.ConnectionObject) null; + } + public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory) { _obj.close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java index ea4f723dda..847cae87f5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java @@ -76,7 +76,7 @@ public final class LinkConfigType extends ConfigObjectType<LinkConfigType, LinkC } }; - public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("host") + public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("port") { public Integer getValue(LinkConfig object) { @@ -134,4 +134,4 @@ public final class LinkConfigType extends ConfigObjectType<LinkConfigType, LinkC -}
\ No newline at end of file +} diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 66e122b049..06e0b99af0 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -379,10 +379,12 @@ This class represents an inter-broker connection. <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> - <property name="host" type="sstr" access="RC" index="y"/> - <property name="port" type="uint16" access="RC" index="y"/> - <property name="transport" type="sstr" access="RC"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="host" type="sstr" access="RO"/> + <property name="port" type="uint16" access="RO"/> + <property name="transport" type="sstr" access="RO"/> <property name="durable" type="bool" access="RC"/> + <property name="connectionRef" type="objId" references="Connection" access="RO"/> <statistic name="state" type="sstr" desc="Operational state of the link"/> <statistic name="lastError" type="lstr" desc="Reason link is not operational"/> @@ -411,7 +413,8 @@ --> <class name="Bridge"> <property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/> - <property name="channelId" type="uint16" access="RC" index="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="channelId" type="uint16" access="RO"/> <property name="durable" type="bool" access="RC"/> <property name="src" type="sstr" access="RC"/> <property name="dest" type="sstr" access="RC"/> diff --git a/qpid/tools/src/py/qpid-tool b/qpid/tools/src/py/qpid-tool index af948b13a9..b31d93594c 100755 --- a/qpid/tools/src/py/qpid-tool +++ b/qpid/tools/src/py/qpid-tool @@ -455,6 +455,7 @@ class QmfData(Console): rows.append(row) else: print "No object found with ID %d" % dispId + return finally: self.lock.release() self.disp.table(caption, heads, rows) |