diff options
author | Alan Conway <aconway@apache.org> | 2009-07-01 13:47:04 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-01 13:47:04 +0000 |
commit | 6f734e4aa938dccdc1c79e782908a0d1164ee526 (patch) | |
tree | 3177af16cacd4ea4d265f7c0c48c181c85512608 /cpp/src | |
parent | cc0cb6411e14e23d45304c3ae84069d59b29465a (diff) | |
download | qpid-python-6f734e4aa938dccdc1c79e782908a0d1164ee526.tar.gz |
Fix members joining cluster while cluster is handling client errors.
Previously cluster members could abort if a new member joins while
existing members are handling a client error.
Now if an update offer arrives while an error is in progress, the
offering broker retracts the offer and the newcomer must try again.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@790163 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 109 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/RetractClient.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/RetractClient.h | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/qpid_ping.cpp | 5 |
12 files changed, 234 insertions, 35 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 808c96c7a7..e41eb0b04b 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -56,6 +56,8 @@ cluster_la_SOURCES = \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ + qpid/cluster/RetractClient.cpp \ + qpid/cluster/RetractClient.h \ qpid/cluster/ErrorCheck.cpp \ qpid/cluster/ErrorCheck.h \ qpid/cluster/Event.cpp \ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 093ca13c7a..78f7bf13fc 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -88,6 +88,7 @@ #include "ClusterSettings.h" #include "Connection.h" #include "UpdateClient.h" +#include "RetractClient.h" #include "FailoverExchange.h" #include "UpdateExchange.h" @@ -104,6 +105,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" +#include "qpid/framing/ClusterRetractOfferBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" @@ -152,6 +154,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -186,6 +189,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : state(INIT), lastSize(0), lastBroker(false), + updateRetracted(false), error(*this) { mAgent = broker.getManagementAgent(); @@ -325,6 +329,12 @@ void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } +const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { + return (body && body->getMethod() && + body->getMethod()->isA<ClusterUpdateOfferBody>()) ? + static_cast<const ClusterUpdateOfferBody*>(body) : 0; +} + // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { @@ -334,8 +344,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody()); - if (offer) + if (castUpdateOffer(ef.frame.getBody())) deliverEventQueue.stop(); deliverFrame(ef); } @@ -357,20 +366,37 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } -void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { +void Cluster::flagError( + Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) +{ Mutex::ScopedLock l(lock); - if (settings.checkErrors) - error.error(connection, type, map.getFrameSeq(), map.getMembers()); + if (connection.isCatchUp()) { + QPID_LOG(critical, *this << " error on update connection " << connection + << ": " << msg); + leave(l); + } + else if (settings.checkErrors) + error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) // Handler for deliverFrameQueue. // This thread executes the main logic. -void Cluster::deliveredFrame(const EventFrame& e) { + void Cluster::deliveredFrame(const EventFrame& efConst) { LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); + EventFrame e(efConst); + const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); + if (offer && error.isUnresolved()) { + // We can't honour an update offer that is delivered while an + // error is in progress so replace it with a retractOffer and re-start + // the event queue. + e.frame = AMQFrame( + ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); + deliverEventQueue.start(); + } // Process each frame through the error checker. if (settings.checkErrors) { error.delivered(e); @@ -382,7 +408,6 @@ void Cluster::deliveredFrame(const EventFrame& e) { } } -LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");) void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { @@ -562,6 +587,14 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } +// Go back to normal processing after an offer that did not result in an update. +void Cluster::cancelOffer(const MemberId& updatee, Lock& l) { + QPID_LOG(info, *this << " cancelled offer to " << updatee); + deliverEventQueue.start(); // Go back to normal processing + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. +} + void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. @@ -572,12 +605,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu assert(state == OFFER); if (url) // My offer was first. updateStart(updatee, *url, l); - else { // Another offer was first. - deliverEventQueue.start(); // Don't need to update - setReady(l); - QPID_LOG(info, *this << " cancelled update offer to " << updatee); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - } + else // Another offer was first. + cancelOffer(updatee, l); } else if (updatee == self && url) { assert(state == JOINER); @@ -587,7 +616,34 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu checkUpdateIn(l); } else - deliverEventQueue.start(); // Don't need to update + deliverEventQueue.start(); // Not involved in update. +} + +static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { + client::ConnectionSettings cs; + cs.username = settings.username; + cs.password = settings.password; + cs.mechanism = settings.mechanism; + return cs; +} + +void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { + // An offer was received while handling an error, and converted to a retract. + if (state == LEFT) return; + MemberId updatee(updateeInt); + boost::optional<Url> url = map.updateOffer(updater, updatee); + if (updater == self) { + assert(state == OFFER); + if (url) { // My offer was first. + QPID_LOG(info, *this << " retracted offer to " << updatee); + if (updateThread.id()) + updateThread.join(); // Join the previous updateThread to avoid leaks. + updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); + } + cancelOffer(updatee, l); + } + else + deliverEventQueue.start(); // Not involved in update. } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { @@ -598,15 +654,12 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { QPID_LOG(info, *this << " sending update to " << updatee << " at " << url); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. - client::ConnectionSettings cs; - cs.username = settings.username; - cs.password = settings.password; - cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder, + new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, + getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), - cs)); + connectionSettings(settings))); } // Called in update thread. @@ -616,8 +669,15 @@ void Cluster::updateInDone(const ClusterMap& m) { checkUpdateIn(l); } +void Cluster::updateInRetracted() { + Lock l(lock); + updateRetracted = true; + checkUpdateIn(l); +} + void Cluster::checkUpdateIn(Lock&) { - if (state == UPDATEE && updatedMap) { + if (state != UPDATEE) return; // Wait till we reach the stall point. + if (updatedMap) { // We're up to date map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; @@ -625,6 +685,13 @@ void Cluster::checkUpdateIn(Lock&) { QPID_LOG(info, *this << " received update, starting catch-up"); deliverEventQueue.start(); } + else if (updateRetracted) { // Update was retracted, request another update + updateRetracted = false; + state = JOINER; + QPID_LOG(info, *this << " re-try joining after retracted update"); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + deliverEventQueue.start(); + } } void Cluster::updateOutDone() { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 027d45aba2..a89bd83ac0 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -94,6 +94,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Update completed - called in update thread void updateInDone(const ClusterMap&); + void updateInRetracted(); MemberId getId() const; broker::Broker& getBroker() const; @@ -106,7 +107,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliverFrame(const EventFrame&); // Called in deliverFrame thread to indicate an error from the broker. - void flagError(Connection&, ErrorCheck::ErrorType); + void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg); void connectionError(); // Called only during update by Connection::shadowReady @@ -141,6 +142,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); + void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); @@ -157,6 +159,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); void erase(const ConnectionId&, Lock&); + void cancelOffer(const MemberId&, Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. @@ -251,6 +254,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; + bool updateRetracted; ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 2db8879eb5..c129ecbd65 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -309,6 +309,12 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members self.second = 0; // Mark this as completed update connection. } +void Connection::retractOffer() { + QPID_LOG(debug, cluster << " incoming update retracted on connection " << *this); + cluster.updateInRetracted(); + self.second = 0; // Mark this as completed update connection. +} + bool Connection::isLocal() const { return self.first == cluster.getId() && self.second; } @@ -435,13 +441,13 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -void Connection::sessionError(uint16_t , const std::string& ) { - cluster.flagError(*this, ERROR_TYPE_SESSION); +void Connection::sessionError(uint16_t , const std::string& msg) { + cluster.flagError(*this, ERROR_TYPE_SESSION, msg); } -void Connection::connectionError(const std::string& ) { - cluster.flagError(*this, ERROR_TYPE_CONNECTION); +void Connection::connectionError(const std::string& msg) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 73856a3687..687b0585d3 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -121,6 +121,8 @@ class Connection : void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); + void retractOffer(); + void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, const std::string& tag, diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp index abb361bbb5..33e7f34766 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -44,7 +44,8 @@ ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { return o; } -void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms) +void ErrorCheck::error( + Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg) { // Detected a local error, inform cluster and set error state. assert(t != ERROR_TYPE_NONE); // Must be an error. @@ -53,8 +54,10 @@ void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet unresolved = ms; frameSeq = seq; connection = &c; - QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") - << " error " << frameSeq << " unresolved: " << unresolved); + QPID_LOG(error, cluster + << (type == ERROR_TYPE_SESSION ? " channel" : " connection") + << " error " << frameSeq << " on " << c << ": " << msg + << " (unresolved: " << unresolved << ")"); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); } @@ -67,11 +70,13 @@ void ErrorCheck::delivered(const EventFrame& e) { e.frame.getMethod()); if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error if (errorCheck->getType() < type) { // my error is worse than his - QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); + QPID_LOG(critical, cluster << " error " << frameSeq + << " did not occur on " << e.getMemberId()); throw Exception("Aborted by local failure that did not occur on all replicas"); } else { // his error is worse/same as mine. - QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId()); + QPID_LOG(debug, cluster << " error " << frameSeq + << " outcome agrees with " << e.getMemberId()); unresolved.erase(e.getMemberId()); checkResolved(); } diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h index 97b5f2bffd..d303ecea65 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -53,7 +53,8 @@ class ErrorCheck ErrorCheck(Cluster&); /** A local error has occured */ - void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&); + void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&, + const std::string& msg); /** Called when a frame is delivered */ void delivered(const EventFrame&); diff --git a/cpp/src/qpid/cluster/RetractClient.cpp b/cpp/src/qpid/cluster/RetractClient.cpp new file mode 100644 index 0000000000..dfca7fdef4 --- /dev/null +++ b/cpp/src/qpid/cluster/RetractClient.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RetractClient.h" +#include "UpdateClient.h" +#include "qpid/framing/ClusterConnectionRetractOfferBody.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +namespace { + +struct AutoClose { + client::Connection& connection; + AutoClose(client::Connection& c) : connection(c) {} + ~AutoClose() { connection.close(); } +}; +} + +RetractClient::RetractClient(const Url& u, const client::ConnectionSettings& cs) + : url(u), connectionSettings(cs) +{} + +RetractClient::~RetractClient() { delete this; } + + +void RetractClient::run() { + try { + client::Connection c = UpdateClient::catchUpConnection(); + c.open(url, connectionSettings); + AutoClose ac(c); + AMQFrame retract((ClusterConnectionRetractOfferBody())); + client::ConnectionAccess::getImpl(c)->handle(retract); + } catch (const std::exception& e) { + QPID_LOG(error, " while retracting retract to " << url << ": " << e.what()); + } +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/RetractClient.h b/cpp/src/qpid/cluster/RetractClient.h new file mode 100644 index 0000000000..fb896197cc --- /dev/null +++ b/cpp/src/qpid/cluster/RetractClient.h @@ -0,0 +1,49 @@ +#ifndef QPID_CLUSTER_RETRACTCLIENT_H +#define QPID_CLUSTER_RETRACTCLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/ConnectionSettings.h" +#include "qpid/sys/Runnable.h" + + +namespace qpid { +namespace cluster { + +/** + * A client that retracts an offer to a remote broker using AMQP. @see UpdateClient + */ +class RetractClient : public sys::Runnable { + public: + + RetractClient(const Url&, const client::ConnectionSettings&); + ~RetractClient(); + void run(); // Will delete this when finished. + + private: + Url url; + client::ConnectionSettings connectionSettings; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_RETRACTCLIENT_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 7c305a2e92..bad56de826 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -79,7 +79,7 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { }; // Create a connection with special version that marks it as a catch-up connection. -client::Connection catchUpConnection() { +client::Connection UpdateClient::catchUpConnection() { client::Connection c; client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); return c; diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index ba5bdd1d75..fd3d37ae5f 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -65,6 +65,7 @@ class ExpiryPolicy; class UpdateClient : public sys::Runnable { public: static const std::string UPDATE; // Name for special update queue and exchange. + static client::Connection catchUpConnection(); UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, diff --git a/cpp/src/tests/qpid_ping.cpp b/cpp/src/tests/qpid_ping.cpp index ddd70515be..cc07ade7bb 100644 --- a/cpp/src/tests/qpid_ping.cpp +++ b/cpp/src/tests/qpid_ping.cpp @@ -23,7 +23,7 @@ #include "TestOptions.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" @@ -63,7 +63,7 @@ class Ping : public Runnable { try { opts.open(connection); if (!opts.quiet) cout << "Opened connection." << endl; - Session s = connection.newSession(); + AsyncSession s = connection.newSession(); string qname(Uuid(true).str()); s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true); s.messageTransfer(arg::content=Message("hello", qname)); @@ -71,6 +71,7 @@ class Ping : public Runnable { SubscriptionManager subs(s); subs.get(qname); if (!opts.quiet) cout << "Received message." << endl; + s.sync(); s.close(); connection.close(); Mutex::ScopedLock l(lock); |