summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/BrokerReplicator.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.h')
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h52
1 files changed, 45 insertions, 7 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index f6983e8719..a42d607263 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <set>
namespace qpid {
@@ -40,6 +41,9 @@ class Broker;
class Link;
class Bridge;
class SessionHandler;
+class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
}
namespace framing {
@@ -58,7 +62,9 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
+ * THREAD UNSAFE:
+ * All members except shutdown are only called in the Link's connection thread context.
+ * shutdown() does not use any mutable state.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -76,6 +82,7 @@ class BrokerReplicator : public broker::Exchange,
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ void shutdown();
// DataSource interface - used to write persistence data to async store
uint64_t getSize();
@@ -83,8 +90,20 @@ class BrokerReplicator : public broker::Exchange,
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+ typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+ typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
- void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ typedef std::pair<std::string,std::string> EventKey;
+ typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+ typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+ typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+ class UpdateTracker;
+ class ErrorListener;
+ class ConnectionObserver;
+
+ void connected(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -93,6 +112,7 @@ class BrokerReplicator : public broker::Exchange,
void doEventBind(types::Variant::Map&);
void doEventUnbind(types::Variant::Map&);
void doEventMembersUpdate(types::Variant::Map&);
+ void doEventSubscribe(types::Variant::Map&);
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
@@ -100,32 +120,50 @@ class BrokerReplicator : public broker::Exchange,
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void stopQueueReplicator(const std::string& name);
+ QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- boost::shared_ptr<broker::Queue> createQueue(
+ QueueReplicatorPtr replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange);
- boost::shared_ptr<broker::Exchange> createExchange(
+ CreateExchangeResult createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+ void deleteQueue(const std::string& name, bool purge=true);
+ void deleteExchange(const std::string& name);
+
+ void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
+
+ void disconnected();
+
+ void setMembership(const types::Variant::List&); // Set membership from list.
+
std::string logPrefix;
- std::string userId, remoteHost;
ReplicationTest replicationTest;
+ std::string userId, remoteHost;
HaBroker& haBroker;
broker::Broker& broker;
+ broker::ExchangeRegistry& exchanges;
+ broker::QueueRegistry& queues;
boost::shared_ptr<broker::Link> link;
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
+ typedef std::set<std::string> StringSet;
+ StringSet replicatedExchanges; // exchanges that have been replicated.
+ broker::Connection* connection;
+ EventDispatchMap dispatch;
+ std::auto_ptr<UpdateTracker> queueTracker;
+ std::auto_ptr<UpdateTracker> exchangeTracker;
+ boost::shared_ptr<ConnectionObserver> connectionObserver;
};
}} // namespace qpid::broker