summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-receive.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid-receive.cpp')
-rw-r--r--cpp/src/tests/qpid-receive.cpp14
1 files changed, 13 insertions, 1 deletions
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index a85d882a0f..9b84306605 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -191,6 +191,9 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
+ Address replyToAddress;
+ Sender replyToSender;
+
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
@@ -223,12 +226,21 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
+ if (msg.getReplyTo()) { // Echo message back to reply-to address.
+ if (msg.getReplyTo() != replyToAddress) {
+ replyToSender = session.createSender(msg.getReplyTo());
+ replyToSender.setCapacity(opts.capacity);
+ replyToAddress = msg.getReplyTo();
+ }
+ replyToSender.send(msg);
+ }
if (opts.receiveRate) {
qpid::sys::AbsTime waitTill(start, count*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
- //opts.rejectFrequency??
+ // Clear out message properties & content for next iteration.
+ msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
}
if (opts.reportTotal) reporter.report();
if (opts.tx) {