diff options
author | Alan Conway <aconway@apache.org> | 2009-03-05 13:28:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-03-05 13:28:14 +0000 |
commit | aff798ccbc13b41696c661fe07bd3934deb18625 (patch) | |
tree | ba59dbefe36754a60386f8632cb07f05e89a61ea /cpp/src/qpid/cluster/UpdateClient.cpp | |
parent | 23053617b74f1bbb6c8ae3c60fe24953701a4583 (diff) | |
download | qpid-python-aff798ccbc13b41696c661fe07bd3934deb18625.tar.gz |
cluster: fix delivery-property.exchange-name set on updated messages.
Logging improvements, useful for debugging:
- qpid/SessionState.cpp: show frame bodies with command IDs.
- assign cluster-wide id number to each Event.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@750456 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 9cba377122..7e349905ab 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,14 +86,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), + eventId(eventId_), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -104,7 +104,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.qpid-update"); +const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { @@ -120,9 +120,6 @@ void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - - // Update exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); @@ -133,6 +130,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -274,7 +272,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, |