summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
1 files changed, 17 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3c1d23c842..34aaf3d341 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -131,6 +131,7 @@
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
#include "qpid/cluster/CredentialsExchange.h"
+#include "qpid/cluster/UpdateClient.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -202,7 +203,7 @@ namespace arg=client::arg;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1207877;
+const uint32_t Cluster::CLUSTER_VERSION = 1332342;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -269,7 +270,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(*this)),
credentialsExchange(new CredentialsExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
@@ -295,15 +295,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Failover exchange provides membership updates to clients.
broker.getExchanges().registerExchange(failoverExchange);
- // Update exchange is used during updates to replicate messages
- // without modifying delivery-properties.exchange.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
-
- // Update-data exchange is used for passing data that may be too large
- // for single control frame.
- broker.getExchanges().registerExchange(updateDataExchange);
-
// CredentialsExchange is used to authenticate new cluster members
broker.getExchanges().registerExchange(credentialsExchange);
@@ -680,6 +671,17 @@ void Cluster::initMapCompleted(Lock& l) {
authenticate();
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
+
+ // Update exchange is used during updates to replicate messages
+ // without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
+ // Update-data exchange is used during update for passing data that
+ // may be too large for single control frame.
+ updateDataExchange.reset(new UpdateDataExchange(*this));
+ broker.getExchanges().registerExchange(updateDataExchange);
+
if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
state = JOINER;
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
@@ -999,6 +1001,10 @@ void Cluster::checkUpdateIn(Lock& l) {
boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
+ // FIXME aconway 2012-04-04: unregister/delete Update[Data]Exchange
+ updateDataExchange.reset();
+ broker.getExchanges().destroy(UpdateDataExchange::EXCHANGE_NAME);
+ broker.getExchanges().destroy(UpdateClient::UPDATE);
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;