From e3b659e61a270ad25af48a59096db2506eec9447 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Tue, 1 May 2012 13:57:21 +0000 Subject: QPID-3963: fix naming of link exchange, and exchange creation/replication handling git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332654 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 3 ++ qpid/cpp/src/qpid/broker/Link.cpp | 72 +++++++++++++++++++-------- qpid/cpp/src/qpid/broker/Link.h | 29 ++++++++--- qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 17 +++++-- 4 files changed, 89 insertions(+), 32 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index fca77f7ddd..43d7268dfb 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" +#include "qpid/broker/Link.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" @@ -58,6 +59,8 @@ pair ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); }else if (type == ManagementTopicExchange::typeName) { exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); + }else if (type == Link::exchangeTypeName) { + exchange = Link::linkExchangeFactory(name); }else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index bbffa93a53..a8c4b2c2cb 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -79,9 +79,9 @@ namespace { class LinkExchange : public broker::Exchange { public: - LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {} + LinkExchange(const std::string& name) : Exchange(name), link(0) {} ~LinkExchange() {}; - std::string getType() const { return std::string("qpid.LinkExchange"); } + std::string getType() const { return Link::exchangeTypeName; } // Exchange methods - set up to prevent binding/unbinding etc from clients! bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*) { return false; } @@ -92,6 +92,7 @@ public: // and saving them should the Link need to reconnect. void route(broker::Deliverable& msg) { + if (!link) return; const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); framing::Array addresses; if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) { @@ -102,15 +103,26 @@ public: for(size_t i = 0; i < urlVec.size(); ++i) urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); - link.setUrl(urls); + link->setUrl(urls); } } + void setLink(Link *_link) + { + assert(!link); + link = _link; + } + private: - Link& link; + Link *link; }; +boost::shared_ptr Link::linkExchangeFactory( const std::string& _name ) +{ + return Exchange::shared_ptr(new LinkExchange(_name)); +} + Link::Link(LinkRegistry* _links, MessageStore* _store, const string& _host, @@ -122,8 +134,9 @@ Link::Link(LinkRegistry* _links, const string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + : links(_links), store(_store), + configuredTransport(_transport), configuredHost(_host), configuredPort(_port), + host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -153,9 +166,13 @@ Link::Link(LinkRegistry* _links, } broker->getTimer().add(timerTask); - exchange.reset(new broker::LinkExchange(*this, - "qpid.link." + framing::Uuid(true).str())); - broker->getExchanges().registerExchange(exchange); + stringstream _name; + _name << "qpid.link." << transport << ":" << host << ":" << port; + std::pair rc = broker->getExchanges().declare(_name.str(), + exchangeTypeName); + exchange = boost::static_pointer_cast(rc.first); + assert(exchange); + exchange->setLink(this); } Link::~Link () @@ -270,11 +287,11 @@ void Link::opened() { // attempt to subscribe to failover exchange for updates from remote // - const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + const std::string queueName = "qpid.link." + exchange->getName(); SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); - sessionHandler.attachAs(getName()); + sessionHandler.attachAs(exchange->getName()); framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); @@ -341,7 +358,7 @@ void Link::destroy () { Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management"); closeConnection("closed by management"); setStateLH(STATE_CLOSED); @@ -363,7 +380,7 @@ void Link::destroy () for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->destroy(); toDelete.clear(); - links->destroy (host, port); + links->destroy (configuredHost, configuredPort); } void Link::add(Bridge::shared_ptr bridge) @@ -518,7 +535,7 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return host; + return configuredHost; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) @@ -544,9 +561,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); - buffer.putShortString(host); - buffer.putShort(port); - buffer.putShortString(transport); + buffer.putShortString(configuredHost); + buffer.putShort(configuredPort); + buffer.putShortString(configuredTransport); buffer.putOctet(durable ? 1 : 0); buffer.putShortString(authMechanism); buffer.putShortString(username); @@ -555,10 +572,10 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return host.size() + 1 // short-string (host) + return configuredHost.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port - + transport.size() + 1 // short-string(transport) + + configuredTransport.size() + 1 // short-string(transport) + 1 // durable + authMechanism.size() + 1 + username.size() + 1 @@ -613,7 +630,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te } std::pair result = - links->declare (host, port, iargs.i_durable, iargs.i_src, + links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, iargs.i_dynamic, iargs.i_sync); @@ -652,11 +669,24 @@ void Link::closeConnection( const std::string& reason) if (sessionHandler.getSession()) { framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); remoteBroker.getMessage().cancel(exchange->getName()); - remoteBroker.getSession().detach(getName()); + remoteBroker.getSession().detach(exchange->getName()); } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; } } +/** returns the current remote's address, and connection state */ +bool Link::getRemoteAddress(qpid::Address& addr) const +{ + addr.protocol = transport; + addr.host = host; + addr.port = port; + + return state == STATE_OPERATIONAL; +} + + +const std::string Link::exchangeTypeName("qpid.LinkExchange"); + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index a941aee4f3..1f8b3a2f23 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -54,9 +54,16 @@ class Link : public PersistableConfig, public management::Manageable { sys::Mutex lock; LinkRegistry* links; MessageStore* store; - std::string host; - uint16_t port; - std::string transport; + + // these remain constant across failover - used to identify this link + const std::string configuredTransport; + const std::string configuredHost; + const uint16_t configuredPort; + // these reflect the current address of remote - will change during failover + std::string host; + uint16_t port; + std::string transport; + bool durable; std::string authMechanism; std::string username; @@ -121,9 +128,16 @@ class Link : public PersistableConfig, public management::Manageable { management::Manageable* parent = 0); virtual ~Link(); - std::string getHost() { return host; } - uint16_t getPort() { return port; } - std::string getTransport() { return transport; } + /** these return the *configured* transport/host/port, which does not change over the + lifetime of the Link */ + std::string getHost() const { return configuredHost; } + uint16_t getPort() const { return configuredPort; } + std::string getTransport() const { return configuredTransport; } + + /** returns the current address of the remote, which may be different from the + configured transport/host/port due to failover. Returns true if connection is + active */ + bool getRemoteAddress(qpid::Address& addr) const; bool isDurable() { return durable; } void maintenanceVisit (); @@ -155,6 +169,9 @@ class Link : public PersistableConfig, public management::Manageable { management::ManagementObject* GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); + // manage the exchange owned by this link + static const std::string exchangeTypeName; + static boost::shared_ptr linkExchangeFactory(const std::string& name); }; } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index c6c5a1ac05..d89f220d1b 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -299,22 +299,29 @@ std::string LinkRegistry::getUsername(const std::string& key) return link->getUsername(); } +/** note: returns the current remote host (may be different from the host originally + configured for the Link due to failover) */ std::string LinkRegistry::getHost(const std::string& key) { - Link::shared_ptr link = findLink(key); - if (!link) - return string(); + Link::shared_ptr link = findLink(key); + if (!link) + return string(); - return link->getHost(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.host; } +/** returns the current remote port (ditto above) */ uint16_t LinkRegistry::getPort(const std::string& key) { Link::shared_ptr link = findLink(key); if (!link) return 0; - return link->getPort(); + qpid::Address addr; + link->getRemoteAddress(addr); + return addr.port; } std::string LinkRegistry::getPassword(const std::string& key) -- cgit v1.2.1