diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Execution.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 5 |
5 files changed, 28 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 557f7ece4a..5f717de586 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -41,6 +41,8 @@ public: virtual bool isComplete(const framing::SequenceNumber& id) = 0; virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; virtual void setCompletionListener(boost::function<void()>) = 0; + virtual void syncWait(const framing::SequenceNumber& id) = 0; + virtual framing::SequenceNumber lastSent() const = 0; }; }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index ba3cdce636..afdd13c9e9 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/client/FutureCompletion.h" +#include <boost/bind.hpp> using namespace qpid::client; using namespace qpid::framing; @@ -252,3 +254,14 @@ void ExecutionHandler::setCompletionListener(boost::function<void()> l) { completionListener = l; } + + +void ExecutionHandler::syncWait(const SequenceNumber& id) { + syncTo(id); + FutureCompletion fc; + completion.listenForCompletion( + id, boost::bind(&FutureCompletion::completed, &fc) + ); + fc.waitForCompletion(); + assert(isCompleteUpTo(id)); +} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 5e3e75b801..d9113b683b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -86,6 +86,7 @@ public: void completed(const framing::SequenceNumber& id, bool cumulative, bool send); void syncTo(const framing::SequenceNumber& point); void flushTo(const framing::SequenceNumber& point); + void syncWait(const framing::SequenceNumber& id); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index 06266ded91..9b6123cbc4 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -34,4 +34,11 @@ bool SessionBase::isSynchronous() const { return impl->isSync(); } Execution& SessionBase::getExecution() { return impl->getExecution(); } Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } + +void SessionBase::sync() { + Execution& ex = getExecution(); + ex.syncWait(ex.lastSent()); + impl->assertOpen(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 890dbd269b..87c0892b61 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -84,6 +84,11 @@ class SessionBase /** Close the session */ void close(); + + /** Synchronize with the broker. Wait for all commands issued so far in + * the session to complete. + */ + void sync(); Execution& getExecution(); |