summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-05 13:28:14 +0000
committerAlan Conway <aconway@apache.org>2009-03-05 13:28:14 +0000
commitaff798ccbc13b41696c661fe07bd3934deb18625 (patch)
treeba59dbefe36754a60386f8632cb07f05e89a61ea /cpp/src/qpid/cluster/UpdateClient.cpp
parent23053617b74f1bbb6c8ae3c60fe24953701a4583 (diff)
downloadqpid-python-aff798ccbc13b41696c661fe07bd3934deb18625.tar.gz
cluster: fix delivery-property.exchange-name set on updated messages.
Logging improvements, useful for debugging: - qpid/SessionState.cpp: show frame bodies with command IDs. - assign cluster-wide id number to each Event. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@750456 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp12
1 files changed, 5 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 9cba377122..7e349905ab 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -86,14 +86,14 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_,
const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons),
+ eventId(eventId_), frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -104,7 +104,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.qpid-update");
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
@@ -120,9 +120,6 @@ void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
-
- // Update exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
@@ -133,6 +130,7 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
+ membership.setEventId(eventId);
membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -274,7 +272,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,