summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp273
1 files changed, 199 insertions, 74 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 05b759f695..cdba18ccf9 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -19,50 +19,61 @@
*
*/
-#include "Link.h"
-#include "LinkRegistry.h"
-#include "Broker.h"
-#include "Connection.h"
-#include "qpid/agent/ManagementAgent.h"
-#include "qpid/management/Link.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
+#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
+#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/AclModule.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::framing::NotAllowedException;
+using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
+using std::stringstream;
+namespace _qmf = qmf::org::apache::qpid::broker;
Link::Link(LinkRegistry* _links,
MessageStore* _store,
string& _host,
uint16_t _port,
- bool _useSsl,
+ string& _transport,
bool _durable,
string& _authMechanism,
string& _username,
string& _password,
Broker* _broker,
- management::Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ Manageable* parent)
+ : links(_links), store(_store), host(_host), port(_port),
+ transport(_transport),
+ durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
visitCount(0),
currentInterval(1),
closing(false),
+ updateUrls(false),
channelCounter(1),
- connection(0)
+ connection(0),
+ agent(0)
{
- if (parent != 0)
+ if (parent != 0 && broker != 0)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new management::Link(agent, this, parent, _host, _port, _useSsl, _durable);
+ mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
if (!durable)
agent->addObject(mgmtObject);
}
@@ -73,7 +84,7 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
if (state == STATE_OPERATIONAL && connection != 0)
- connection->close();
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -95,6 +106,7 @@ void Link::setStateLH (int newState)
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
case STATE_FAILED : mgmtObject->set_state("Failed"); break;
case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
+ case STATE_PASSIVE : mgmtObject->set_state("Passive"); break;
}
}
@@ -104,8 +116,9 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, port, useSsl,
+ broker->connect (host, port, transport,
boost::bind (&Link::closed, this, _1, _2));
+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
if (mgmtObject != 0)
@@ -115,27 +128,39 @@ void Link::startConnectionLH ()
void Link::established ()
{
- Mutex::ScopedLock mutex(lock);
+ stringstream addr;
+ addr << host << ":" << port;
- QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
+ QPID_LOG (info, "Inter-broker link established to " << addr.str());
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+ {
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+ }
}
void Link::closed (int, std::string text)
{
Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
- if (state == STATE_OPERATIONAL)
- QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
+ if (state == STATE_OPERATIONAL) {
+ stringstream addr;
+ addr << host << ":" << port;
+ QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
created.push_back(*i);
+ }
active.clear();
if (state != STATE_FAILED)
@@ -149,32 +174,46 @@ void Link::closed (int, std::string text)
destroy();
}
-void Link::destroy ()
+void Link::checkClosePermission()
{
Mutex::ScopedLock mutex(lock);
- Bridges toDelete;
+
+ AclModule* acl = getBroker()->getAcl();
+ std::string userID = getUsername() + "@" + getBroker()->getOptions().realm;
+ if (acl && !acl->authorise(userID,acl::ACT_DELETE,acl::OBJ_LINK,"")){
+ throw NotAllowedException("ACL denied delete link request");
+ }
+}
- QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- if (connection)
- connection->close(403, "closed by management");
- setStateLH(STATE_CLOSED);
+void Link::destroy ()
+{
+ Bridges toDelete;
+ {
+ Mutex::ScopedLock mutex(lock);
- // Move the bridges to be deleted into a local vector so there is no
- // corruption of the iterator caused by bridge deletion.
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
- toDelete.push_back(*i);
- active.clear();
+ QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
+ if (connection)
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
+
+ setStateLH(STATE_CLOSED);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++)
- toDelete.push_back(*i);
- created.clear();
+ // Move the bridges to be deleted into a local vector so there is no
+ // corruption of the iterator caused by bridge deletion.
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ toDelete.push_back(*i);
+ }
+ active.clear();
- // Now delete all bridges on this link.
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++)
+ toDelete.push_back(*i);
+ created.clear();
+ }
+ // Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->destroy();
toDelete.clear();
-
links->destroy (host, port);
}
@@ -186,21 +225,27 @@ void Link::add(Bridge::shared_ptr bridge)
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- bridge->cancel();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
}
+ if (!cancellations.empty()) {
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::ioThreadProcessing()
@@ -209,8 +254,9 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
+ QPID_LOG(debug, "Link::ioThreadProcessing()");
- //process any pending creates
+ //process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
@@ -218,34 +264,77 @@ void Link::ioThreadProcessing()
}
created.clear();
}
+ if (!cancellations.empty()) {
+ for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
+ (*i)->cancel(*connection);
+ }
+ cancellations.clear();
+ }
}
void Link::setConnection(Connection* c)
{
Mutex::ScopedLock mutex(lock);
connection = c;
+ updateUrls = true;
}
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
+ if (connection && updateUrls) {
+ urls.reset(connection->getKnownHosts());
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ updateUrls = false;
+ }
+
if (state == STATE_WAITING)
{
visitCount++;
if (visitCount >= currentInterval)
{
visitCount = 0;
- currentInterval *= 2;
- if (currentInterval > MAX_INTERVAL)
- currentInterval = MAX_INTERVAL;
- startConnectionLH();
+ //switch host and port to next in url list if possible
+ if (!tryFailover()) {
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnectionLH();
+ }
}
}
- else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
+void Link::reconnect(const qpid::TcpAddress& a)
+{
+ Mutex::ScopedLock mutex(lock);
+ host = a.host;
+ port = a.port;
+ startConnectionLH();
+ if (mgmtObject != 0) {
+ stringstream errorString;
+ errorString << "Failed over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ }
+}
+
+bool Link::tryFailover()
+{
+ //TODO: urls only work for TCP at present, update when that has changed
+ TcpAddress next;
+ if (transport == Broker::TCP_TRANSPORT && urls.next(next) &&
+ (next.host != host || next.port != port)) {
+ links->changeAddress(TcpAddress(host, port), next);
+ QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ return true;
+ } else {
+ return false;
+ }
+}
+
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
@@ -265,7 +354,7 @@ void Link::notifyConnectionForced(const string text)
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ ManagementAgent* agent = broker->getManagementAgent();
agent->addObject(mgmtObject, id);
}
persistenceId = id;
@@ -280,19 +369,20 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
uint16_t port;
+ string transport;
string authMechanism;
string username;
string password;
buffer.getShortString(host);
port = buffer.getShort();
- bool useSsl(buffer.getOctet());
+ buffer.getShortString(transport);
bool durable(buffer.getOctet());
buffer.getShortString(authMechanism);
buffer.getShortString(username);
buffer.getShortString(password);
- return links.declare(host, port, useSsl, durable, authMechanism, username, password).first;
+ return links.declare(host, port, transport, durable, authMechanism, username, password).first;
}
void Link::encode(Buffer& buffer) const
@@ -300,7 +390,7 @@ void Link::encode(Buffer& buffer) const
buffer.putShortString(string("link"));
buffer.putShortString(host);
buffer.putShort(port);
- buffer.putOctet(useSsl ? 1 : 0);
+ buffer.putShortString(transport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
@@ -312,7 +402,7 @@ uint32_t Link::encodedSize() const
return host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
- + 1 // useSsl
+ + transport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
@@ -324,27 +414,48 @@ ManagementObject* Link::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
+Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
{
switch (op)
{
- case management::Link::METHOD_CLOSE :
- closing = true;
- if (state != STATE_CONNECTING)
- destroy();
+ case _qmf::Link::METHOD_CLOSE :
+ checkClosePermission();
+ if (!closing) {
+ closing = true;
+ if (state != STATE_CONNECTING && connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ }
+ }
return Manageable::STATUS_OK;
- case management::Link::METHOD_BRIDGE :
- management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args;
+ case _qmf::Link::METHOD_BRIDGE :
+ _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
+ QPID_LOG(debug, "Link::bridge() request received");
// Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable)
- return Manageable::STATUS_INVALID_PARAMETER;
+ if (iargs.i_durable && !durable) {
+ text = "Can't create a durable route on a non-durable link";
+ return Manageable::STATUS_USER;
+ }
+
+ if (iargs.i_dynamic) {
+ Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
+ if (exchange.get() == 0) {
+ text = "Exchange not found";
+ return Manageable::STATUS_USER;
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ text = "Exchange type does not support dynamic routing";
+ return Manageable::STATUS_USER;
+ }
+ }
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes);
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
+ iargs.i_dynamic, iargs.i_sync);
if (result.second && iargs.i_durable)
store->create(*result.first);
@@ -354,3 +465,17 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
return Manageable::STATUS_UNKNOWN_METHOD;
}
+
+void Link::setPassive(bool passive)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (passive) {
+ setStateLH(STATE_PASSIVE);
+ } else {
+ if (state == STATE_PASSIVE) {
+ setStateLH(STATE_WAITING);
+ } else {
+ QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
+ }
+ }
+}