diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/BrokerReplicator.h | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.h')
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 52 |
1 files changed, 45 insertions, 7 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index f6983e8719..a42d607263 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -31,6 +31,7 @@ #include "qpid/management/ManagementObject.h" #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> +#include <set> namespace qpid { @@ -40,6 +41,9 @@ class Broker; class Link; class Bridge; class SessionHandler; +class Connection; +class QueueRegistry; +class ExchangeRegistry; } namespace framing { @@ -58,7 +62,9 @@ class QueueReplicator; * exchanges and bindings to replicate the primary. * It also creates QueueReplicators for newly replicated queues. * - * THREAD UNSAFE: Only called in Link connection thread, no need for locking. + * THREAD UNSAFE: + * All members except shutdown are only called in the Link's connection thread context. + * shutdown() does not use any mutable state. * */ class BrokerReplicator : public broker::Exchange, @@ -76,6 +82,7 @@ class BrokerReplicator : public broker::Exchange, bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + void shutdown(); // DataSource interface - used to write persistence data to async store uint64_t getSize(); @@ -83,8 +90,20 @@ class BrokerReplicator : public broker::Exchange, private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; + typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; + typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; - void initializeBridge(broker::Bridge&, broker::SessionHandler&); + typedef std::pair<std::string,std::string> EventKey; + typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&); + typedef std::map<EventKey, DispatchFunction> EventDispatchMap; + + typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap; + + class UpdateTracker; + class ErrorListener; + class ConnectionObserver; + + void connected(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); @@ -93,6 +112,7 @@ class BrokerReplicator : public broker::Exchange, void doEventBind(types::Variant::Map&); void doEventUnbind(types::Variant::Map&); void doEventMembersUpdate(types::Variant::Map&); + void doEventSubscribe(types::Variant::Map&); void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); @@ -100,32 +120,50 @@ class BrokerReplicator : public broker::Exchange, void doResponseHaBroker(types::Variant::Map& values); QueueReplicatorPtr findQueueReplicator(const std::string& qname); - void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - void stopQueueReplicator(const std::string& name); + QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - boost::shared_ptr<broker::Queue> createQueue( + QueueReplicatorPtr replicateQueue( const std::string& name, bool durable, bool autodelete, const qpid::framing::FieldTable& arguments, const std::string& alternateExchange); - boost::shared_ptr<broker::Exchange> createExchange( + CreateExchangeResult createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy); + void deleteQueue(const std::string& name, bool purge=true); + void deleteExchange(const std::string& name); + + void autoDeleteCheck(boost::shared_ptr<broker::Exchange>); + + void disconnected(); + + void setMembership(const types::Variant::List&); // Set membership from list. + std::string logPrefix; - std::string userId, remoteHost; ReplicationTest replicationTest; + std::string userId, remoteHost; HaBroker& haBroker; broker::Broker& broker; + broker::ExchangeRegistry& exchanges; + broker::QueueRegistry& queues; boost::shared_ptr<broker::Link> link; bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; + typedef std::set<std::string> StringSet; + StringSet replicatedExchanges; // exchanges that have been replicated. + broker::Connection* connection; + EventDispatchMap dispatch; + std::auto_ptr<UpdateTracker> queueTracker; + std::auto_ptr<UpdateTracker> exchangeTracker; + boost::shared_ptr<ConnectionObserver> connectionObserver; }; }} // namespace qpid::broker |