summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp7
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp2
-rw-r--r--cpp/src/tests/perftest.cpp12
3 files changed, 14 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 09bf1e055a..7fbe4a053e 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -35,8 +35,11 @@ Message LocalQueue::pop() {
if (!queue)
throw ClosedException();
FrameSet::shared_ptr content = queue->pop();
- if (content->isA<MessageTransferBody>())
- return Message(*content, session);
+ if (content->isA<MessageTransferBody>()) {
+ Message m(*content, session);
+ autoAck.ack(m);
+ return m;
+ }
else
throw CommandInvalidException(
QPID_MSG("Unexpected method: " << content->getMethod()));
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index fc65843643..478b8438c2 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -41,8 +41,8 @@ void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& t)
{
std::string tag=t.empty() ? q:t;
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
dispatcher.listen(tag, &listener);
+ session.messageSubscribe(arg::queue=q, arg::destination=tag);
setFlowControl(tag, messages, bytes, window);
}
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 80157da7f4..c775407cdf 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -44,10 +44,11 @@ struct Opts : public TestOptions {
bool durable;
int consumers;
std::string mode;
+ int autoAck;
Opts() :
listen(false), publish(false), count(500000), size(64), consumers(1),
- mode("shared")
+ mode("shared"), autoAck(100)
{
addOptions()
("listen", optValue(listen), "Consume messages.")
@@ -56,7 +57,8 @@ struct Opts : public TestOptions {
("size", optValue(size, "BYTES"), "Size of messages.")
("durable", optValue(durable, "N"), "Publish messages as durable.")
("consumers", optValue(consumers, "N"), "Number of consumers.")
- ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
+ ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
+ ("auto-ack", optValue(autoAck, "N"), "ack every N messages.");
}
};
@@ -219,12 +221,14 @@ void ListenThread::run() {
session.messageTransfer(arg::content=Message("ready", "control"));
SubscriptionManager subs(session);
- LocalQueue consume;
+ LocalQueue consume(AckPolicy(opts.autoAck));
subs.subscribe(consume, consumeQueue);
int consumed=0;
AbsTime start=now();
- while (consume.pop().getData() != "done")
+ Message msg;
+ while ((msg=consume.pop()).getData() != "done")
++consumed;
+ msg.acknowledge(); // Ack all outstanding messages.
AbsTime end=now();
// Report to publisher.