diff options
| author | Alan Conway <aconway@apache.org> | 2012-07-09 21:35:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-07-09 21:35:38 +0000 |
| commit | eb28ed5f12bee78a992d60daf52b0ce2943dbfda (patch) | |
| tree | 835b541a034ae36b214c3ff564c87cf913268707 /cpp/src/qpid | |
| parent | b9ee2ad68b3112d95e92d764bf5c8ecb4efbe2ac (diff) | |
| download | qpid-python-eb28ed5f12bee78a992d60daf52b0ce2943dbfda.tar.gz | |
QPID-4118: HA does not work with authentication and authorization.
- Updated test framework to use credentials
- Updated BrokerReplicator to use HA identity to create configuration
- Updated documentation with a HA security section.
- Updated qpid-ha to take --sasl-mechanism
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1359412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 3 |
5 files changed, 36 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index a22972ddd2..4af4692f78 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -172,7 +172,9 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) AclModule* acl = connection.getBroker().getAcl(); FieldTable properties; if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link"); + proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("ACL denied " << connection.getUserId() + << " creating a federation link")); return; } QPID_LOG(info, "Connection is a federation link"); diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index c92b368b0e..f0cb90e73b 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -196,6 +196,11 @@ class Link : public PersistableConfig, public management::Manageable { static std::string createName(const std::string& transport, const std::string& host, uint16_t port); + + /** The current connction for this link. Note returns 0 if the link is not + * presently connected. + */ + Connection* getConnection() { return connection; } }; } } diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index bbdacb8aa9..8ffe411c91 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -72,7 +72,7 @@ Url Backup::removeSelf(const Url& brokers) const { void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers); + QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); Url url = removeSelf(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; types::Uuid uuid(true); @@ -82,7 +82,7 @@ void Backup::initialize(const Url& brokers) { url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, - false); // amq.failover + true); // amq.failover { sys::Mutex::ScopedLock l(lock); link = result.first; diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 1fabff6a09..3eb30a9ec9 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -22,6 +22,7 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/framing/FieldTable.h" @@ -90,9 +91,7 @@ const string KEY("key"); const string NAME("name"); const string QNAME("qName"); const string QUEUE("queue"); -const string RHOST("rhost"); const string TYPE("type"); -const string USER("user"); const string HA_BROKER("habroker"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); @@ -202,6 +201,14 @@ BrokerReplicator::~BrokerReplicator() { } // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + // Use the credentials of the outgoing Link connection for creating queues, + // exchanges etc. We know link->getConnection() is non-zero because we are + // being called in the connections thread context. + // + assert(link->getConnection()); + userId = link->getConnection()->getUserId(); + remoteHost = link->getConnection()->getUrl(); + qpid::Address primary; link->getRemoteAddress(primary); string queueName = bridge.getQueueName(); @@ -320,10 +327,11 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[DURABLE].asBool(), autoDel, 0, // no owner regardless of exclusivity on primary + // FIXME aconway 2012-07-06: handle alternate exchange values[ALTEX].asString(), args, - values[USER].asString(), - values[RHOST].asString()); + userId, + remoteHost); assert(result.second); // Should be true since we destroyed existing queue above startQueueReplicator(result.first); } @@ -345,7 +353,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { if (queue && replicationTest.replicateLevel(queue->getSettings())) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); stopQueueReplicator(name); - broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + broker.deleteQueue(name, userId, remoteHost); } } @@ -368,10 +376,11 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { name, values[EXTYPE].asString(), values[DURABLE].asBool(), + // FIXME aconway 2012-07-06: handle alternate exchanges values[ALTEX].asString(), args, - values[USER].asString(), - values[RHOST].asString()); + userId, + remoteHost); assert(result.second); } } @@ -385,10 +394,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); + broker.deleteExchange(name, userId, remoteHost); } } @@ -458,8 +464,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { 0 /*i.e. no owner regardless of exclusivity on master*/, ""/*TODO: need to include alternate-exchange*/, args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/); + userId, + remoteHost); + // It is normal for the queue to already exist if we are failing over. if (result.second) startQueueReplicator(result.first); @@ -478,10 +485,11 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { name, values[TYPE].asString(), values[DURABLE].asBool(), - ""/*TODO: need to include alternate-exchange*/, + "", // FIXME aconway 2012-07-09: need to include alternate-exchange args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second; + userId, + remoteHost + ).second; QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); } diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 35ffdd0cd8..e2ca8f9e14 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -54,7 +54,7 @@ class QueueReplicator; * exchanges and bindings to replicate the primary. * It also creates QueueReplicators for newly replicated queues. * - * THREAD SAFE: Has no mutable state. + * THREAD UNSAFE: Only called in Link connection thread, no need for locking. * */ class BrokerReplicator : public broker::Exchange, @@ -96,6 +96,7 @@ class BrokerReplicator : public broker::Exchange, void stopQueueReplicator(const std::string& name); std::string logPrefix; + std::string userId, remoteHost; ReplicationTest replicationTest; HaBroker& haBroker; broker::Broker& broker; |
