summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp388
1 files changed, 317 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 56a90e7fb7..1be388b989 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -31,6 +31,8 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/UrlArray.h"
namespace qpid {
namespace broker {
@@ -48,6 +50,13 @@ using std::stringstream;
using std::string;
namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+namespace {
+ const std::string FAILOVER_EXCHANGE("amq.failover");
+ const std::string FAILOVER_HEADER_KEY("amq.failover");
+}
+
+
struct LinkTimerTask : public sys::TimerTask {
LinkTimerTask(Link& l, sys::Timer& t)
: TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
@@ -65,19 +74,73 @@ struct LinkTimerTask : public sys::TimerTask {
sys::Timer& timer;
};
-Link::Link(LinkRegistry* _links,
- MessageStore* _store,
+
+
+/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
+ */
+class LinkExchange : public broker::Exchange
+{
+public:
+ LinkExchange(const std::string& name) : Exchange(name), link(0) {}
+ ~LinkExchange() {};
+ std::string getType() const { return Link::exchangeTypeName; }
+
+ // Exchange methods - set up to prevent binding/unbinding etc from clients!
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
+
+ // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
+ // and saving them should the Link need to reconnect.
+ void route(broker::Deliverable& msg)
+ {
+ if (!link) return;
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ framing::Array addresses;
+ if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
+ // convert the Array of addresses to a single Url container for used with setUrl():
+ std::vector<Url> urlVec;
+ Url urls;
+ urlVec = urlArrayToVector(addresses);
+ for(size_t i = 0; i < urlVec.size(); ++i)
+ urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
+ QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+ link->setUrl(urls);
+ }
+ }
+
+ void setLink(Link *_link)
+ {
+ assert(!link);
+ link = _link;
+ }
+
+private:
+ Link *link;
+};
+
+
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& _name )
+{
+ return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
+Link::Link(const string& _name,
+ LinkRegistry* _links,
const string& _host,
uint16_t _port,
const string& _transport,
+ DestroyedListener l,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
- Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ Manageable* parent,
+ bool failover_)
+ : name(_name), links(_links),
+ configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
+ host(_host), port(_port), transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -88,14 +151,20 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
- timerTask(new LinkTimerTask(*this, broker->getTimer()))
+ listener(l),
+ timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failover(failover_),
+ failoverChannel(0)
{
if (parent != 0 && broker != 0)
{
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
agent->addObject(mgmtObject, 0, durable);
}
}
@@ -106,15 +175,29 @@ Link::Link(LinkRegistry* _links,
startConnectionLH();
}
broker->getTimer().add(timerTask);
+
+ if (failover) {
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc =
+ broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
+ }
}
Link::~Link ()
{
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+ if (state == STATE_OPERATIONAL && connection != 0) {
+ closeConnection("closed by management");
+ }
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ if (failover)
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -172,6 +255,7 @@ void Link::established(Connection* c)
currentInterval = 1;
visitCount = 0;
connection = c;
+
if (closing)
destroy();
else // Process any IO tasks bridges added before established.
@@ -180,14 +264,34 @@ void Link::established(Connection* c)
void Link::setUrl(const Url& u) {
+ QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u);
Mutex::ScopedLock mutex(lock);
url = u;
reconnectNext = 0;
}
+
+namespace {
+class DetachedCallback : public SessionHandler::ErrorListener {
+ public:
+ DetachedCallback(const Link& link) : name(link.getName()) {}
+ void connectionException(framing::connection::CloseCode, const std::string&) {}
+ void channelException(framing::session::DetachCode, const std::string&) {}
+ void executionException(framing::execution::ErrorCode, const std::string&) {}
+ void detach() {}
+ private:
+ const std::string name;
+};
+}
+
void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
+
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ }
+
// Get default URL from known-hosts if not already set
if (url.empty()) {
const std::vector<Url>& known = connection->getKnownHosts();
@@ -198,6 +302,45 @@ void Link::opened() {
reconnectNext = 0;
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
+
+ if (failover) {
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
+ FieldTable());
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ }
}
void Link::closed(int, std::string text)
@@ -206,11 +349,14 @@ void Link::closed(int, std::string text)
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
- if (state == STATE_OPERATIONAL) {
- stringstream addr;
- addr << host << ":" << port;
- if (!hideManagement() && agent)
+
+ if (!hideManagement()) {
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
+ stringstream addr;
+ addr << host << ":" << port;
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ }
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
@@ -225,22 +371,19 @@ void Link::closed(int, std::string text)
if (!hideManagement())
mgmtObject->set_lastError (text);
}
-
- if (closing)
- destroy();
}
-// Called in connection IO thread.
+// Called in connection IO thread, cleans up the connection before destroying Link
void Link::destroy ()
{
Bridges toDelete;
+
+ timerTask->cancel(); // call prior to locking so maintenance visit can finish
{
Mutex::ScopedLock mutex(lock);
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
- connection = 0;
+ QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << configuredPort << " removed by management");
+ closeConnection("closed by management");
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -254,14 +397,13 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
-
- timerTask->cancel();
}
+
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
+ (*i)->close();
toDelete.clear();
- links->destroy (host, port);
+ listener(this); // notify LinkRegistry that this Link has been destroyed
}
void Link::add(Bridge::shared_ptr bridge)
@@ -303,13 +445,13 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL)
+ if (state != STATE_OPERATIONAL || closing)
return;
// check for bridge session errors and recover
if (!active.empty()) {
Bridges::iterator removed = std::remove_if(
- active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1));
for (Bridges::iterator i = removed; i != active.end(); ++i) {
Bridge::shared_ptr bridge = *i;
bridge->closed();
@@ -340,7 +482,7 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
-
+ if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -358,19 +500,23 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
+}
void Link::reconnectLH(const Address& a)
{
host = a.host;
port = a.port;
transport = a.protocol;
- startConnectionLH();
+
if (!hideManagement()) {
stringstream errorString;
- errorString << "Failed over to " << a;
+ errorString << "Failing over to " << a;
mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
}
+ startConnectionLH();
}
bool Link::tryFailoverLH() {
@@ -379,15 +525,14 @@ bool Link::tryFailoverLH() {
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
- links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next);
reconnectLH(next);
return true;
}
return false;
}
-// Management updates for a linke are inconsistent in a cluster, so they are
+// Management updates for a link are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
@@ -396,7 +541,8 @@ bool Link::hideManagement() const {
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
-
+ if (channelCounter >= framing::CHANNEL_MAX)
+ channelCounter = 1;
return channelCounter++;
}
@@ -415,18 +561,34 @@ void Link::setPersistenceId(uint64_t id) const
const string& Link::getName() const
{
- return host;
+ return name;
+}
+
+const std::string Link::ENCODED_IDENTIFIER("link.v2");
+const std::string Link::ENCODED_IDENTIFIER_V1("link");
+
+bool Link::isEncodedLink(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
+ string kind;
+ buffer.getShortString(kind);
+
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
+ string name;
+ if (kind == ENCODED_IDENTIFIER) {
+ // newer version provides a link name.
+ buffer.getShortString(name);
+ }
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -435,15 +597,24 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, transport, durable, authMechanism, username, password).first;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the Link by host:port, there was no name
+ * assigned. So create a name for the new Link.
+ */
+ name = createName(transport, host, port);
+ }
+
+ return links.declare(name, host, port, transport, durable, authMechanism,
+ username, password).first;
}
void Link::encode(Buffer& buffer) const
{
- buffer.putShortString(string("link"));
- buffer.putShortString(host);
- buffer.putShort(port);
- buffer.putShortString(transport);
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(configuredHost);
+ buffer.putShort(configuredPort);
+ buffer.putShortString(configuredTransport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
@@ -452,10 +623,11 @@ void Link::encode(Buffer& buffer) const
uint32_t Link::encodedSize() const
{
- return host.size() + 1 // short-string (host)
- + 5 // short-string ("link")
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + configuredHost.size() + 1 // short-string (host)
+ 2 // port
- + transport.size() + 1 // short-string(transport)
+ + configuredTransport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
@@ -468,6 +640,7 @@ ManagementObject* Link::GetManagementObject (void) const
}
void Link::close() {
+ QPID_LOG(debug, "Link::close(), link=" << name );
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
@@ -488,36 +661,31 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
+ /* TBD: deprecate this interface in favor of the Broker::create() method. The
+ * Broker::create() method allows the user to assign a name to the bridge.
+ */
+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID."
+ " Please use the Broker::create() method with type='bridge' instead.");
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
- QPID_LOG(debug, "Link::bridge() request received");
-
- // Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable) {
- text = "Can't create a durable route on a non-durable link";
- return Manageable::STATUS_USER;
- }
-
- if (iargs.i_dynamic) {
- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
- if (exchange.get() == 0) {
- text = "Exchange not found";
- return Manageable::STATUS_USER;
- }
- if (!exchange->supportsDynamicBinding()) {
- text = "Exchange type does not support dynamic routing";
- return Manageable::STATUS_USER;
+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src <<
+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key);
+
+ // Does a bridge already exist that has the src/dest/key? If so, re-use the
+ // existing bridge - this behavior is backward compatible with previous releases.
+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key);
+ if (!bridge) {
+ // need to create a new bridge on this link.
+ std::pair<Bridge::shared_ptr, bool> rc =
+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key),
+ *this, iargs.i_durable,
+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
+ if (!rc.first) {
+ text = "invalid parameters";
+ return Manageable::STATUS_PARAMETER_INVALID;
}
}
-
- std::pair<Bridge::shared_ptr, bool> result =
- links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
-
- if (result.second && iargs.i_durable)
- store->create(*result.first);
-
return Manageable::STATUS_OK;
}
@@ -539,4 +707,82 @@ void Link::setPassive(bool passive)
}
}
+
+/** utility to clean up connection resources correctly */
+void Link::closeConnection( const std::string& reason)
+{
+ if (connection != 0) {
+ // cancel our subscription to the failover exchange
+ if (failover) {
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
+ }
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
+ connection = 0;
+ }
+}
+
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+ addr.protocol = transport;
+ addr.host = host;
+ addr.port = port;
+
+ return state == STATE_OPERATIONAL;
+}
+
+
+// FieldTable keys for internal state data
+namespace {
+ const std::string FAILOVER_ADDRESSES("failover-addresses");
+ const std::string FAILOVER_INDEX("failover-index");
+}
+
+void Link::getState(framing::FieldTable& state) const
+{
+ state.clear();
+ Mutex::ScopedLock mutex(lock);
+ if (!url.empty()) {
+ state.setString(FAILOVER_ADDRESSES, url.str());
+ state.setInt(FAILOVER_INDEX, reconnectNext);
+ }
+}
+
+void Link::setState(const framing::FieldTable& state)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (state.isSet(FAILOVER_ADDRESSES)) {
+ Url failovers(state.getAsString(FAILOVER_ADDRESSES));
+ setUrl(failovers);
+ }
+ if (state.isSet(FAILOVER_INDEX)) {
+ reconnectNext = state.getAsInt(FAILOVER_INDEX);
+ }
+}
+
+std::string Link::createName(const std::string& transport,
+ const std::string& host,
+ uint16_t port)
+{
+ stringstream linkName;
+ linkName << QPID_NAME_PREFIX << transport << std::string(":")
+ << host << std::string(":") << port;
+ return linkName.str();
+}
+
+
+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
+{
+ Mutex::ScopedLock mutex(lock);
+ return (isConnecting() && _port == port && _host == host);
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
}} // namespace qpid::broker