diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3c1d23c842..34aaf3d341 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -131,6 +131,7 @@ #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" #include "qpid/cluster/CredentialsExchange.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -202,7 +203,7 @@ namespace arg=client::arg; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1207877; +const uint32_t Cluster::CLUSTER_VERSION = 1332342; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -269,7 +270,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(*this)), credentialsExchange(new CredentialsExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), @@ -295,15 +295,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Failover exchange provides membership updates to clients. broker.getExchanges().registerExchange(failoverExchange); - // Update exchange is used during updates to replicate messages - // without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - - // Update-data exchange is used for passing data that may be too large - // for single control frame. - broker.getExchanges().registerExchange(updateDataExchange); - // CredentialsExchange is used to authenticate new cluster members broker.getExchanges().registerExchange(credentialsExchange); @@ -680,6 +671,17 @@ void Cluster::initMapCompleted(Lock& l) { authenticate(); broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); + + // Update exchange is used during updates to replicate messages + // without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + + // Update-data exchange is used during update for passing data that + // may be too large for single control frame. + updateDataExchange.reset(new UpdateDataExchange(*this)); + broker.getExchanges().registerExchange(updateDataExchange); + if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); @@ -999,6 +1001,10 @@ void Cluster::checkUpdateIn(Lock& l) { boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); + // FIXME aconway 2012-04-04: unregister/delete Update[Data]Exchange + updateDataExchange.reset(); + broker.getExchanges().destroy(UpdateDataExchange::EXCHANGE_NAME); + broker.getExchanges().destroy(UpdateClient::UPDATE); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; |