diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 72 |
1 files changed, 51 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index bbffa93a53..a8c4b2c2cb 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -79,9 +79,9 @@ namespace { class LinkExchange : public broker::Exchange { public: - LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {} + LinkExchange(const std::string& name) : Exchange(name), link(0) {} ~LinkExchange() {}; - std::string getType() const { return std::string("qpid.LinkExchange"); } + std::string getType() const { return Link::exchangeTypeName; } // Exchange methods - set up to prevent binding/unbinding etc from clients! bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } @@ -92,6 +92,7 @@ public: // and saving them should the Link need to reconnect. void route(broker::Deliverable& msg) { + if (!link) return; const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); framing::Array addresses; if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) { @@ -102,15 +103,26 @@ public: for(size_t i = 0; i < urlVec.size(); ++i) urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); - link.setUrl(urls); + link->setUrl(urls); } } + void setLink(Link *_link) + { + assert(!link); + link = _link; + } + private: - Link& link; + Link *link; }; +boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name ) +{ + return Exchange::shared_ptr(new LinkExchange(_name)); +} + Link::Link(LinkRegistry* _links, MessageStore* _store, const string& _host, @@ -122,8 +134,9 @@ Link::Link(LinkRegistry* _links, const string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + : links(_links), store(_store), + configuredTransport(_transport), configuredHost(_host), configuredPort(_port), + host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -153,9 +166,13 @@ Link::Link(LinkRegistry* _links, } broker->getTimer().add(timerTask); - exchange.reset(new broker::LinkExchange(*this, - "qpid.link." + framing::Uuid(true).str())); - broker->getExchanges().registerExchange(exchange); + stringstream _name; + _name << "qpid.link." << transport << ":" << host << ":" << port; + std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(), + exchangeTypeName); + exchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(exchange); + exchange->setLink(this); } Link::~Link () @@ -270,11 +287,11 @@ void Link::opened() { // attempt to subscribe to failover exchange for updates from remote // - const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + const std::string queueName = "qpid.link." + exchange->getName(); SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); - sessionHandler.attachAs(getName()); + sessionHandler.attachAs(exchange->getName()); framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); @@ -341,7 +358,7 @@ void Link::destroy () { Mutex::ScopedLock mutex(lock); - QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); + QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management"); closeConnection("closed by management"); setStateLH(STATE_CLOSED); @@ -363,7 +380,7 @@ void Link::destroy () for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->destroy(); toDelete.clear(); - links->destroy (host, port); + links->destroy (configuredHost, configuredPort); } void Link::add(Bridge::shared_ptr bridge) @@ -518,7 +535,7 @@ void Link::setPersistenceId(uint64_t id) const const string& Link::getName() const { - return host; + return configuredHost; } Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) @@ -544,9 +561,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); - buffer.putShortString(host); - buffer.putShort(port); - buffer.putShortString(transport); + buffer.putShortString(configuredHost); + buffer.putShort(configuredPort); + buffer.putShortString(configuredTransport); buffer.putOctet(durable ? 1 : 0); buffer.putShortString(authMechanism); buffer.putShortString(username); @@ -555,10 +572,10 @@ void Link::encode(Buffer& buffer) const uint32_t Link::encodedSize() const { - return host.size() + 1 // short-string (host) + return configuredHost.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port - + transport.size() + 1 // short-string(transport) + + configuredTransport.size() + 1 // short-string(transport) + 1 // durable + authMechanism.size() + 1 + username.size() + 1 @@ -613,7 +630,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te } std::pair<Bridge::shared_ptr, bool> result = - links->declare (host, port, iargs.i_durable, iargs.i_src, + links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, iargs.i_dynamic, iargs.i_sync); @@ -652,11 +669,24 @@ void Link::closeConnection( const std::string& reason) if (sessionHandler.getSession()) { framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); remoteBroker.getMessage().cancel(exchange->getName()); - remoteBroker.getSession().detach(getName()); + remoteBroker.getSession().detach(exchange->getName()); } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; } } +/** returns the current remote's address, and connection state */ +bool Link::getRemoteAddress(qpid::Address& addr) const +{ + addr.protocol = transport; + addr.host = host; + addr.port = port; + + return state == STATE_OPERATIONAL; +} + + +const std::string Link::exchangeTypeName("qpid.LinkExchange"); + }} // namespace qpid::broker |