diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 9cba377122..7e349905ab 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,14 +86,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), + eventId(eventId_), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -104,7 +104,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.qpid-update"); +const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { @@ -120,9 +120,6 @@ void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - - // Update exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); @@ -133,6 +130,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -274,7 +272,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, |