summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:21 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:21 +0000
commite3b659e61a270ad25af48a59096db2506eec9447 (patch)
tree2e2a6d8ecc415b3a6b1ff8e03412c16bd2aa08fc
parenta0c96ada9fa6a792ad9fd57356a4e19ead8b030d (diff)
downloadqpid-python-e3b659e61a270ad25af48a59096db2506eec9447.tar.gz
QPID-3963: fix naming of link exchange, and exchange creation/replication handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332654 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp72
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h29
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp17
4 files changed, 89 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index fca77f7ddd..43d7268dfb 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Link.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
@@ -58,6 +59,8 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
}else if (type == ManagementTopicExchange::typeName) {
exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+ }else if (type == Link::exchangeTypeName) {
+ exchange = Link::linkExchangeFactory(name);
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index bbffa93a53..a8c4b2c2cb 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -79,9 +79,9 @@ namespace {
class LinkExchange : public broker::Exchange
{
public:
- LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {}
+ LinkExchange(const std::string& name) : Exchange(name), link(0) {}
~LinkExchange() {};
- std::string getType() const { return std::string("qpid.LinkExchange"); }
+ std::string getType() const { return Link::exchangeTypeName; }
// Exchange methods - set up to prevent binding/unbinding etc from clients!
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
@@ -92,6 +92,7 @@ public:
// and saving them should the Link need to reconnect.
void route(broker::Deliverable& msg)
{
+ if (!link) return;
const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
framing::Array addresses;
if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
@@ -102,15 +103,26 @@ public:
for(size_t i = 0; i < urlVec.size(); ++i)
urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
- link.setUrl(urls);
+ link->setUrl(urls);
}
}
+ void setLink(Link *_link)
+ {
+ assert(!link);
+ link = _link;
+ }
+
private:
- Link& link;
+ Link *link;
};
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name )
+{
+ return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
Link::Link(LinkRegistry* _links,
MessageStore* _store,
const string& _host,
@@ -122,8 +134,9 @@ Link::Link(LinkRegistry* _links,
const string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store),
+ configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
+ host(_host), port(_port), transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -153,9 +166,13 @@ Link::Link(LinkRegistry* _links,
}
broker->getTimer().add(timerTask);
- exchange.reset(new broker::LinkExchange(*this,
- "qpid.link." + framing::Uuid(true).str()));
- broker->getExchanges().registerExchange(exchange);
+ stringstream _name;
+ _name << "qpid.link." << transport << ":" << host << ":" << port;
+ std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
+ exchangeTypeName);
+ exchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(exchange);
+ exchange->setLink(this);
}
Link::~Link ()
@@ -270,11 +287,11 @@ void Link::opened() {
// attempt to subscribe to failover exchange for updates from remote
//
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ const std::string queueName = "qpid.link." + exchange->getName();
SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
- sessionHandler.attachAs(getName());
+ sessionHandler.attachAs(exchange->getName());
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
@@ -341,7 +358,7 @@ void Link::destroy ()
{
Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management");
closeConnection("closed by management");
setStateLH(STATE_CLOSED);
@@ -363,7 +380,7 @@ void Link::destroy ()
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->destroy();
toDelete.clear();
- links->destroy (host, port);
+ links->destroy (configuredHost, configuredPort);
}
void Link::add(Bridge::shared_ptr bridge)
@@ -518,7 +535,7 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return host;
+ return configuredHost;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
@@ -544,9 +561,9 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
+ buffer.putShortString(configuredHost);
+ buffer.putShort(configuredPort);
+ buffer.putShortString(configuredTransport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
@@ -555,10 +572,10 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return host.size() + 1 // short-string (host)
+ return configuredHost.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
- + transport.size() + 1 // short-string(transport)
+ + configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
@@ -613,7 +630,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
}
std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
+ links->declare (configuredHost, configuredPort, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
iargs.i_dynamic, iargs.i_sync);
@@ -652,11 +669,24 @@ void Link::closeConnection( const std::string& reason)
if (sessionHandler.getSession()) {
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
remoteBroker.getMessage().cancel(exchange->getName());
- remoteBroker.getSession().detach(getName());
+ remoteBroker.getSession().detach(exchange->getName());
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
}
}
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+ addr.protocol = transport;
+ addr.host = host;
+ addr.port = port;
+
+ return state == STATE_OPERATIONAL;
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index a941aee4f3..1f8b3a2f23 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -54,9 +54,16 @@ class Link : public PersistableConfig, public management::Manageable {
sys::Mutex lock;
LinkRegistry* links;
MessageStore* store;
- std::string host;
- uint16_t port;
- std::string transport;
+
+ // these remain constant across failover - used to identify this link
+ const std::string configuredTransport;
+ const std::string configuredHost;
+ const uint16_t configuredPort;
+ // these reflect the current address of remote - will change during failover
+ std::string host;
+ uint16_t port;
+ std::string transport;
+
bool durable;
std::string authMechanism;
std::string username;
@@ -121,9 +128,16 @@ class Link : public PersistableConfig, public management::Manageable {
management::Manageable* parent = 0);
virtual ~Link();
- std::string getHost() { return host; }
- uint16_t getPort() { return port; }
- std::string getTransport() { return transport; }
+ /** these return the *configured* transport/host/port, which does not change over the
+ lifetime of the Link */
+ std::string getHost() const { return configuredHost; }
+ uint16_t getPort() const { return configuredPort; }
+ std::string getTransport() const { return configuredTransport; }
+
+ /** returns the current address of the remote, which may be different from the
+ configured transport/host/port due to failover. Returns true if connection is
+ active */
+ bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
@@ -155,6 +169,9 @@ class Link : public PersistableConfig, public management::Manageable {
management::ManagementObject* GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
+ // manage the exchange owned by this link
+ static const std::string exchangeTypeName;
+ static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index c6c5a1ac05..d89f220d1b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -299,22 +299,29 @@ std::string LinkRegistry::getUsername(const std::string& key)
return link->getUsername();
}
+/** note: returns the current remote host (may be different from the host originally
+ configured for the Link due to failover) */
std::string LinkRegistry::getHost(const std::string& key)
{
- Link::shared_ptr link = findLink(key);
- if (!link)
- return string();
+ Link::shared_ptr link = findLink(key);
+ if (!link)
+ return string();
- return link->getHost();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.host;
}
+/** returns the current remote port (ditto above) */
uint16_t LinkRegistry::getPort(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return 0;
- return link->getPort();
+ qpid::Address addr;
+ link->getRemoteAddress(addr);
+ return addr.port;
}
std::string LinkRegistry::getPassword(const std::string& key)