summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-09 21:35:38 +0000
committerAlan Conway <aconway@apache.org>2012-07-09 21:35:38 +0000
commiteb28ed5f12bee78a992d60daf52b0ce2943dbfda (patch)
tree835b541a034ae36b214c3ff564c87cf913268707 /cpp/src/qpid
parentb9ee2ad68b3112d95e92d764bf5c8ecb4efbe2ac (diff)
downloadqpid-python-eb28ed5f12bee78a992d60daf52b0ce2943dbfda.tar.gz
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials - Updated BrokerReplicator to use HA identity to create configuration - Updated documentation with a HA security section. - Updated qpid-ha to take --sasl-mechanism git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1359412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/Link.h5
-rw-r--r--cpp/src/qpid/ha/Backup.cpp4
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp40
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h3
5 files changed, 36 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index a22972ddd2..4af4692f78 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -172,7 +172,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
AclModule* acl = connection.getBroker().getAcl();
FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
+ proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("ACL denied " << connection.getUserId()
+ << " creating a federation link"));
return;
}
QPID_LOG(info, "Connection is a federation link");
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index c92b368b0e..f0cb90e73b 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -196,6 +196,11 @@ class Link : public PersistableConfig, public management::Manageable {
static std::string createName(const std::string& transport,
const std::string& host,
uint16_t port);
+
+ /** The current connction for this link. Note returns 0 if the link is not
+ * presently connected.
+ */
+ Connection* getConnection() { return connection; }
};
}
}
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index bbdacb8aa9..8ffe411c91 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -72,7 +72,7 @@ Url Backup::removeSelf(const Url& brokers) const {
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
Url url = removeSelf(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
types::Uuid uuid(true);
@@ -82,7 +82,7 @@ void Backup::initialize(const Url& brokers) {
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
- false); // amq.failover
+ true); // amq.failover
{
sys::Mutex::ScopedLock l(lock);
link = result.first;
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 1fabff6a09..3eb30a9ec9 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -22,6 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/framing/FieldTable.h"
@@ -90,9 +91,7 @@ const string KEY("key");
const string NAME("name");
const string QNAME("qName");
const string QUEUE("queue");
-const string RHOST("rhost");
const string TYPE("type");
-const string USER("user");
const string HA_BROKER("habroker");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
@@ -202,6 +201,14 @@ BrokerReplicator::~BrokerReplicator() { }
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ // Use the credentials of the outgoing Link connection for creating queues,
+ // exchanges etc. We know link->getConnection() is non-zero because we are
+ // being called in the connections thread context.
+ //
+ assert(link->getConnection());
+ userId = link->getConnection()->getUserId();
+ remoteHost = link->getConnection()->getUrl();
+
qpid::Address primary;
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
@@ -320,10 +327,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[DURABLE].asBool(),
autoDel,
0, // no owner regardless of exclusivity on primary
+ // FIXME aconway 2012-07-06: handle alternate exchange
values[ALTEX].asString(),
args,
- values[USER].asString(),
- values[RHOST].asString());
+ userId,
+ remoteHost);
assert(result.second); // Should be true since we destroyed existing queue above
startQueueReplicator(result.first);
}
@@ -345,7 +353,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
if (queue && replicationTest.replicateLevel(queue->getSettings())) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
stopQueueReplicator(name);
- broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ broker.deleteQueue(name, userId, remoteHost);
}
}
@@ -368,10 +376,11 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
name,
values[EXTYPE].asString(),
values[DURABLE].asBool(),
+ // FIXME aconway 2012-07-06: handle alternate exchanges
values[ALTEX].asString(),
args,
- values[USER].asString(),
- values[RHOST].asString());
+ userId,
+ remoteHost);
assert(result.second);
}
}
@@ -385,10 +394,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(
- name,
- values[USER].asString(),
- values[RHOST].asString());
+ broker.deleteExchange(name, userId, remoteHost);
}
}
@@ -458,8 +464,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
0 /*i.e. no owner regardless of exclusivity on master*/,
""/*TODO: need to include alternate-exchange*/,
args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/);
+ userId,
+ remoteHost);
+
// It is normal for the queue to already exist if we are failing over.
if (result.second)
startQueueReplicator(result.first);
@@ -478,10 +485,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
name,
values[TYPE].asString(),
values[DURABLE].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
+ "", // FIXME aconway 2012-07-09: need to include alternate-exchange
args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second;
+ userId,
+ remoteHost
+ ).second;
QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 35ffdd0cd8..e2ca8f9e14 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -54,7 +54,7 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD SAFE: Has no mutable state.
+ * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -96,6 +96,7 @@ class BrokerReplicator : public broker::Exchange,
void stopQueueReplicator(const std::string& name);
std::string logPrefix;
+ std::string userId, remoteHost;
ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;