summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:13 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:13 +0000
commita0c96ada9fa6a792ad9fd57356a4e19ead8b030d (patch)
treef9b70aca8acb56a70d4f4867a6c114912dfef35f
parent32e2c1a82279c0be7a42655cb80d02b667fb130a (diff)
downloadqpid-python-a0c96ada9fa6a792ad9fd57356a4e19ead8b030d.tar.gz
QPID-3963: subscribe link to remote broker's to amq.failover exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332653 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp129
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h6
2 files changed, 127 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 855063a6ad..bbffa93a53 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -31,6 +31,8 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/UrlArray.h"
namespace qpid {
namespace broker {
@@ -65,6 +67,50 @@ struct LinkTimerTask : public sys::TimerTask {
sys::Timer& timer;
};
+
+namespace {
+ const std::string FAILOVER_EXCHANGE("amq.failover");
+ const std::string FAILOVER_HEADER_KEY("amq.failover");
+ const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1); // reserved for this link
+}
+
+/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
+ */
+class LinkExchange : public broker::Exchange
+{
+public:
+ LinkExchange(Link& _link, const std::string& name) : Exchange(name), link(_link) {}
+ ~LinkExchange() {};
+ std::string getType() const { return std::string("qpid.LinkExchange"); }
+
+ // Exchange methods - set up to prevent binding/unbinding etc from clients!
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
+
+ // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
+ // and saving them should the Link need to reconnect.
+ void route(broker::Deliverable& msg)
+ {
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ framing::Array addresses;
+ if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
+ // convert the Array of addresses to a single Url container for used with setUrl():
+ std::vector<Url> urlVec;
+ Url urls;
+ urlVec = urlArrayToVector(addresses);
+ for(size_t i = 0; i < urlVec.size(); ++i)
+ urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
+ QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+ link.setUrl(urls);
+ }
+ }
+
+private:
+ Link& link;
+};
+
+
Link::Link(LinkRegistry* _links,
MessageStore* _store,
const string& _host,
@@ -106,15 +152,22 @@ Link::Link(LinkRegistry* _links,
startConnectionLH();
}
broker->getTimer().add(timerTask);
+
+ exchange.reset(new broker::LinkExchange(*this,
+ "qpid.link." + framing::Uuid(true).str()));
+ broker->getExchanges().registerExchange(exchange);
}
Link::~Link ()
{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+ if (state == STATE_OPERATIONAL && connection != 0) {
+ closeConnection("closed by management");
+ }
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ broker->getExchanges().destroy(exchange->getName());
}
void Link::setStateLH (int newState)
@@ -185,6 +238,20 @@ void Link::setUrl(const Url& u) {
reconnectNext = 0;
}
+
+namespace {
+ /** invoked when session used to subscribe to remote's amq.failover exchange detaches */
+ void sessionDetached(Link *link) {
+ // ??? really not sure what the right thing to do here, if anything...
+ // ??? Q: do I need to cancel the subscription and detached the session in the I/O thread (???)
+ // e.g:
+ //peer->getMessage().cancel(args.i_dest);
+ //peer->getSession().detach(name);
+ QPID_LOG(debug, "detached from 'amq.failover' for link: " << link->getName());
+ }
+}
+
+
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
@@ -198,6 +265,40 @@ void Link::opened() {
reconnectNext = 0;
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
+
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+
+ SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+ sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
+ sessionHandler.attachAs(getName());
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
+ FieldTable());
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ exchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF);
}
void Link::closed(int, std::string text)
@@ -241,9 +342,7 @@ void Link::destroy ()
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
- connection = 0;
+ closeConnection("closed by management");
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -399,7 +498,8 @@ bool Link::hideManagement() const {
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
-
+ if (channelCounter >= framing::CHANNEL_MAX)
+ channelCounter = 1;
return channelCounter++;
}
@@ -542,4 +642,21 @@ void Link::setPassive(bool passive)
}
}
+
+/** utility to clean up connection resources correctly */
+void Link::closeConnection( const std::string& reason)
+{
+ if (connection != 0) {
+ // cancel our subscription to the failover exchange
+ SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(exchange->getName());
+ remoteBroker.getSession().detach(getName());
+ }
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
+ connection = 0;
+ }
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index c7c8209db3..a941aee4f3 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -47,6 +47,7 @@ namespace broker {
class LinkRegistry;
class Broker;
class Connection;
+class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
@@ -77,8 +78,8 @@ class Link : public PersistableConfig, public management::Manageable {
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
-
boost::intrusive_ptr<sys::TimerTask> timerTask;
+ boost::shared_ptr<broker::LinkExchange> exchange;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
@@ -100,8 +101,9 @@ class Link : public PersistableConfig, public management::Manageable {
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
void reconnectLH(const Address&); //called by LinkRegistry
+ void closeConnection(const std::string& reason);
- friend class LinkRegistry; // to call established, opened, closed
+ friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;