summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/examples/examples/fanout/listener.cpp73
-rw-r--r--cpp/examples/examples/pub-sub/topic_listener.cpp6
-rw-r--r--cpp/src/qpid/client/Execution.h2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp13
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp7
-rw-r--r--cpp/src/qpid/client/SessionBase.h5
7 files changed, 71 insertions, 36 deletions
diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp
index 5295e10f34..f465a8554e 100644
--- a/cpp/examples/examples/fanout/listener.cpp
+++ b/cpp/examples/examples/fanout/listener.cpp
@@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
{}
void Listener::received(Message& message) {
- std::cout << "Message: " << message.getData() << std::endl;
- if (message.getData() == "That's all, folks!") {
- std::cout << "Shutting down listener for " << message.getDestination()
- << std::endl;
- subscriptions.cancel(message.getDestination());
- }
+ std::cout << "Message: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!") {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptions.cancel(message.getDestination());
+ }
}
int main(int argc, char** argv) {
@@ -64,35 +64,38 @@ int main(int argc, char** argv) {
Connection connection;
Message msg;
try {
- connection.open(host, port);
- Session session = connection.newSession();
-
- //--------- Main body of program --------------------------------------------
-
- // Unique name for private queue:
- std::string myQueue=session.getId().str();
- // Declear my queue.
- session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
- arg::autoDelete=true);
- // Bind my queue to the fanout exchange.
- // Note no routingKey required, the fanout exchange delivers
- // all messages to all bound queues unconditionally.
- session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
-
- // Create a listener and subscribe it to my queue.
- SubscriptionManager subscriptions(session);
- Listener listener(subscriptions);
- subscriptions.subscribe(listener, myQueue);
-
- // Deliver messages until the subscription is cancelled
- // by Listener::received()
- std::cout << "Listening" << std::endl;
- subscriptions.run();
-
- //---------------------------------------------------------------------------
-
- connection.close();
- return 0;
+ connection.open(host, port);
+ Session session = connection.newSession();
+
+ //--------- Main body of program --------------------------------------------
+
+ // Unique name for private queue:
+ std::string myQueue=session.getId().str();
+ // Declear my queue.
+ session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
+ arg::autoDelete=true);
+ // Bind my queue to the fanout exchange.
+ // Note no routingKey required, the fanout exchange delivers
+ // all messages to all bound queues unconditionally.
+ session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+
+ // Create a listener and subscribe it to my queue.
+ SubscriptionManager subscriptions(session);
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, myQueue);
+
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
+ // Deliver messages until the subscription is cancelled
+ // by Listener::received()
+ std::cout << "Listening" << std::endl;
+ subscriptions.run();
+
+ //---------------------------------------------------------------------------
+
+ connection.close();
+ return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp
index 7364d89abb..7fd31f567e 100644
--- a/cpp/examples/examples/pub-sub/topic_listener.cpp
+++ b/cpp/examples/examples/pub-sub/topic_listener.cpp
@@ -128,7 +128,8 @@ void Listener::received(Message& message) {
}
void Listener::listen() {
- subscriptions.run();
+ // Receive messages
+ subscriptions.run();
}
int main(int argc, char** argv) {
@@ -152,6 +153,9 @@ int main(int argc, char** argv) {
listener.prepareQueue("news", "#.news");
listener.prepareQueue("weather", "#.weather");
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index 557f7ece4a..5f717de586 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -41,6 +41,8 @@ public:
virtual bool isComplete(const framing::SequenceNumber& id) = 0;
virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
virtual void setCompletionListener(boost::function<void()>) = 0;
+ virtual void syncWait(const framing::SequenceNumber& id) = 0;
+ virtual framing::SequenceNumber lastSent() const = 0;
};
}}
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index ba3cdce636..afdd13c9e9 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/client/FutureCompletion.h"
+#include <boost/bind.hpp>
using namespace qpid::client;
using namespace qpid::framing;
@@ -252,3 +254,14 @@ void ExecutionHandler::setCompletionListener(boost::function<void()> l)
{
completionListener = l;
}
+
+
+void ExecutionHandler::syncWait(const SequenceNumber& id) {
+ syncTo(id);
+ FutureCompletion fc;
+ completion.listenForCompletion(
+ id, boost::bind(&FutureCompletion::completed, &fc)
+ );
+ fc.waitForCompletion();
+ assert(isCompleteUpTo(id));
+}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 5e3e75b801..d9113b683b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -86,6 +86,7 @@ public:
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
void syncTo(const framing::SequenceNumber& point);
void flushTo(const framing::SequenceNumber& point);
+ void syncWait(const framing::SequenceNumber& id);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index 06266ded91..9b6123cbc4 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -34,4 +34,11 @@ bool SessionBase::isSynchronous() const { return impl->isSync(); }
Execution& SessionBase::getExecution() { return impl->getExecution(); }
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+
+void SessionBase::sync() {
+ Execution& ex = getExecution();
+ ex.syncWait(ex.lastSent());
+ impl->assertOpen();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 890dbd269b..87c0892b61 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -84,6 +84,11 @@ class SessionBase
/** Close the session */
void close();
+
+ /** Synchronize with the broker. Wait for all commands issued so far in
+ * the session to complete.
+ */
+ void sync();
Execution& getExecution();