summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp72
1 files changed, 51 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index bbffa93a53..a8c4b2c2cb 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -79,9 +79,9 @@ namespace {
class LinkExchange : public broker::Exchange
{
public:
- LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {}
+ LinkExchange(const std::string& name) : Exchange(name), link(0) {}
~LinkExchange() {};
- std::string getType() const { return std::string("qpid.LinkExchange"); }
+ std::string getType() const { return Link::exchangeTypeName; }
// Exchange methods - set up to prevent binding/unbinding etc from clients!
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
@@ -92,6 +92,7 @@ public:
// and saving them should the Link need to reconnect.
void route(broker::Deliverable& msg)
{
+ if (!link) return;
const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
framing::Array addresses;
if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
@@ -102,15 +103,26 @@ public:
for(size_t i = 0; i < urlVec.size(); ++i)
urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
- link.setUrl(urls);
+ link->setUrl(urls);
}
}
+ void setLink(Link *_link)
+ {
+ assert(!link);
+ link = _link;
+ }
+
private:
- Link& link;
+ Link *link;
};
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name )
+{
+ return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
Link::Link(LinkRegistry* _links,
MessageStore* _store,
const string& _host,
@@ -122,8 +134,9 @@ Link::Link(LinkRegistry* _links,
const string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store),
+ configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
+ host(_host), port(_port), transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -153,9 +166,13 @@ Link::Link(LinkRegistry* _links,
}
broker->getTimer().add(timerTask);
- exchange.reset(new broker::LinkExchange(*this,
- "qpid.link." + framing::Uuid(true).str()));
- broker->getExchanges().registerExchange(exchange);
+ stringstream _name;
+ _name << "qpid.link." << transport << ":" << host << ":" << port;
+ std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
+ exchangeTypeName);
+ exchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(exchange);
+ exchange->setLink(this);
}
Link::~Link ()
@@ -270,11 +287,11 @@ void Link::opened() {
// attempt to subscribe to failover exchange for updates from remote
//
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ const std::string queueName = "qpid.link." + exchange->getName();
SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
- sessionHandler.attachAs(getName());
+ sessionHandler.attachAs(exchange->getName());
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
@@ -341,7 +358,7 @@ void Link::destroy ()
{
Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management");
closeConnection("closed by management");
setStateLH(STATE_CLOSED);
@@ -363,7 +380,7 @@ void Link::destroy ()
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->destroy();
toDelete.clear();
- links->destroy (host, port);
+ links->destroy (configuredHost, configuredPort);
}
void Link::add(Bridge::shared_ptr bridge)
@@ -518,7 +535,7 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return host;
+ return configuredHost;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
@@ -544,9 +561,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
+ buffer.putShortString(configuredHost);
+ buffer.putShort(configuredPort);
+ buffer.putShortString(configuredTransport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
@@ -555,10 +572,10 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return host.size() + 1 // short-string (host)
+ return configuredHost.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
- + transport.size() + 1 // short-string(transport)
+ + configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
@@ -613,7 +630,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
}
std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
+ links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
iargs.i_dynamic, iargs.i_sync);
@@ -652,11 +669,24 @@ void Link::closeConnection( const std::string& reason)
if (sessionHandler.getSession()) {
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
remoteBroker.getMessage().cancel(exchange->getName());
- remoteBroker.getSession().detach(getName());
+ remoteBroker.getSession().detach(exchange->getName());
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
}
}
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+ addr.protocol = transport;
+ addr.host = host;
+ addr.port = port;
+
+ return state == STATE_OPERATIONAL;
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
}} // namespace qpid::broker