diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 185 |
1 files changed, 179 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 726b47c268..c13ac19454 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -438,7 +438,7 @@ Manageable* Broker::GetVhostObject(void) const Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& args, - string&) + string& text) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -453,6 +453,14 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { + /** Management is creating a Link to a remote broker using the host and port of + * the remote. This (old) interface does not allow management to specify a name + * for the link, nor does it allow multiple Links to the same remote. Use the + * "create()" broker method if these features are needed. + * TBD: deprecate this interface. + */ + QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID." + " Please use the Broker::create() method with type='link' instead."); _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); @@ -461,13 +469,24 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); + text = "transport type not supported"; return Manageable::STATUS_NOT_IMPLEMENTED; } - std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, - hp.i_authMechanism, hp.i_username, hp.i_password); - if (hp.i_durable && response.second) - store->create(*response.first); + + // Does a link to the remote already exist? If so, re-use the existing link + // - this behavior is backward compatible with previous releases. + if (!links.getLink(hp.i_host, hp.i_port, transport)) { + // new link, need to generate a unique name for it + std::pair<Link::shared_ptr, bool> response = + links.declare(Link::createName(transport, hp.i_host, hp.i_port), + hp.i_host, hp.i_port, transport, + hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); + if (!response.first) { + text = "Unable to create Link"; + status = Manageable::STATUS_PARAMETER_INVALID; + break; + } + } status = Manageable::STATUS_OK; break; } @@ -538,6 +557,8 @@ const std::string TYPE_QUEUE("queue"); const std::string TYPE_EXCHANGE("exchange"); const std::string TYPE_TOPIC("topic"); const std::string TYPE_BINDING("binding"); +const std::string TYPE_LINK("link"); +const std::string TYPE_BRIDGE("bridge"); const std::string DURABLE("durable"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); @@ -549,6 +570,26 @@ const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); const std::string _TRUE("true"); const std::string _FALSE("false"); + +// parameters for creating a Link object, see mgmt schema +const std::string HOST("host"); +const std::string PORT("port"); +const std::string TRANSPORT("transport"); +const std::string AUTH_MECHANISM("authMechanism"); +const std::string USERNAME("username"); +const std::string PASSWORD("password"); + +// parameters for creating a Bridge object, see mgmt schema +const std::string LINK("link"); +const std::string SRC("src"); +const std::string DEST("dest"); +const std::string KEY("key"); +const std::string TAG("tag"); +const std::string EXCLUDES("excludes"); +const std::string SRC_IS_QUEUE("srcIsQueue"); +const std::string SRC_IS_LOCAL("srcIsLocal"); +const std::string DYNAMIC("dynamic"); +const std::string SYNC("sync"); } struct InvalidBindingIdentifier : public qpid::Exception @@ -598,6 +639,25 @@ struct UnknownObjectType : public qpid::Exception std::string getPrefix() const { return "unknown object type"; } }; +struct ReservedObjectName : public qpid::Exception +{ + ReservedObjectName(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return std::string("names prefixed with '") + + QPID_NAME_PREFIX + std::string("' are reserved"); } +}; + +struct UnsupportedTransport : public qpid::Exception +{ + UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "transport is not supported"; } +}; + +struct InvalidParameter : public qpid::Exception +{ + InvalidParameter(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "invalid parameter to method call"; } +}; + void Broker::createObject(const std::string& type, const std::string& name, const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) { @@ -669,6 +729,109 @@ void Broker::createObject(const std::string& type, const std::string& name, amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + + } else if (type == TYPE_LINK) { + + QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string host; + uint16_t port = 0; + std::string transport = TCP_TRANSPORT; + bool durable = false; + std::string authMech, username, password; + + if (!getProtocolFactory(transport)) { + QPID_LOG(error, "Transport '" << transport << "' not supported."); + throw UnsupportedTransport(transport); + } + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == HOST) host = i->second.asString(); + else if (i->first == PORT) port = i->second.asUint16(); + else if (i->first == TRANSPORT) transport = i->second.asString(); + else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == AUTH_MECHANISM) authMech = i->second.asString(); + else if (i->first == USERNAME) username = i->second.asString(); + else if (i->first == PASSWORD) password = i->second.asString(); + else { + // TODO: strict checking here + } + } + + std::pair<boost::shared_ptr<Link>, bool> rc; + rc = links.declare(name, host, port, transport, durable, authMech, username, password); + if (!rc.first) { + QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port << + "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\""); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } + + } else if (type == TYPE_BRIDGE) { + + QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties ); + + if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { + QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); + throw ReservedObjectName(name); + } + + std::string linkName; + std::string src; + std::string dest; + std::string key; + std::string id; + std::string excludes; + bool durable = false; + bool srcIsQueue = false; + bool srcIsLocal = false; + bool dynamic = false; + uint16_t sync = 0; + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + + if (i->first == LINK) linkName = i->second.asString(); + else if (i->first == SRC) src = i->second.asString(); + else if (i->first == DEST) dest = i->second.asString(); + else if (i->first == KEY) key = i->second.asString(); + else if (i->first == TAG) id = i->second.asString(); + else if (i->first == EXCLUDES) excludes = i->second.asString(); + else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second); + else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); + else if (i->first == DYNAMIC) dynamic = bool(i->second); + else if (i->first == SYNC) sync = i->second.asUint16(); + else if (i->first == DURABLE) durable = bool(i->second); + else { + // TODO: strict checking here + } + } + + boost::shared_ptr<Link> link; + if (linkName.empty() || !(link = links.getLink(linkName))) { + QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed."); + throw InvalidParameter(name); + } + std::pair<Bridge::shared_ptr, bool> rc = + links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, + dynamic, sync); + + if (!rc.first) { + QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << + "; src=" << src << "; dest=" << dest << "; key=" << key); + throw InvalidParameter(name); + } + if (!rc.second) { + QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists."); + throw ObjectAlreadyExists(name); + } } else { throw UnknownObjectType(type); } @@ -691,6 +854,16 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } else if (type == TYPE_BINDING) { BindingIdentifier binding(name); unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else if (type == TYPE_LINK) { + boost::shared_ptr<Link> link = links.getLink(name); + if (link) { + link->close(); + } + } else if (type == TYPE_BRIDGE) { + boost::shared_ptr<Bridge> bridge = links.getBridge(name); + if (bridge) { + bridge->close(); + } } else { throw UnknownObjectType(type); } |