summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Execution.h2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp13
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp7
-rw-r--r--cpp/src/qpid/client/SessionBase.h5
5 files changed, 28 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index 557f7ece4a..5f717de586 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -41,6 +41,8 @@ public:
virtual bool isComplete(const framing::SequenceNumber& id) = 0;
virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
virtual void setCompletionListener(boost::function<void()>) = 0;
+ virtual void syncWait(const framing::SequenceNumber& id) = 0;
+ virtual framing::SequenceNumber lastSent() const = 0;
};
}}
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index ba3cdce636..afdd13c9e9 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/client/FutureCompletion.h"
+#include <boost/bind.hpp>
using namespace qpid::client;
using namespace qpid::framing;
@@ -252,3 +254,14 @@ void ExecutionHandler::setCompletionListener(boost::function<void()> l)
{
completionListener = l;
}
+
+
+void ExecutionHandler::syncWait(const SequenceNumber& id) {
+ syncTo(id);
+ FutureCompletion fc;
+ completion.listenForCompletion(
+ id, boost::bind(&FutureCompletion::completed, &fc)
+ );
+ fc.waitForCompletion();
+ assert(isCompleteUpTo(id));
+}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 5e3e75b801..d9113b683b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -86,6 +86,7 @@ public:
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
void syncTo(const framing::SequenceNumber& point);
void flushTo(const framing::SequenceNumber& point);
+ void syncWait(const framing::SequenceNumber& id);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index 06266ded91..9b6123cbc4 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -34,4 +34,11 @@ bool SessionBase::isSynchronous() const { return impl->isSync(); }
Execution& SessionBase::getExecution() { return impl->getExecution(); }
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+
+void SessionBase::sync() {
+ Execution& ex = getExecution();
+ ex.syncWait(ex.lastSent());
+ impl->assertOpen();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 890dbd269b..87c0892b61 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -84,6 +84,11 @@ class SessionBase
/** Close the session */
void close();
+
+ /** Synchronize with the broker. Wait for all commands issued so far in
+ * the session to complete.
+ */
+ void sync();
Execution& getExecution();