summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-07-01 13:47:04 +0000
committerAlan Conway <aconway@apache.org>2009-07-01 13:47:04 +0000
commit6f734e4aa938dccdc1c79e782908a0d1164ee526 (patch)
tree3177af16cacd4ea4d265f7c0c48c181c85512608 /cpp/src
parentcc0cb6411e14e23d45304c3ae84069d59b29465a (diff)
downloadqpid-python-6f734e4aa938dccdc1c79e782908a0d1164ee526.tar.gz
Fix members joining cluster while cluster is handling client errors.
Previously cluster members could abort if a new member joins while existing members are handling a client error. Now if an update offer arrives while an error is in progress, the offering broker retracts the offer and the newcomer must try again. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@790163 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp109
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp15
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.h3
-rw-r--r--cpp/src/qpid/cluster/RetractClient.cpp61
-rw-r--r--cpp/src/qpid/cluster/RetractClient.h49
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--cpp/src/tests/qpid_ping.cpp5
12 files changed, 234 insertions, 35 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 808c96c7a7..e41eb0b04b 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -56,6 +56,8 @@ cluster_la_SOURCES = \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
+ qpid/cluster/RetractClient.cpp \
+ qpid/cluster/RetractClient.h \
qpid/cluster/ErrorCheck.cpp \
qpid/cluster/ErrorCheck.h \
qpid/cluster/Event.cpp \
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 093ca13c7a..78f7bf13fc 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -88,6 +88,7 @@
#include "ClusterSettings.h"
#include "Connection.h"
#include "UpdateClient.h"
+#include "RetractClient.h"
#include "FailoverExchange.h"
#include "UpdateExchange.h"
@@ -104,6 +105,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
+#include "qpid/framing/ClusterRetractOfferBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
@@ -152,6 +154,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
@@ -186,6 +189,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
state(INIT),
lastSize(0),
lastBroker(false),
+ updateRetracted(false),
error(*this)
{
mAgent = broker.getManagementAgent();
@@ -325,6 +329,12 @@ void Cluster::deliverFrame(const EventFrame& e) {
deliverFrameQueue.push(e);
}
+const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
+ return (body && body->getMethod() &&
+ body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
+ static_cast<const ClusterUpdateOfferBody*>(body) : 0;
+}
+
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
@@ -334,8 +344,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
- if (offer)
+ if (castUpdateOffer(ef.frame.getBody()))
deliverEventQueue.stop();
deliverFrame(ef);
}
@@ -357,20 +366,37 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
-void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+void Cluster::flagError(
+ Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
+{
Mutex::ScopedLock l(lock);
- if (settings.checkErrors)
- error.error(connection, type, map.getFrameSeq(), map.getMembers());
+ if (connection.isCatchUp()) {
+ QPID_LOG(critical, *this << " error on update connection " << connection
+ << ": " << msg);
+ leave(l);
+ }
+ else if (settings.checkErrors)
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
// Handler for deliverFrameQueue.
// This thread executes the main logic.
-void Cluster::deliveredFrame(const EventFrame& e) {
+ void Cluster::deliveredFrame(const EventFrame& efConst) {
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ EventFrame e(efConst);
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
+ if (offer && error.isUnresolved()) {
+ // We can't honour an update offer that is delivered while an
+ // error is in progress so replace it with a retractOffer and re-start
+ // the event queue.
+ e.frame = AMQFrame(
+ ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
+ deliverEventQueue.start();
+ }
// Process each frame through the error checker.
if (settings.checkErrors) {
error.delivered(e);
@@ -382,7 +408,6 @@ void Cluster::deliveredFrame(const EventFrame& e) {
}
}
-LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
@@ -562,6 +587,14 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
+// Go back to normal processing after an offer that did not result in an update.
+void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {
+ QPID_LOG(info, *this << " cancelled offer to " << updatee);
+ deliverEventQueue.start(); // Go back to normal processing
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
@@ -572,12 +605,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
assert(state == OFFER);
if (url) // My offer was first.
updateStart(updatee, *url, l);
- else { // Another offer was first.
- deliverEventQueue.start(); // Don't need to update
- setReady(l);
- QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
- }
+ else // Another offer was first.
+ cancelOffer(updatee, l);
}
else if (updatee == self && url) {
assert(state == JOINER);
@@ -587,7 +616,34 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
checkUpdateIn(l);
}
else
- deliverEventQueue.start(); // Don't need to update
+ deliverEventQueue.start(); // Not involved in update.
+}
+
+static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
+ client::ConnectionSettings cs;
+ cs.username = settings.username;
+ cs.password = settings.password;
+ cs.mechanism = settings.mechanism;
+ return cs;
+}
+
+void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
+ // An offer was received while handling an error, and converted to a retract.
+ if (state == LEFT) return;
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee);
+ if (updater == self) {
+ assert(state == OFFER);
+ if (url) { // My offer was first.
+ QPID_LOG(info, *this << " retracted offer to " << updatee);
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
+ updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
+ }
+ cancelOffer(updatee, l);
+ }
+ else
+ deliverEventQueue.start(); // Not involved in update.
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -598,15 +654,12 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
- client::ConnectionSettings cs;
- cs.username = settings.username;
- cs.password = settings.password;
- cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
+ getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs));
+ connectionSettings(settings)));
}
// Called in update thread.
@@ -616,8 +669,15 @@ void Cluster::updateInDone(const ClusterMap& m) {
checkUpdateIn(l);
}
+void Cluster::updateInRetracted() {
+ Lock l(lock);
+ updateRetracted = true;
+ checkUpdateIn(l);
+}
+
void Cluster::checkUpdateIn(Lock&) {
- if (state == UPDATEE && updatedMap) {
+ if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (updatedMap) { // We're up to date
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
@@ -625,6 +685,13 @@ void Cluster::checkUpdateIn(Lock&) {
QPID_LOG(info, *this << " received update, starting catch-up");
deliverEventQueue.start();
}
+ else if (updateRetracted) { // Update was retracted, request another update
+ updateRetracted = false;
+ state = JOINER;
+ QPID_LOG(info, *this << " re-try joining after retracted update");
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ deliverEventQueue.start();
+ }
}
void Cluster::updateOutDone() {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 027d45aba2..a89bd83ac0 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -94,6 +94,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Update completed - called in update thread
void updateInDone(const ClusterMap&);
+ void updateInRetracted();
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -106,7 +107,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void deliverFrame(const EventFrame&);
// Called in deliverFrame thread to indicate an error from the broker.
- void flagError(Connection&, ErrorCheck::ErrorType);
+ void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg);
void connectionError();
// Called only during update by Connection::shadowReady
@@ -141,6 +142,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
+ void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -157,6 +159,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void cancelOffer(const MemberId&, Lock&);
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
@@ -251,6 +254,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
+ bool updateRetracted;
ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 2db8879eb5..c129ecbd65 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -309,6 +309,12 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
self.second = 0; // Mark this as completed update connection.
}
+void Connection::retractOffer() {
+ QPID_LOG(debug, cluster << " incoming update retracted on connection " << *this);
+ cluster.updateInRetracted();
+ self.second = 0; // Mark this as completed update connection.
+}
+
bool Connection::isLocal() const {
return self.first == cluster.getId() && self.second;
}
@@ -435,13 +441,13 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-void Connection::sessionError(uint16_t , const std::string& ) {
- cluster.flagError(*this, ERROR_TYPE_SESSION);
+void Connection::sessionError(uint16_t , const std::string& msg) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
}
-void Connection::connectionError(const std::string& ) {
- cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+void Connection::connectionError(const std::string& msg) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 73856a3687..687b0585d3 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -121,6 +121,8 @@ class Connection :
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+ void retractOffer();
+
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
const std::string& tag,
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index abb361bbb5..33e7f34766 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -44,7 +44,8 @@ ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
return o;
}
-void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+void ErrorCheck::error(
+ Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
@@ -53,8 +54,10 @@ void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet
unresolved = ms;
frameSeq = seq;
connection = &c;
- QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
- << " error " << frameSeq << " unresolved: " << unresolved);
+ QPID_LOG(error, cluster
+ << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
+ << " error " << frameSeq << " on " << c << ": " << msg
+ << " (unresolved: " << unresolved << ")");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
}
@@ -67,11 +70,13 @@ void ErrorCheck::delivered(const EventFrame& e) {
e.frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
if (errorCheck->getType() < type) { // my error is worse than his
- QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+ QPID_LOG(critical, cluster << " error " << frameSeq
+ << " did not occur on " << e.getMemberId());
throw Exception("Aborted by local failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
- QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+ QPID_LOG(debug, cluster << " error " << frameSeq
+ << " outcome agrees with " << e.getMemberId());
unresolved.erase(e.getMemberId());
checkResolved();
}
diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h
index 97b5f2bffd..d303ecea65 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/cpp/src/qpid/cluster/ErrorCheck.h
@@ -53,7 +53,8 @@ class ErrorCheck
ErrorCheck(Cluster&);
/** A local error has occured */
- void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+ const std::string& msg);
/** Called when a frame is delivered */
void delivered(const EventFrame&);
diff --git a/cpp/src/qpid/cluster/RetractClient.cpp b/cpp/src/qpid/cluster/RetractClient.cpp
new file mode 100644
index 0000000000..dfca7fdef4
--- /dev/null
+++ b/cpp/src/qpid/cluster/RetractClient.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RetractClient.h"
+#include "UpdateClient.h"
+#include "qpid/framing/ClusterConnectionRetractOfferBody.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+namespace {
+
+struct AutoClose {
+ client::Connection& connection;
+ AutoClose(client::Connection& c) : connection(c) {}
+ ~AutoClose() { connection.close(); }
+};
+}
+
+RetractClient::RetractClient(const Url& u, const client::ConnectionSettings& cs)
+ : url(u), connectionSettings(cs)
+{}
+
+RetractClient::~RetractClient() { delete this; }
+
+
+void RetractClient::run() {
+ try {
+ client::Connection c = UpdateClient::catchUpConnection();
+ c.open(url, connectionSettings);
+ AutoClose ac(c);
+ AMQFrame retract((ClusterConnectionRetractOfferBody()));
+ client::ConnectionAccess::getImpl(c)->handle(retract);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, " while retracting retract to " << url << ": " << e.what());
+ }
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/RetractClient.h b/cpp/src/qpid/cluster/RetractClient.h
new file mode 100644
index 0000000000..fb896197cc
--- /dev/null
+++ b/cpp/src/qpid/cluster/RetractClient.h
@@ -0,0 +1,49 @@
+#ifndef QPID_CLUSTER_RETRACTCLIENT_H
+#define QPID_CLUSTER_RETRACTCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Runnable.h"
+
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A client that retracts an offer to a remote broker using AMQP. @see UpdateClient
+ */
+class RetractClient : public sys::Runnable {
+ public:
+
+ RetractClient(const Url&, const client::ConnectionSettings&);
+ ~RetractClient();
+ void run(); // Will delete this when finished.
+
+ private:
+ Url url;
+ client::ConnectionSettings connectionSettings;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_RETRACTCLIENT_H*/
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 7c305a2e92..bad56de826 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -79,7 +79,7 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
};
// Create a connection with special version that marks it as a catch-up connection.
-client::Connection catchUpConnection() {
+client::Connection UpdateClient::catchUpConnection() {
client::Connection c;
client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
return c;
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index ba5bdd1d75..fd3d37ae5f 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -65,6 +65,7 @@ class ExpiryPolicy;
class UpdateClient : public sys::Runnable {
public:
static const std::string UPDATE; // Name for special update queue and exchange.
+ static client::Connection catchUpConnection();
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
diff --git a/cpp/src/tests/qpid_ping.cpp b/cpp/src/tests/qpid_ping.cpp
index ddd70515be..cc07ade7bb 100644
--- a/cpp/src/tests/qpid_ping.cpp
+++ b/cpp/src/tests/qpid_ping.cpp
@@ -23,7 +23,7 @@
#include "TestOptions.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -63,7 +63,7 @@ class Ping : public Runnable {
try {
opts.open(connection);
if (!opts.quiet) cout << "Opened connection." << endl;
- Session s = connection.newSession();
+ AsyncSession s = connection.newSession();
string qname(Uuid(true).str());
s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true);
s.messageTransfer(arg::content=Message("hello", qname));
@@ -71,6 +71,7 @@ class Ping : public Runnable {
SubscriptionManager subs(s);
subs.get(qname);
if (!opts.quiet) cout << "Received message." << endl;
+ s.sync();
s.close();
connection.close();
Mutex::ScopedLock l(lock);