From 5683a5220e8bd4f0dc1cede8f6d430c1d670f71b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 1 Aug 2013 20:26:45 +0000 Subject: QPID-4327: Renamed ConfigurationObserver as BrokerObserver. This class really was intended as a observer for broker-level events which includes configuration but may in future include other non-configuration events such as transactions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509420 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 4 +- cpp/src/qpid/broker/Broker.h | 6 +-- cpp/src/qpid/broker/BrokerObserver.h | 65 +++++++++++++++++++++++++ cpp/src/qpid/broker/BrokerObservers.h | 71 +++++++++++++++++++++++++++ cpp/src/qpid/broker/ConfigurationObserver.h | 65 ------------------------- cpp/src/qpid/broker/ConfigurationObservers.h | 72 ---------------------------- cpp/src/qpid/broker/ExchangeRegistry.cpp | 4 +- cpp/src/qpid/broker/QueueRegistry.cpp | 6 +-- cpp/src/qpid/ha/HaBroker.cpp | 2 +- cpp/src/qpid/ha/Primary.cpp | 14 +++--- cpp/src/qpid/ha/Primary.h | 6 +-- cpp/src/qpid/ha/QueueSnapshots.h | 8 ++-- cpp/src/qpid/ha/RemoteBackup.cpp | 4 +- cpp/src/qpid/ha/RemoteBackup.h | 4 +- 14 files changed, 165 insertions(+), 166 deletions(-) create mode 100644 cpp/src/qpid/broker/BrokerObserver.h create mode 100644 cpp/src/qpid/broker/BrokerObservers.h delete mode 100644 cpp/src/qpid/broker/ConfigurationObserver.h delete mode 100644 cpp/src/qpid/broker/ConfigurationObservers.h diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2e4fe9b896..3e2cd87b77 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -1446,7 +1446,7 @@ void Broker::bind(const std::string& queueName, throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); } else { if (queue->bind(exchange, key, arguments)) { - getConfigurationObservers().bind(exchange, queue, key, arguments); + getBrokerObservers().bind(exchange, queue, key, arguments); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, queueName, key, ManagementAgent::toMap(arguments))); @@ -1488,7 +1488,7 @@ void Broker::unbind(const std::string& queueName, if (exchange->isDurable() && queue->isDurable()) { store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); } - getConfigurationObservers().unbind( + getBrokerObservers().unbind( exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 52b79a0944..79c74a1c66 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -38,7 +38,7 @@ #include "qpid/broker/System.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" -#include "qpid/broker/ConfigurationObservers.h" +#include "qpid/broker/BrokerObservers.h" #include "qpid/management/Manageable.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Mutex.h" @@ -175,7 +175,7 @@ class Broker : public sys::Runnable, public Plugin::Target, AclModule* acl; DataDir dataDir; ConnectionObservers connectionObservers; - ConfigurationObservers configurationObservers; + BrokerObservers brokerObservers; QueueRegistry queues; ExchangeRegistry exchanges; @@ -352,7 +352,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } - ConfigurationObservers& getConfigurationObservers() { return configurationObservers; } + BrokerObservers& getBrokerObservers() { return brokerObservers; } /** Properties to be set on outgoing link connections */ QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; diff --git a/cpp/src/qpid/broker/BrokerObserver.h b/cpp/src/qpid/broker/BrokerObserver.h new file mode 100644 index 0000000000..8b503309d2 --- /dev/null +++ b/cpp/src/qpid/broker/BrokerObserver.h @@ -0,0 +1,65 @@ +#ifndef QPID_BROKER_BROKEROBSERVER_H +#define QPID_BROKER_BROKEROBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { +class Queue; +class Exchange; + + +/** + * Observer for changes to configuration (aka wiring) + * + * NOTE: create and destroy functions are called with + * the registry lock held. This is necessary to ensure + * they are called in the correct sequence. + */ +class BrokerObserver +{ + public: + virtual ~BrokerObserver() {} + virtual void queueCreate(const boost::shared_ptr&) {} + virtual void queueDestroy(const boost::shared_ptr&) {} + virtual void exchangeCreate(const boost::shared_ptr&) {} + virtual void exchangeDestroy(const boost::shared_ptr&) {} + virtual void bind(const boost::shared_ptr& , + const boost::shared_ptr& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} + virtual void unbind(const boost::shared_ptr&, + const boost::shared_ptr& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_BROKEROBSERVER_H*/ diff --git a/cpp/src/qpid/broker/BrokerObservers.h b/cpp/src/qpid/broker/BrokerObservers.h new file mode 100644 index 0000000000..9624c25421 --- /dev/null +++ b/cpp/src/qpid/broker/BrokerObservers.h @@ -0,0 +1,71 @@ +#ifndef QPID_BROKER_BROKEROBSERVERS_H +#define QPID_BROKER_BROKEROBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerObserver.h" +#include "Observers.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace broker { + +/** + * A broker observer that delegates to a collection of broker observers. + * + * THREAD SAFE + */ +class BrokerObservers : public BrokerObserver, + public Observers +{ + public: + void queueCreate(const boost::shared_ptr& q) { + each(boost::bind(&BrokerObserver::queueCreate, _1, q)); + } + void queueDestroy(const boost::shared_ptr& q) { + each(boost::bind(&BrokerObserver::queueDestroy, _1, q)); + } + void exchangeCreate(const boost::shared_ptr& e) { + each(boost::bind(&BrokerObserver::exchangeCreate, _1, e)); + } + void exchangeDestroy(const boost::shared_ptr& e) { + each(boost::bind(&BrokerObserver::exchangeDestroy, _1, e)); + } + void bind(const boost::shared_ptr& exchange, + const boost::shared_ptr& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &BrokerObserver::bind, _1, exchange, queue, key, args)); + } + void unbind(const boost::shared_ptr& exchange, + const boost::shared_ptr& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &BrokerObserver::unbind, _1, exchange, queue, key, args)); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_BROKEROBSERVERS_H*/ diff --git a/cpp/src/qpid/broker/ConfigurationObserver.h b/cpp/src/qpid/broker/ConfigurationObserver.h deleted file mode 100644 index 789490e08c..0000000000 --- a/cpp/src/qpid/broker/ConfigurationObserver.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef QPID_BROKER_CONFIGURATIONOBSERVER_H -#define QPID_BROKER_CONFIGURATIONOBSERVER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include - -namespace qpid { - -namespace framing { -class FieldTable; -} - -namespace broker { -class Queue; -class Exchange; - - -/** - * Observer for changes to configuration (aka wiring) - * - * NOTE: create and destroy functions are called with - * the registry lock held. This is necessary to ensure - * they are called in the correct sequence. - */ -class ConfigurationObserver -{ - public: - virtual ~ConfigurationObserver() {} - virtual void queueCreate(const boost::shared_ptr&) {} - virtual void queueDestroy(const boost::shared_ptr&) {} - virtual void exchangeCreate(const boost::shared_ptr&) {} - virtual void exchangeDestroy(const boost::shared_ptr&) {} - virtual void bind(const boost::shared_ptr& , - const boost::shared_ptr& , - const std::string& /*key*/, - const framing::FieldTable& /*args*/) {} - virtual void unbind(const boost::shared_ptr&, - const boost::shared_ptr& , - const std::string& /*key*/, - const framing::FieldTable& /*args*/) {} -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_CONFIGURATIONOBSERVER_H*/ diff --git a/cpp/src/qpid/broker/ConfigurationObservers.h b/cpp/src/qpid/broker/ConfigurationObservers.h deleted file mode 100644 index 4c1159747d..0000000000 --- a/cpp/src/qpid/broker/ConfigurationObservers.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef QPID_BROKER_CONFIGURATIONOBSERVERS_H -#define QPID_BROKER_CONFIGURATIONOBSERVERS_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConfigurationObserver.h" -#include "Observers.h" -#include "qpid/sys/Mutex.h" - -namespace qpid { -namespace broker { - -/** - * A configuration observer that delegates to a collection of - * configuration observers. - * - * THREAD SAFE - */ -class ConfigurationObservers : public ConfigurationObserver, - public Observers -{ - public: - void queueCreate(const boost::shared_ptr& q) { - each(boost::bind(&ConfigurationObserver::queueCreate, _1, q)); - } - void queueDestroy(const boost::shared_ptr& q) { - each(boost::bind(&ConfigurationObserver::queueDestroy, _1, q)); - } - void exchangeCreate(const boost::shared_ptr& e) { - each(boost::bind(&ConfigurationObserver::exchangeCreate, _1, e)); - } - void exchangeDestroy(const boost::shared_ptr& e) { - each(boost::bind(&ConfigurationObserver::exchangeDestroy, _1, e)); - } - void bind(const boost::shared_ptr& exchange, - const boost::shared_ptr& queue, - const std::string& key, - const framing::FieldTable& args) { - each(boost::bind( - &ConfigurationObserver::bind, _1, exchange, queue, key, args)); - } - void unbind(const boost::shared_ptr& exchange, - const boost::shared_ptr& queue, - const std::string& key, - const framing::FieldTable& args) { - each(boost::bind( - &ConfigurationObserver::unbind, _1, exchange, queue, key, args)); - } -}; - -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_CONFIGURATIONOBSERVERS_H*/ diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 645918d526..9eeffadb90 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -81,7 +81,7 @@ pair ExchangeRegistry::declare( result = std::pair(exchange, true); if (alternate) exchange->setAlternate(alternate); // Call exchangeCreate inside the lock to ensure correct ordering. - if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); + if (broker) broker->getBrokerObservers().exchangeCreate(exchange); } else { result = std::pair(i->second, false); } @@ -118,7 +118,7 @@ void ExchangeRegistry::destroy( if (broker) { // Call exchangeDestroy and raiseEvent inside the lock to ensure // correct ordering. - broker->getConfigurationObservers().exchangeDestroy(i->second); + broker->getBrokerObservers().exchangeDestroy(i->second); if (broker->getManagementAgent()) broker->getManagementAgent()->raiseEvent( _qmf::EventExchangeDelete(connectionId, userId, name)); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 13f9bbe23c..631718e7ae 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -59,8 +59,8 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { Queue::shared_ptr queue = create(name, settings); - // Allow ConfigurationObserver to modify settings before storing the message. - if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue); + // Allow BrokerObserver to modify settings before storing the message. + if (getBroker()) getBroker()->getBrokerObservers().queueCreate(queue); //Move this to factory also? if (alternate) queue->setAlternateExchange(alternate);//need to do this *before* create @@ -100,7 +100,7 @@ void QueueRegistry::destroy( // NOTE: queueDestroy and raiseEvent must be called with the // lock held in order to ensure events are generated // in the correct order. - getBroker()->getConfigurationObservers().queueDestroy(q); + getBroker()->getBrokerObservers().queueDestroy(q); if (getBroker()->getManagementAgent()) getBroker()->getManagementAgent()->raiseEvent( _qmf::EventQueueDelete(connectionId, userId, name)); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 7efeaad5b2..e8c7c5c7d8 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -82,7 +82,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) broker.getExchanges().registerExchange(failoverExchange); } // QueueSnapshots are needed for standalone replication as well as cluster. - broker.getConfigurationObservers().add(queueSnapshots); + broker.getBrokerObservers().add(queueSnapshots); } namespace { diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index beabab4e32..38e75e7818 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -29,7 +29,7 @@ #include "QueueReplicator.h" #include "qpid/assert.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/ConfigurationObserver.h" +#include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" @@ -59,10 +59,10 @@ class PrimaryConnectionObserver : public broker::ConnectionObserver Primary& primary; }; -class PrimaryConfigurationObserver : public broker::ConfigurationObserver +class PrimaryBrokerObserver : public broker::BrokerObserver { public: - PrimaryConfigurationObserver(Primary& p) : primary(p) {} + PrimaryBrokerObserver(Primary& p) : primary(p) {} void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } @@ -98,7 +98,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups."); } else { - // NOTE: RemoteBackups must be created before we set the ConfigurationObserver + // NOTE: RemoteBackups must be created before we set the BrokerObserver // or ConnectionObserver so that there is no client activity while // the QueueGuards are created. QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); @@ -113,8 +113,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : timerTask = new ExpectedBackupTimerTask(*this, deadline); hb.getBroker().getTimer().add(timerTask); } - configurationObserver.reset(new PrimaryConfigurationObserver(*this)); - haBroker.getBroker().getConfigurationObservers().add(configurationObserver); + brokerObserver.reset(new PrimaryBrokerObserver(*this)); + haBroker.getBroker().getBrokerObservers().add(brokerObserver); checkReady(); // Outside lock // Allow client connections @@ -124,7 +124,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : Primary::~Primary() { if (timerTask) timerTask->cancel(); - haBroker.getBroker().getConfigurationObservers().remove(configurationObserver); + haBroker.getBroker().getBrokerObservers().remove(brokerObserver); haBroker.getObserver()->reset(); } diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 3e1ec48ce6..7bacfbc1fe 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -38,7 +38,7 @@ namespace broker { class Queue; class Connection; class ConnectionObserver; -class ConfigurationObserver; +class BrokerObserver; } namespace sys { @@ -79,7 +79,7 @@ class Primary : public Role void readyReplica(const ReplicatingSubscription&); void removeReplica(const std::string& q); - // Called via ConfigurationObserver + // Called via BrokerObserver void queueCreate(const QueuePtr&); void queueDestroy(const QueuePtr&); void exchangeCreate(const ExchangePtr&); @@ -126,7 +126,7 @@ class Primary : public Role */ BackupMap backups; boost::shared_ptr connectionObserver; - boost::shared_ptr configurationObserver; + boost::shared_ptr brokerObserver; boost::intrusive_ptr timerTask; static Primary* instance; }; diff --git a/cpp/src/qpid/ha/QueueSnapshots.h b/cpp/src/qpid/ha/QueueSnapshots.h index 258c406954..e28bcd95a8 100644 --- a/cpp/src/qpid/ha/QueueSnapshots.h +++ b/cpp/src/qpid/ha/QueueSnapshots.h @@ -27,7 +27,7 @@ #include "hash.h" #include "qpid/assert.h" -#include "qpid/broker/ConfigurationObserver.h" +#include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Queue.h" #include "qpid/sys/Mutex.h" @@ -37,10 +37,10 @@ namespace qpid { namespace ha { /** - * ConfigurationObserver that maintains a map of the QueueSnapshot for each queue. + * BrokerObserver that maintains a map of the QueueSnapshot for each queue. * THREAD SAFE. */ -class QueueSnapshots : public broker::ConfigurationObserver +class QueueSnapshots : public broker::BrokerObserver { public: boost::shared_ptr get(const boost::shared_ptr& q) const { @@ -49,7 +49,7 @@ class QueueSnapshots : public broker::ConfigurationObserver return i != snapshots.end() ? i->second : boost::shared_ptr(); } - // ConfigurationObserver overrides. + // BrokerObserver overrides. void queueCreate(const boost::shared_ptr& q) { sys::Mutex::ScopedLock l(lock); boost::shared_ptr observer(new QueueSnapshot); diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index c37d44fa08..e55d415972 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -108,13 +108,13 @@ void RemoteBackup::ready(const QueuePtr& q) { QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() ); } -// Called via ConfigurationObserver::queueCreate and from catchupQueue +// Called via BrokerObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { if (replicationTest.getLevel(*q) == ALL) guards[q].reset(new QueueGuard(*q, brokerInfo)); } -// Called via ConfigurationObserver +// Called via BrokerObserver void RemoteBackup::queueDestroy(const QueuePtr& q) { catchupQueues.erase(q); GuardMap::iterator i = guards.find(q); diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h index e2c5032820..1d0a129c82 100644 --- a/cpp/src/qpid/ha/RemoteBackup.h +++ b/cpp/src/qpid/ha/RemoteBackup.h @@ -71,10 +71,10 @@ class RemoteBackup */ void ready(const QueuePtr& queue); - /** Called via ConfigurationObserver */ + /** Called via BrokerObserver */ void queueCreate(const QueuePtr&); - /** Called via ConfigurationObserver. Note: may set isReady() */ + /** Called via BrokerObserver. Note: may set isReady() */ void queueDestroy(const QueuePtr&); /**@return true when all catch-up queues for this backup are ready. */ -- cgit v1.2.1