diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/examples/examples/fanout/listener.cpp | 73 | ||||
-rw-r--r-- | cpp/examples/examples/pub-sub/topic_listener.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/Execution.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 5 |
7 files changed, 71 insertions, 36 deletions
diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp index 5295e10f34..f465a8554e 100644 --- a/cpp/examples/examples/fanout/listener.cpp +++ b/cpp/examples/examples/fanout/listener.cpp @@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) {} void Listener::received(Message& message) { - std::cout << "Message: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " << message.getDestination() - << std::endl; - subscriptions.cancel(message.getDestination()); - } + std::cout << "Message: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } } int main(int argc, char** argv) { @@ -64,35 +64,38 @@ int main(int argc, char** argv) { Connection connection; Message msg; try { - connection.open(host, port); - Session session = connection.newSession(); - - //--------- Main body of program -------------------------------------------- - - // Unique name for private queue: - std::string myQueue=session.getId().str(); - // Declear my queue. - session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, - arg::autoDelete=true); - // Bind my queue to the fanout exchange. - // Note no routingKey required, the fanout exchange delivers - // all messages to all bound queues unconditionally. - session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); - - // Create a listener and subscribe it to my queue. - SubscriptionManager subscriptions(session); - Listener listener(subscriptions); - subscriptions.subscribe(listener, myQueue); - - // Deliver messages until the subscription is cancelled - // by Listener::received() - std::cout << "Listening" << std::endl; - subscriptions.run(); - - //--------------------------------------------------------------------------- - - connection.close(); - return 0; + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Unique name for private queue: + std::string myQueue=session.getId().str(); + // Declear my queue. + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + // Bind my queue to the fanout exchange. + // Note no routingKey required, the fanout exchange delivers + // all messages to all bound queues unconditionally. + session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + + // Create a listener and subscribe it to my queue. + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, myQueue); + + // Wait for the broker to indicate that our queues have been created. + session.sync(); + + // Deliver messages until the subscription is cancelled + // by Listener::received() + std::cout << "Listening" << std::endl; + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp index 7364d89abb..7fd31f567e 100644 --- a/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -128,7 +128,8 @@ void Listener::received(Message& message) { } void Listener::listen() { - subscriptions.run(); + // Receive messages + subscriptions.run(); } int main(int argc, char** argv) { @@ -152,6 +153,9 @@ int main(int argc, char** argv) { listener.prepareQueue("news", "#.news"); listener.prepareQueue("weather", "#.weather"); + // Wait for the broker to indicate that our queues have been created. + session.sync(); + std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 557f7ece4a..5f717de586 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -41,6 +41,8 @@ public: virtual bool isComplete(const framing::SequenceNumber& id) = 0; virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; virtual void setCompletionListener(boost::function<void()>) = 0; + virtual void syncWait(const framing::SequenceNumber& id) = 0; + virtual framing::SequenceNumber lastSent() const = 0; }; }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index ba3cdce636..afdd13c9e9 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/client/FutureCompletion.h" +#include <boost/bind.hpp> using namespace qpid::client; using namespace qpid::framing; @@ -252,3 +254,14 @@ void ExecutionHandler::setCompletionListener(boost::function<void()> l) { completionListener = l; } + + +void ExecutionHandler::syncWait(const SequenceNumber& id) { + syncTo(id); + FutureCompletion fc; + completion.listenForCompletion( + id, boost::bind(&FutureCompletion::completed, &fc) + ); + fc.waitForCompletion(); + assert(isCompleteUpTo(id)); +} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 5e3e75b801..d9113b683b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -86,6 +86,7 @@ public: void completed(const framing::SequenceNumber& id, bool cumulative, bool send); void syncTo(const framing::SequenceNumber& point); void flushTo(const framing::SequenceNumber& point); + void syncWait(const framing::SequenceNumber& id); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index 06266ded91..9b6123cbc4 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -34,4 +34,11 @@ bool SessionBase::isSynchronous() const { return impl->isSync(); } Execution& SessionBase::getExecution() { return impl->getExecution(); } Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } + +void SessionBase::sync() { + Execution& ex = getExecution(); + ex.syncWait(ex.lastSent()); + impl->assertOpen(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 890dbd269b..87c0892b61 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -84,6 +84,11 @@ class SessionBase /** Close the session */ void close(); + + /** Synchronize with the broker. Wait for all commands issued so far in + * the session to complete. + */ + void sync(); Execution& getExecution(); |