diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-04-04 19:51:02 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-04-04 19:51:02 +0000 |
commit | 43c80059b846b446280ff10fb6523436326f69bb (patch) | |
tree | 2d21c0a0a811d89495bb3b41d6ec6bfafe6c4594 | |
parent | 41ccf8ed8e38beee6ced8234c344086e3e640dea (diff) | |
download | qpid-python-43c80059b846b446280ff10fb6523436326f69bb.tar.gz |
QPID-3767: merge old fix onto branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1309568 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 185 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 123 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 239 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NameGenerator.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 26 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 11 |
17 files changed, 600 insertions, 219 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 9a1f4be468..238d1ead6a 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -57,11 +57,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) conn->received(frame); } -Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args, +Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, + CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), + listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0), initialize(init) { std::stringstream title; @@ -70,9 +70,10 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge - (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, + (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + mgmtObject->set_channelId(id); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); @@ -165,6 +166,7 @@ void Bridge::cancel(Connection&) QPID_LOG(debug, "Cancelled bridge " << name); } +/** Notify the bridge that the connection has closed */ void Bridge::closed() { if (args.i_dynamic) { @@ -174,9 +176,10 @@ void Bridge::closed() QPID_LOG(debug, "Closed bridge " << name); } -void Bridge::destroy() +/** Shut down the bridge */ +void Bridge::close() { - listener(this); + listener(name); // ask the LinkRegistry to destroy us } bool Bridge::isSessionReady() const @@ -190,8 +193,21 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } + +const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); +const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); + +bool Bridge::isEncodedBridge(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; +} + + Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string src; @@ -199,9 +215,37 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) string key; string id; string excludes; + string name; + + Link::shared_ptr link; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the bridge by host:port, not by name, and + * transport wasn't provided. So create a unique name for the new bridge. + */ + + framing::Uuid uuid(true); + name = QPID_NAME_PREFIX + uuid.str(); + + buffer.getShortString(host); + port = buffer.getShort(); + + link = links.getLink(host, port); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); + return Bridge::shared_ptr(); + } + } else { + string linkName; + + buffer.getShortString(name); + buffer.getShortString(linkName); + link = links.getLink(linkName); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); + return Bridge::shared_ptr(); + } + } - buffer.getShortString(host); - port = buffer.getShort(); bool durable(buffer.getOctet()); buffer.getShortString(src); buffer.getShortString(dest); @@ -213,15 +257,15 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); - return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic, sync).first; + return links.declare(name, *link, durable, src, dest, key, is_queue, + is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const { - buffer.putShortString(string("bridge")); - buffer.putShortString(link->getHost()); - buffer.putShort(link->getPort()); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(link->getName()); buffer.putOctet(args.i_durable ? 1 : 0); buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); @@ -236,9 +280,9 @@ void Bridge::encode(Buffer& buffer) const uint32_t Bridge::encodedSize() const { - return link->getHost().size() + 1 // short-string (host) - + 7 // short-string ("bridge") - + 2 // port + return ENCODED_IDENTIFIER.length() + 1 // +1 byte length + + name.length() + 1 + + link->getName().length() + 1 + 1 // durable + args.i_src.size() + 1 + args.i_dest.size() + 1 @@ -262,7 +306,8 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, { if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed - destroy(); + QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); + close(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; @@ -321,7 +366,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang peer->getExchange().bind(queue, exchange, key, args); } else { QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); - destroy(); + close(); } } @@ -335,5 +380,4 @@ const string& Bridge::getLocalTag() const { return link->getBroker()->getFederationTag(); } - }} diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index b849b11ba8..3ece4ba47c 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -48,20 +48,21 @@ class Bridge : public PersistableConfig, public management::Manageable, public E { public: typedef boost::shared_ptr<Bridge> shared_ptr; - typedef boost::function<void(Bridge*)> CancellationListener; + typedef boost::function<void(const std::string&)> CancellationListener; typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback; - Bridge(Link* link, framing::ChannelId id, CancellationListener l, + Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l, const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, InitializeCallback init ); ~Bridge(); - void create(Connection& c); - void cancel(Connection& c); - void closed(); - void destroy(); + void close(); bool isDurable() { return args.i_durable; } + Link *getLink() const { return link; } + const std::string getSrc() const { return args.i_src; } + const std::string getDest() const { return args.i_dest; } + const std::string getKey() const { return args.i_key; } bool isSessionReady() const; @@ -76,7 +77,11 @@ public: uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; const std::string& getName() const { return name; } + + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedBridge(const std::string& key); // Exchange::DynamicBridge methods void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); @@ -101,7 +106,7 @@ private: std::auto_ptr<framing::AMQP_ServerProxy::Session> session; std::auto_ptr<framing::AMQP_ServerProxy> peer; - Link* link; + Link* const link; framing::ChannelId id; qmf::org::apache::qpid::broker::ArgsLinkBridge args; qmf::org::apache::qpid::broker::Bridge* mgmtObject; @@ -114,6 +119,12 @@ private: InitializeCallback initialize; bool resetProxy(); + + // connection Management (called by owning Link) + void create(Connection& c); + void cancel(Connection& c); + void closed(); + friend class Link; // to call create, cancel, closed() }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 02111b4387..6b607f6d3d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -438,7 +438,7 @@ Manageable* Broker::GetVhostObject(void) const Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& args, - string&) + string& text) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -453,6 +453,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { + /** Management is creating a Link to a remote broker using the host and port of + * the remote. This (old) interface does not allow management to specify a name + * for the link, nor does it allow multiple Links to the same remote. Use the + * "create()" broker method if these features are needed. + * TBD: deprecate this interface. + */ + QPID_LOG(warning, "The Broker::connect() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='link' instead."); _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); @@ -461,13 +469,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); + text = "transport type not supported"; return Manageable::STATUS_NOT_IMPLEMENTED; } - std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, - hp.i_authMechanism, hp.i_username, hp.i_password); - if (hp.i_durable && response.second) - store->create(*response.first); + + // Does a link to the remote already exist? If so, re-use the existing link + // - this behavior is backward compatible with previous releases. + if (!links.getLink(hp.i_host, hp.i_port, transport)) { + // new link, need to generate a unique name for it + framing::Uuid uuid(true); + std::pair<Link::shared_ptr, bool> response = + links.declare(QPID_NAME_PREFIX + uuid.str(), hp.i_host, hp.i_port, transport, + hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); + if (!response.first) { + text = "Unable to create Link"; + status = Manageable::STATUS_PARAMETER_INVALID; + break; + } + } status = Manageable::STATUS_OK; break; } @@ -538,6 +557,8 @@ const std::string TYPE_QUEUE("queue"); const std::string TYPE_EXCHANGE("exchange"); const std::string TYPE_TOPIC("topic"); const std::string TYPE_BINDING("binding"); +const std::string TYPE_LINK("link"); +const std::string TYPE_BRIDGE("bridge"); const std::string DURABLE("durable"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); @@ -549,6 +570,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); const std::string _TRUE("true"); const std::string _FALSE("false"); + +// parameters for creating a Link object, see mgmt schema +const std::string HOST("host"); +const std::string PORT("port"); +const std::string TRANSPORT("transport"); +const std::string AUTH_MECHANISM("authMechanism"); +const std::string USERNAME("username"); +const std::string PASSWORD("password"); + +// parameters for creating a Bridge object, see mgmt schema +const std::string LINK("link"); +const std::string SRC("src"); +const std::string DEST("dest"); +const std::string KEY("key"); +const std::string TAG("tag"); +const std::string EXCLUDES("excludes"); +const std::string SRC_IS_QUEUE("srcIsQueue"); +const std::string SRC_IS_LOCAL("srcIsLocal"); +const std::string DYNAMIC("dynamic"); +const std::string SYNC("sync"); } struct InvalidBindingIdentifier : public qpid::Exception @@ -598,6 +639,25 @@ struct UnknownObjectType : public qpid::Exception std::string getPrefix() const { return "unknown object type"; } }; +struct ReservedObjectName : public qpid::Exception +{ + ReservedObjectName(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return std::string("names prefixed with '") + + QPID_NAME_PREFIX + std::string("' are reserved"); } +}; + +struct UnsupportedTransport : public qpid::Exception +{ + UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "transport is not supported"; } +}; + +struct InvalidParameter : public qpid::Exception +{ + InvalidParameter(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "invalid parameter to method call"; } +}; + void Broker::createObject(const std::string& type, const std::string& name, const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) { @@ -669,6 +729,109 @@ void Broker::createObject(const std::string& type, const std::string& name, amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + + } else if (type == TYPE_LINK) { + + QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string host; + uint16_t port = 0; + std::string transport = TCP_TRANSPORT; + bool durable = false; + std::string authMech, username, password; + + if (!getProtocolFactory(transport)) { + QPID_LOG(error, "Transport '" << transport << "' not supported."); + throw UnsupportedTransport(transport); + } + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == HOST) host = i->second.asString(); + else if (i->first == PORT) port = i->second.asUint16(); + else if (i->first == TRANSPORT) transport = i->second.asString(); + else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == AUTH_MECHANISM) authMech = i->second.asString(); + else if (i->first == USERNAME) username = i->second.asString(); + else if (i->first == PASSWORD) password = i->second.asString(); + else { + // TODO: strict checking here + } + } + + std::pair<boost::shared_ptr<Link>, bool> rc; + rc = links.declare(name, host, port, transport, durable, authMech, username, password); + if (!rc.first) { + QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port << + "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\""); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } + + } else if (type == TYPE_BRIDGE) { + + QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string linkName; + std::string src; + std::string dest; + std::string key; + std::string id; + std::string excludes; + bool durable = false; + bool srcIsQueue = false; + bool srcIsLocal = false; + bool dynamic = false; + uint16_t sync = 0; + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + + if (i->first == LINK) linkName = i->second.asString(); + else if (i->first == SRC) src = i->second.asString(); + else if (i->first == DEST) dest = i->second.asString(); + else if (i->first == KEY) key = i->second.asString(); + else if (i->first == TAG) id = i->second.asString(); + else if (i->first == EXCLUDES) excludes = i->second.asString(); + else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second); + else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); + else if (i->first == DYNAMIC) dynamic = bool(i->second); + else if (i->first == SYNC) sync = i->second.asUint16(); + else if (i->first == DURABLE) durable = bool(i->second); + else { + // TODO: strict checking here + } + } + + boost::shared_ptr<Link> link; + if (linkName.empty() || !(link = links.getLink(linkName))) { + QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed."); + throw InvalidParameter(name); + } + std::pair<Bridge::shared_ptr, bool> rc = + links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, + dynamic, sync); + + if (!rc.first) { + QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << + "; src=" << src << "; dest=" << dest << "; key=" << key); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } } else { throw UnknownObjectType(type); } @@ -691,6 +854,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } else if (type == TYPE_BINDING) { BindingIdentifier binding(name); unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else if (type == TYPE_LINK) { + boost::shared_ptr<Link> link = links.getLink(name); + if (link) { + link->close(); + } + } else if (type == TYPE_BRIDGE) { + boost::shared_ptr<Bridge> bridge = links.getBridge(name); + if (bridge) { + bridge->close(); + } } else { throw UnknownObjectType(type); } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 56a90e7fb7..911ec0ac0c 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -65,18 +65,19 @@ struct LinkTimerTask : public sys::TimerTask { sys::Timer& timer; }; -Link::Link(LinkRegistry* _links, - MessageStore* _store, +Link::Link(const string& _name, + LinkRegistry* _links, const string& _host, uint16_t _port, const string& _transport, + DestroyedListener l, bool _durable, const string& _authMechanism, const string& _username, const string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), + : name(_name), links(_links), host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), @@ -88,6 +89,7 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), + listener(l), timerTask(new LinkTimerTask(*this, broker->getTimer())) { if (parent != 0 && broker != 0) @@ -95,7 +97,10 @@ Link::Link(LinkRegistry* _links, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); + mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); agent->addObject(mgmtObject, 0, durable); } } @@ -172,6 +177,10 @@ void Link::established(Connection* c) currentInterval = 1; visitCount = 0; connection = c; + if (!hideManagement() && connection->GetManagementObject()) { + mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); + } + if (closing) destroy(); else // Process any IO tasks bridges added before established. @@ -224,13 +233,14 @@ void Link::closed(int, std::string text) setStateLH(STATE_WAITING); if (!hideManagement()) mgmtObject->set_lastError (text); + mgmtObject->set_connectionRef(qpid::management::ObjectId()); } if (closing) destroy(); } -// Called in connection IO thread. +// Called in connection IO thread, cleans up the connection before destroying Link void Link::destroy () { Bridges toDelete; @@ -259,9 +269,9 @@ void Link::destroy () } // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) - (*i)->destroy(); + (*i)->close(); toDelete.clear(); - links->destroy (host, port); + listener(name); // notify LinkRegistry that this Link has been destroyed } void Link::add(Bridge::shared_ptr bridge) @@ -365,12 +375,16 @@ void Link::reconnectLH(const Address& a) host = a.host; port = a.port; transport = a.protocol; - startConnectionLH(); + if (!hideManagement()) { stringstream errorString; - errorString << "Failed over to " << a; + errorString << "Failing over to '" << transport << ":" << host << ":" << port <<"'"; mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); } + startConnectionLH(); } bool Link::tryFailoverLH() { @@ -379,15 +393,14 @@ bool Link::tryFailoverLH() { if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { - links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next.host << ":" << next.port); reconnectLH(next); return true; } return false; } -// Management updates for a linke are inconsistent in a cluster, so they are +// Management updates for a link are inconsistent in a cluster, so they are // suppressed. bool Link::hideManagement() const { return !mgmtObject || ( broker && broker->isInCluster()); @@ -415,18 +428,39 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return host; + return name; +} + +const std::string Link::ENCODED_IDENTIFIER("link.v2"); +const std::string Link::ENCODED_IDENTIFIER_V1("link"); + +bool Link::isEncodedLink(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) { + string kind; + buffer.getShortString(kind); + string host; uint16_t port; string transport; string authMechanism; string username; string password; - + string name; + + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the Link by host:port, there was no name + * assigned. So create a unique name for the new Link. + */ + framing::Uuid uuid(true); + name = QPID_NAME_PREFIX + uuid.str(); + } else { + buffer.getShortString(name); + } buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -435,12 +469,14 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(username); buffer.getShortString(password); - return links.declare(host, port, transport, durable, authMechanism, username, password).first; + return links.declare(name, host, port, transport, durable, authMechanism, + username, password).first; } void Link::encode(Buffer& buffer) const { - buffer.putShortString(string("link")); + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); buffer.putShortString(host); buffer.putShort(port); buffer.putShortString(transport); @@ -452,7 +488,9 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return host.size() + 1 // short-string (host) + return ENCODED_IDENTIFIER.length() + 1 // +1 byte length + + name.length() + 1 + + host.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port + transport.size() + 1 // short-string(transport) @@ -468,6 +506,7 @@ ManagementObject* Link::GetManagementObject (void) const } void Link::close() { + QPID_LOG(debug, "Link::close(), link=" << name ); Mutex::ScopedLock mutex(lock); if (!closing) { closing = true; @@ -488,36 +527,32 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_OK; case _qmf::Link::METHOD_BRIDGE : + /* TBD: deprecate this interface in favor of the Broker::create() method. The + * Broker::create() method allows the user to assign a name to the bridge. + */ + QPID_LOG(warning, "The Link::bridge() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='bridge' instead."); _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; - QPID_LOG(debug, "Link::bridge() request received"); - - // Durable bridges are only valid on durable links - if (iargs.i_durable && !durable) { - text = "Can't create a durable route on a non-durable link"; - return Manageable::STATUS_USER; - } - - if (iargs.i_dynamic) { - Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); - if (exchange.get() == 0) { - text = "Exchange not found"; - return Manageable::STATUS_USER; - } - if (!exchange->supportsDynamicBinding()) { - text = "Exchange type does not support dynamic routing"; - return Manageable::STATUS_USER; + QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << + "; dest=" << iargs.i_dest << "; key=" << iargs.i_key); + + // Does a bridge already exist that has the src/dest/key? If so, re-use the + // existing bridge - this behavior is backward compatible with previous releases. + Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); + if (!bridge) { + // need to create a new bridge on this link + framing::Uuid uuid(true); + const std::string name(QPID_NAME_PREFIX + uuid.str()); + std::pair<Bridge::shared_ptr, bool> rc = + links->declare( name, *this, iargs.i_durable, + iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, + iargs.i_dynamic, iargs.i_sync); + if (!rc.first) { + text = "invalid parameters"; + return Manageable::STATUS_PARAMETER_INVALID; } } - - std::pair<Bridge::shared_ptr, bool> result = - links->declare (host, port, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, - iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic, iargs.i_sync); - - if (result.second && iargs.i_durable) - store->create(*result.first); - return Manageable::STATUS_OK; } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index c7c8209db3..7907460721 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -25,7 +25,6 @@ #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/BrokerImportExport.h" @@ -51,8 +50,8 @@ class Connection; class Link : public PersistableConfig, public management::Manageable { private: sys::Mutex lock; + const std::string name; LinkRegistry* links; - MessageStore* store; std::string host; uint16_t port; std::string transport; @@ -77,6 +76,7 @@ class Link : public PersistableConfig, public management::Manageable { uint channelCounter; Connection* connection; management::ManagementAgent* agent; + boost::function<void(const std::string&)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; @@ -91,26 +91,29 @@ class Link : public PersistableConfig, public management::Manageable { void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void destroy(); // Called when mgmt deletes this link + void destroy(); // Cleanup connection before link goes away void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailoverLH(); // Called during maintenance visit bool hideManagement() const; + void reconnectLH(const Address&); //called by LinkRegistry - void established(Connection*); // Called when connection is create + // connection management (called by LinkRegistry) + void established(Connection*); // Called when connection is created void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away - void reconnectLH(const Address&); //called by LinkRegistry - - friend class LinkRegistry; // to call established, opened, closed + void notifyConnectionForced(const std::string text); + friend class LinkRegistry; // to call established, opened, closed public: typedef boost::shared_ptr<Link> shared_ptr; + typedef boost::function<void(const std::string&)> DestroyedListener; - Link(LinkRegistry* links, - MessageStore* store, + Link(const std::string& name, + LinkRegistry* links, const std::string& host, uint16_t port, const std::string& transport, + DestroyedListener l, bool durable, const std::string& authMechanism, const std::string& username, @@ -130,15 +133,17 @@ class Link : public PersistableConfig, public management::Manageable { void cancel(Bridge::shared_ptr); QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. - QPID_BROKER_EXTERN void close(); // Close the link from within the broker. + + // Close the link. + QPID_BROKER_EXTERN void close(); std::string getAuthMechanism() { return authMechanism; } std::string getUsername() { return username; } std::string getPassword() { return password; } Broker* getBroker() { return broker; } - void notifyConnectionForced(const std::string text); void setPassive(bool p); + bool isConnecting() const { return state == STATE_CONNECTING; } // PersistableConfig: void setPersistenceId(uint64_t id) const; @@ -147,7 +152,10 @@ class Link : public PersistableConfig, public management::Manageable { void encode(framing::Buffer& buffer) const; const std::string& getName() const; + static const std::string ENCODED_IDENTIFIER; + static const std::string ENCODED_IDENTIFIER_V1; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); + static bool isEncodedLink(const std::string& key); // Manageable entry points management::ManagementObject* GetManagementObject(void) const; diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index c6c5a1ac05..a0cf3555c3 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -68,27 +68,34 @@ LinkRegistry::LinkRegistry (Broker* _broker) : LinkRegistry::~LinkRegistry() {} +/** find link by current remote address */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, + uint16_t port, + const std::string& transport) +{ + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { + Link::shared_ptr& link = i->second; + if (link->getHost() == host && + link->getPort() == port && + (transport.empty() || link->getTransport() == transport)) + return link; + } + return boost::shared_ptr<Link>(); +} -void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) +/** find link by name */ +boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) { Mutex::ScopedLock locker(lock); - std::string oldKey = createKey(oldAddress); - std::string newKey = createKey(newAddress); - if (links.find(newKey) != links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); - } else { - LinkMap::iterator i = links.find(oldKey); - if (i == links.end()) { - QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); - } else { - links[newKey] = i->second; - links.erase(oldKey); - QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); - } - } + LinkMap::iterator l = links.find(name); + if (l != links.end()) + return l->second; + return boost::shared_ptr<Link>(); } -pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, +pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, + const string& host, uint16_t port, const string& transport, bool durable, @@ -98,24 +105,53 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host, { Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(name); if (i == links.end()) { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, - authMechanism, username, password, - broker, parent)); - links[key] = link; + link = Link::shared_ptr (new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent)); + if (durable && store) store->create(*link); + links[name] = link; + QPID_LOG(debug, "Creating new link; name=" << name ); return std::pair<Link::shared_ptr, bool>(link, true); } return std::pair<Link::shared_ptr, bool>(i->second, false); } -pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, - uint16_t port, +/** find bridge by link & route info */ +Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { + if (i->second->getSrc() == src && i->second->getDest() == dest && + i->second->getKey() == key && i->second->getLink() && + i->second->getLink()->getName() == link.getName()) { + return i->second; + } + } + return Bridge::shared_ptr(); +} + +/** find bridge by name */ +Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) +{ + Mutex::ScopedLock locker(lock); + BridgeMap::iterator b = bridges.find(name); + if (b != bridges.end()) + return b->second; + return Bridge::shared_ptr(); +} + +pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -130,18 +166,26 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, ) { Mutex::ScopedLock locker(lock); - QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); + // Durable bridges are only valid on durable links + if (durable && !link.isDurable()) { + QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + if (dynamic) { + Exchange::shared_ptr exchange = broker->getExchanges().get(src); + if (exchange.get() == 0) { + QPID_LOG(error, "Exchange not found, name='" << src << "'" ); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + if (!exchange->supportsDynamicBinding()) { + QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); + return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); + } + } - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(name); if (b == bridges.end()) { _qmf::ArgsLinkBridge args; @@ -159,23 +203,29 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host, args.i_sync = sync; bridge = Bridge::shared_ptr - (new Bridge (l->second.get(), l->second->nextChannel(), - boost::bind(&LinkRegistry::destroy, this, - host, port, src, dest, key), - args, init)); - bridges[bridgeKey] = bridge; - l->second->add(bridge); + (new Bridge (name, &link, link.nextChannel(), + boost::bind(&LinkRegistry::destroyBridge, this, _1), + args, init)); + bridges[name] = bridge; + link.add(bridge); + if (durable && store) + store->create(*bridge); + + QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << + "' from " << src << " to " << dest << " (" << key << ")"); + return std::pair<Bridge::shared_ptr, bool>(bridge, true); } return std::pair<Bridge::shared_ptr, bool>(b->second, false); } -void LinkRegistry::destroy(const string& host, const uint16_t port) +/** called back by the link when it has completed its cleanup and can be removed. */ +void LinkRegistry::linkDestroyed(const std::string& name) { + QPID_LOG(debug, "LinkRegistry::destroy(); link= " << name); Mutex::ScopedLock locker(lock); - string key = createKey(host, port); - LinkMap::iterator i = links.find(key); + LinkMap::iterator i = links.find(name); if (i != links.end()) { if (i->second->isDurable() && store) @@ -184,27 +234,20 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } -void LinkRegistry::destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key) +/** called back by bridge when its destruction has been requested */ +void LinkRegistry::destroyBridge(const std::string& name) { + QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << name); Mutex::ScopedLock locker(lock); - string linkKey = createKey(host, port); - stringstream keystream; - keystream << linkKey << "!" << src << "!" << dest << "!" << key; - string bridgeKey = keystream.str(); - LinkMap::iterator l = links.find(linkKey); - if (l == links.end()) - return; - - BridgeMap::iterator b = bridges.find(bridgeKey); + BridgeMap::iterator b = bridges.find(name); if (b == bridges.end()) return; - l->second->cancel(b->second); + Link *link = b->second->getLink(); + if (link) { + link->cancel(b->second); + } if (b->second->isDurable()) store->destroy(*(b->second)); bridges.erase(b); @@ -219,26 +262,64 @@ MessageStore* LinkRegistry::getStore() const { return store; } -Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) -{ - // Convert keyOrMgmtId to a host:port key. - // - // TODO aconway 2011-02-01: centralize code that constructs/parses - // connection management IDs. Currently sys:: protocol factories - // and IO plugins construct the IDs and LinkRegistry parses them. - size_t separator = keyOrMgmtId.find('-'); - if (separator == std::string::npos) separator = 0; - std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); +namespace { + void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) + { + // Extract host and port of remote broker from connection id string. + // + // TODO aconway 2011-02-01: centralize code that constructs/parses + // connection management IDs. Currently sys:: protocol factories + // and IO plugins construct the IDs and LinkRegistry parses them. + // current format assumed: "localhost:port-remotehost:port", asserts + // provided to alert us if this assumption changes! + size_t separator = connId.find('-'); + assert(separator != std::string::npos); + std::string remote = connId.substr(separator+1, std::string::npos); + separator = remote.find(":"); + assert(separator != std::string::npos); + *host = remote.substr(0, separator); + // IPv6 - host is bracketed by "[]", strip them + if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') + *host = host->substr(1, host->length() - 1); + *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); + } +} +/** find the Link that corresponds to the given connection */ +Link::shared_ptr LinkRegistry::findLink(const std::string& connId) +{ Mutex::ScopedLock locker(lock); - LinkMap::iterator l = links.find(key); - if (l != links.end()) return l->second; - else return Link::shared_ptr(); + ConnectionMap::iterator c = connections.find(connId); + if (c != connections.end()) { + LinkMap::iterator l = links.find(c->second); + if (l != links.end()) + return l->second; + } + return Link::shared_ptr(); } void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { - Link::shared_ptr link = findLink(key); + // find a link that is attempting to connect to the remote, and + // create a mapping from connection id to link + QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); + std::string host; + uint16_t port; + extractHostPort( key, &host, &port ); + Link::shared_ptr link; + { + Mutex::ScopedLock locker(lock); + for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) { + if (l->second->isConnecting() && + l->second->getHost() == host && + l->second->getPort() == port) { + link = l->second; + connections[key] = link->getName(); + break; + } + } + } + if (link) { link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); @@ -336,20 +417,6 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } -std::string LinkRegistry::createKey(const qpid::Address& a) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - return createKey(a.host, a.port); -} - -std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { - // TODO aconway 2010-05-11: key should also include protocol/transport to - // be unique. Requires refactor of LinkRegistry interface. - stringstream keystream; - keystream << host << ":" << port; - return keystream.str(); -} - void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); @@ -362,10 +429,12 @@ void LinkRegistry::setPassive(bool p) } void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { + Mutex::ScopedLock locker(lock); for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); } void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { + Mutex::ScopedLock locker(lock); for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 8e9d2f4b0d..614fb1ab3e 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -42,9 +42,11 @@ namespace broker { class LinkRegistry { typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; + typedef std::map<std::string, std::string> ConnectionMap; - LinkMap links; - BridgeMap bridges; + LinkMap links; /** indexed by name of Link */ + BridgeMap bridges; /** indexed by name of Bridge */ + ConnectionMap connections; /** indexed by connection identifier, gives link name */ qpid::sys::Mutex lock; Broker* broker; @@ -54,15 +56,18 @@ namespace broker { std::string realm; boost::shared_ptr<Link> findLink(const std::string& key); - static std::string createKey(const Address& address); - static std::string createKey(const std::string& host, uint16_t port); - // Methods called by the connection observer. + // Methods called by the connection observer, key is connection identifier void notifyConnection (const std::string& key, Connection* c); void notifyOpened (const std::string& key); void notifyClosed (const std::string& key); void notifyConnectionForced (const std::string& key, const std::string& text); - friend class LinkRegistryConnectionObserver; + friend class LinkRegistryConnectionObserver; + + /** Notify the registry that a Link has been destroyed */ + void linkDestroyed(const std::string& name); + /** Request to destroy a Bridge */ + void destroyBridge(const std::string& name); public: QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests @@ -70,17 +75,25 @@ namespace broker { QPID_BROKER_EXTERN ~LinkRegistry(); QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool> - declare(const std::string& host, + declare(const std::string& name, + const std::string& host, uint16_t port, const std::string& transport, bool durable, const std::string& authMechanism, const std::string& username, const std::string& password); + /** determine if Link exists */ + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& name); + QPID_BROKER_EXTERN boost::shared_ptr<Link> + getLink(const std::string& host, + uint16_t port, + const std::string& transport = std::string()); QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool> - declare(const std::string& host, - uint16_t port, + declare(const std::string& name, + Link& link, bool durable, const std::string& src, const std::string& dest, @@ -93,14 +106,14 @@ namespace broker { uint16_t sync, Bridge::InitializeCallback=0 ); - - QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port); - - QPID_BROKER_EXTERN void destroy(const std::string& host, - const uint16_t port, - const std::string& src, - const std::string& dest, - const std::string& key); + /** determine if Bridge exists */ + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const std::string& name); + QPID_BROKER_EXTERN Bridge::shared_ptr + getBridge(const Link& link, + const std::string& src, + const std::string& dest, + const std::string& key); /** * Register the manageable parent for declared queues @@ -126,11 +139,6 @@ namespace broker { QPID_BROKER_EXTERN uint16_t getPort (const std::string& key); /** - * Called by links failing over to new address - */ - void changeAddress(const Address& oldAddress, const Address& newAddress); - - /** * Called to alter passive state. In passive state the links * and bridges managed by a link registry will be recorded and * updated but links won't actually establish connections and diff --git a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h index 6ea25c9797..2e9f7febe2 100644 --- a/qpid/cpp/src/qpid/broker/NameGenerator.h +++ b/qpid/cpp/src/qpid/broker/NameGenerator.h @@ -32,6 +32,7 @@ namespace qpid { NameGenerator(const std::string& base); std::string generate(); }; + const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names } } diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index d08409695e..858535637a 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) { string kind; - + uint32_t p = buffer.getPosition(); buffer.getShortString (kind); - if (kind == "link") + buffer.setPosition(p); + + if (Link::isEncodedLink(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); - else if (kind == "bridge") + else if (Bridge::isEncodedBridge(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index fc6ada096f..ce4c618885 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -781,16 +781,18 @@ void Connection::managementSetupState( void Connection::config(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); string kind; + uint32_t p = buf.getPosition(); buf.getShortString (kind); - if (kind == "link") { + buf.setPosition(p); + if (broker::Link::isEncodedLink(kind)) { broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); + broker::Link::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated link " << link->getHost() << ":" << link->getPort()); } - else if (kind == "bridge") { + else if (broker::Bridge::isEncodedBridge(kind)) { broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); } else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 3d65e07202..8cd82a85f6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -54,8 +54,10 @@ void Backup::initialize(const Url& url) { assert(!url.empty()); QPID_LOG(notice, "HA: Backup started: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + framing::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 609a3378ad..915cee9e58 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -189,7 +189,8 @@ BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) QPID_LOG(info, "HA: Backup replicating from " << link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( - link->getHost(), link->getPort(), + QPID_CONFIGURATION_REPLICATOR + ".bridge", // name for bridge + *link, false, // durable QPID_CONFIGURATION_REPLICATOR, // src QPID_CONFIGURATION_REPLICATOR, // dest diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index ad6719f207..5c1dfec2f6 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -116,7 +116,9 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + framing::Uuid uuid(true); std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 6aff4879e3..13e9079c7f 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -53,15 +53,19 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { + bridgeName = replicatorName(q->getName()); logPrefix = "HA: Backup " + queue->getName() + ": "; QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); } // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - // Note this may create a new bridge or use an existing one. + // Create a new route over the link + sys::Mutex::ScopedLock l(lock); + std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( - link->getHost(), link->getPort(), + bridgeName, + *link, false, // durable queue->getName(), // src getName(), // dest @@ -76,15 +80,19 @@ void QueueReplicator::activate() { // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) ); + bridge = result.first; } QueueReplicator::~QueueReplicator() {} void QueueReplicator::deactivate() { + // destroy the route sys::Mutex::ScopedLock l(lock); - queue->getBroker()->getLinks().destroy( - link->getHost(), link->getPort(), queue->getName(), getName(), string()); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); + if (bridge) { + bridge->close(); + bridge.reset(); + QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); + } } // Called in a broker connection thread when the bridge is created. @@ -92,7 +100,6 @@ void QueueReplicator::deactivate() { void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, boost::shared_ptr<QueueReplicator> /*self*/) { sys::Mutex::ScopedLock l(lock); - bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index a1ebbd788a..1fc43f3a45 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -79,6 +79,7 @@ class QueueReplicator : public broker::Exchange, sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; + boost::shared_ptr<broker::Bridge> bridge; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 7d613b98ce..5f483ad258 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -127,18 +127,28 @@ class FederationTests(TestBase010): self.verify_cleanup() def test_pull_from_exchange(self): + """ This test uses an alternative method to manage links and bridges + via the broker object. + """ session = self.session - + self.startQmf() qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] - result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0, result) - link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) + # create link + link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = broker.create("link", "test-link-1", link_args, False) self.assertEqual(result.status, 0, result) + link = qmf.getObjects(_class="link")[0] + # create bridge + bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key"} + result = broker.create("bridge", "test-bridge-1", bridge_args, False); + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker @@ -164,9 +174,11 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - result = bridge.close() + + result = broker.delete("bridge", "test-bridge-1", {}) self.assertEqual(result.status, 0, result) - result = link.close() + + result = broker.delete("link", "test-link-1", {}) self.assertEqual(result.status, 0, result) self.verify_cleanup() diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 9eafbc52fa..f0f5c0f9d2 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -379,10 +379,12 @@ This class represents an inter-broker connection. <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> - <property name="host" type="sstr" access="RC" index="y"/> - <property name="port" type="uint16" access="RC" index="y"/> - <property name="transport" type="sstr" access="RC"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="host" type="sstr" access="RO"/> + <property name="port" type="uint16" access="RO"/> + <property name="transport" type="sstr" access="RO"/> <property name="durable" type="bool" access="RC"/> + <property name="connectionRef" type="objId" references="Connection" access="RO"/> <statistic name="state" type="sstr" desc="Operational state of the link"/> <statistic name="lastError" type="lstr" desc="Reason link is not operational"/> @@ -411,7 +413,8 @@ --> <class name="Bridge"> <property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/> - <property name="channelId" type="uint16" access="RC" index="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="channelId" type="uint16" access="RO"/> <property name="durable" type="bool" access="RC"/> <property name="src" type="sstr" access="RC"/> <property name="dest" type="sstr" access="RC"/> |