summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-18 21:54:02 +0000
committerAlan Conway <aconway@apache.org>2008-02-18 21:54:02 +0000
commitc612a6c6200fd9a8f9830cbad062b30b465d3dfe (patch)
treefd427336d51e5c091529eb53743592294f7748be /cpp
parent3966d8be198296525a87a6bd88a42c4bb4f20d03 (diff)
downloadqpid-python-c612a6c6200fd9a8f9830cbad062b30b465d3dfe.tar.gz
Fixed race condition in the examples: when a listener program prints
its "ready" message, the commands it has sent to the broker may not yet be complete. This results in sporadic lost messages if the producer is started immediately (e.g. by a script.) - Added Session::sync(), wait till all commands to date have completed. - Call sync() before printing "ready" in listener example programs - Removed sleep from verify script git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@628875 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/examples/examples/fanout/listener.cpp73
-rw-r--r--cpp/examples/examples/pub-sub/topic_listener.cpp6
-rw-r--r--cpp/src/qpid/client/Execution.h2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp13
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp7
-rw-r--r--cpp/src/qpid/client/SessionBase.h5
7 files changed, 71 insertions, 36 deletions
diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp
index 5295e10f34..f465a8554e 100644
--- a/cpp/examples/examples/fanout/listener.cpp
+++ b/cpp/examples/examples/fanout/listener.cpp
@@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
{}
void Listener::received(Message& message) {
- std::cout << "Message: " << message.getData() << std::endl;
- if (message.getData() == "That's all, folks!") {
- std::cout << "Shutting down listener for " << message.getDestination()
- << std::endl;
- subscriptions.cancel(message.getDestination());
- }
+ std::cout << "Message: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!") {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptions.cancel(message.getDestination());
+ }
}
int main(int argc, char** argv) {
@@ -64,35 +64,38 @@ int main(int argc, char** argv) {
Connection connection;
Message msg;
try {
- connection.open(host, port);
- Session session = connection.newSession();
-
- //--------- Main body of program --------------------------------------------
-
- // Unique name for private queue:
- std::string myQueue=session.getId().str();
- // Declear my queue.
- session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
- arg::autoDelete=true);
- // Bind my queue to the fanout exchange.
- // Note no routingKey required, the fanout exchange delivers
- // all messages to all bound queues unconditionally.
- session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
-
- // Create a listener and subscribe it to my queue.
- SubscriptionManager subscriptions(session);
- Listener listener(subscriptions);
- subscriptions.subscribe(listener, myQueue);
-
- // Deliver messages until the subscription is cancelled
- // by Listener::received()
- std::cout << "Listening" << std::endl;
- subscriptions.run();
-
- //---------------------------------------------------------------------------
-
- connection.close();
- return 0;
+ connection.open(host, port);
+ Session session = connection.newSession();
+
+ //--------- Main body of program --------------------------------------------
+
+ // Unique name for private queue:
+ std::string myQueue=session.getId().str();
+ // Declear my queue.
+ session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
+ arg::autoDelete=true);
+ // Bind my queue to the fanout exchange.
+ // Note no routingKey required, the fanout exchange delivers
+ // all messages to all bound queues unconditionally.
+ session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+
+ // Create a listener and subscribe it to my queue.
+ SubscriptionManager subscriptions(session);
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, myQueue);
+
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
+ // Deliver messages until the subscription is cancelled
+ // by Listener::received()
+ std::cout << "Listening" << std::endl;
+ subscriptions.run();
+
+ //---------------------------------------------------------------------------
+
+ connection.close();
+ return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp
index 7364d89abb..7fd31f567e 100644
--- a/cpp/examples/examples/pub-sub/topic_listener.cpp
+++ b/cpp/examples/examples/pub-sub/topic_listener.cpp
@@ -128,7 +128,8 @@ void Listener::received(Message& message) {
}
void Listener::listen() {
- subscriptions.run();
+ // Receive messages
+ subscriptions.run();
}
int main(int argc, char** argv) {
@@ -152,6 +153,9 @@ int main(int argc, char** argv) {
listener.prepareQueue("news", "#.news");
listener.prepareQueue("weather", "#.weather");
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index 557f7ece4a..5f717de586 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -41,6 +41,8 @@ public:
virtual bool isComplete(const framing::SequenceNumber& id) = 0;
virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
virtual void setCompletionListener(boost::function<void()>) = 0;
+ virtual void syncWait(const framing::SequenceNumber& id) = 0;
+ virtual framing::SequenceNumber lastSent() const = 0;
};
}}
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index ba3cdce636..afdd13c9e9 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/client/FutureCompletion.h"
+#include <boost/bind.hpp>
using namespace qpid::client;
using namespace qpid::framing;
@@ -252,3 +254,14 @@ void ExecutionHandler::setCompletionListener(boost::function<void()> l)
{
completionListener = l;
}
+
+
+void ExecutionHandler::syncWait(const SequenceNumber& id) {
+ syncTo(id);
+ FutureCompletion fc;
+ completion.listenForCompletion(
+ id, boost::bind(&FutureCompletion::completed, &fc)
+ );
+ fc.waitForCompletion();
+ assert(isCompleteUpTo(id));
+}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 5e3e75b801..d9113b683b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -86,6 +86,7 @@ public:
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
void syncTo(const framing::SequenceNumber& point);
void flushTo(const framing::SequenceNumber& point);
+ void syncWait(const framing::SequenceNumber& id);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index 06266ded91..9b6123cbc4 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -34,4 +34,11 @@ bool SessionBase::isSynchronous() const { return impl->isSync(); }
Execution& SessionBase::getExecution() { return impl->getExecution(); }
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+
+void SessionBase::sync() {
+ Execution& ex = getExecution();
+ ex.syncWait(ex.lastSent());
+ impl->assertOpen();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 890dbd269b..87c0892b61 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -84,6 +84,11 @@ class SessionBase
/** Close the session */
void close();
+
+ /** Synchronize with the broker. Wait for all commands issued so far in
+ * the session to complete.
+ */
+ void sync();
Execution& getExecution();