summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp3
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp3
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp5
-rw-r--r--cpp/src/qpid/messaging/Duration.cpp45
-rw-r--r--cpp/src/qpid/messaging/Message.cpp4
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp36
-rw-r--r--cpp/src/tests/qpid_recv.cpp6
-rw-r--r--cpp/src/tests/qpid_send.cpp2
11 files changed, 79 insertions, 29 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 40590993eb..1695c6fa17 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -679,6 +679,7 @@ set (qpidclient_SOURCES
qpid/messaging/AddressParser.cpp
qpid/messaging/Connection.cpp
qpid/messaging/ConnectionImpl.h
+ qpid/messaging/Duration.cpp
qpid/messaging/ListContent.cpp
qpid/messaging/ListView.cpp
qpid/messaging/MapContent.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 7da7f42dcb..d6867701b4 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -707,6 +707,7 @@ libqpidclient_la_SOURCES = \
qpid/messaging/AddressParser.h \
qpid/messaging/AddressParser.cpp \
qpid/messaging/Connection.cpp \
+ qpid/messaging/Duration.cpp \
qpid/messaging/ListContent.cpp \
qpid/messaging/ListView.cpp \
qpid/messaging/MapContent.cpp \
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 8e501511e4..3f5cccfedb 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -26,6 +26,7 @@
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/types/Variant.h"
@@ -276,7 +277,7 @@ void populateHeaders(qpid::messaging::Message& message,
const MessageProperties* messageProperties)
{
if (deliveryProperties) {
- message.setTtl(deliveryProperties->getTtl());
+ message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl()));
message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
MessageImplAccess::get(message).redelivered = deliveryProperties->getRedelivered();
}
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index e75368cda7..d0d945b934 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -47,7 +47,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
- message.getDeliveryProperties().setTtl(from.getTtl());
+ message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 2f52efbceb..c3367f8ab4 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -30,6 +30,7 @@ namespace client {
namespace amqp0_10 {
using qpid::messaging::Receiver;
+using qpid::messaging::Duration;
void ReceiverImpl::received(qpid::messaging::Message&)
{
@@ -163,7 +164,7 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
} else {
sync(session).messageFlush(destination);
startFlow();//reallocate credit
- return getImpl(message, 0);
+ return getImpl(message, Duration::IMMEDIATE);
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 245ec878be..209ab93909 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -267,8 +267,9 @@ bool SessionImpl::accept(ReceiverImpl* receiver,
qpid::sys::Duration adjust(qpid::messaging::Duration timeout)
{
- if (timeout < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
- return timeout * qpid::sys::TIME_MSEC;
+ uint64_t ms = timeout.getMilliseconds();
+ if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) {
+ return ms * qpid::sys::TIME_MSEC;
} else {
return qpid::sys::TIME_INFINITE;
}
diff --git a/cpp/src/qpid/messaging/Duration.cpp b/cpp/src/qpid/messaging/Duration.cpp
new file mode 100644
index 0000000000..c415b70fbe
--- /dev/null
+++ b/cpp/src/qpid/messaging/Duration.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Duration.h"
+#include <limits>
+
+namespace qpid {
+namespace messaging {
+
+Duration::Duration(uint64_t ms) : milliseconds(ms) {}
+uint64_t Duration::getMilliseconds() const { return milliseconds; }
+
+Duration operator*(const Duration& duration, uint64_t multiplier)
+{
+ return Duration(duration.getMilliseconds() * multiplier);
+}
+
+Duration operator*(uint64_t multiplier, const Duration& duration)
+{
+ return Duration(duration.getMilliseconds() * multiplier);
+}
+
+const Duration Duration::INFINITE(std::numeric_limits<uint64_t>::max());
+const Duration Duration::IMMEDIATE(0);
+const Duration Duration::SECOND(1000);
+const Duration Duration::MINUTE(SECOND * 60);
+
+}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp
index d953b3ff75..822659f6ef 100644
--- a/cpp/src/qpid/messaging/Message.cpp
+++ b/cpp/src/qpid/messaging/Message.cpp
@@ -52,8 +52,8 @@ const std::string& Message::getUserId() const { return impl->userId; }
void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
const std::string& Message::getCorrelationId() const { return impl->correlationId; }
-void Message::setTtl(Duration ttl) { impl->ttl = ttl; }
-Duration Message::getTtl() const { return impl->ttl; }
+void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
+Duration Message::getTtl() const { return Duration(impl->ttl); }
void Message::setDurable(bool durable) { impl->durable = durable; }
bool Message::getDurable() const { return impl->durable; }
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 56e86d101f..c4659493ce 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -131,7 +131,7 @@ struct MessagingFixture : public BrokerFixture
Message out(Uuid(true).str());
s.send(out);
Message in;
- BOOST_CHECK(r.fetch(in, 5*DURATION_SEC));
+ BOOST_CHECK(r.fetch(in, 5*Duration::SECOND));
BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
r.close();
s.close();
@@ -197,7 +197,7 @@ struct MultiQueueFixture : MessagingFixture
}
};
-std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=DURATION_SEC*5)
+std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=Duration::SECOND*5)
{
std::vector<std::string> data;
Message message;
@@ -216,7 +216,7 @@ void send(Sender& sender, uint count = 1, uint start = 1, const std::string& bas
}
void receive(Receiver& receiver, uint count = 1, uint start = 1,
- const std::string& base = "Message", Duration timeout=DURATION_SEC*5)
+ const std::string& base = "Message", Duration timeout=Duration::SECOND*5)
{
for (uint i = start; i < start + count; ++i) {
BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
@@ -230,7 +230,7 @@ QPID_AUTO_TEST_CASE(testSimpleSendReceive)
Message out("test-message");
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(Duration::SECOND * 5);
fix.session.acknowledge();
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
}
@@ -247,7 +247,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in;
for (uint i = 0; i < 10; ++i) {
- BOOST_CHECK(receiver.fetch(in, 5 * DURATION_SEC));
+ BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5));
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
fix.session.acknowledge();
@@ -301,7 +301,7 @@ QPID_AUTO_TEST_CASE(testSimpleTopic)
BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five"));
BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five"));
Message in;
- BOOST_CHECK(!sub2.fetch(in, 0));//TODO: or should this raise an error?
+ BOOST_CHECK(!sub2.fetch(in, Duration::IMMEDIATE));//TODO: or should this raise an error?
//TODO: check pending messages...
@@ -324,7 +324,7 @@ QPID_AUTO_TEST_CASE(testNextReceiver)
for (uint i = 0; i < fix.queues.size(); i++) {
Message msg;
- BOOST_CHECK(fix.session.nextReceiver().fetch(msg, DURATION_SEC));
+ BOOST_CHECK(fix.session.nextReceiver().fetch(msg, Duration::SECOND));
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
}
}
@@ -346,7 +346,7 @@ QPID_AUTO_TEST_CASE(testMapMessage)
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(5 * Duration::SECOND);
MapView view(in);
BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
@@ -369,7 +369,7 @@ QPID_AUTO_TEST_CASE(testMapMessageWithInitial)
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(5 * Duration::SECOND);
MapView view(in);
BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
@@ -389,7 +389,7 @@ QPID_AUTO_TEST_CASE(testListMessage)
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(5 * Duration::SECOND);
ListView view(in);
BOOST_CHECK_EQUAL(view.size(), content.size());
BOOST_CHECK_EQUAL(view.front().asString(), "abc");
@@ -423,7 +423,7 @@ QPID_AUTO_TEST_CASE(testListMessageWithInitial)
content.encode();
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(5 * Duration::SECOND);
ListView view(in);
BOOST_CHECK_EQUAL(view.size(), content.size());
BOOST_CHECK_EQUAL(view.front().asString(), "abc");
@@ -452,10 +452,10 @@ QPID_AUTO_TEST_CASE(testReject)
Message m2("accept-me");
sender.send(m2);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * DURATION_SEC);
+ Message in = receiver.fetch(5 * Duration::SECOND);
BOOST_CHECK_EQUAL(in.getContent(), m1.getContent());
fix.session.reject(in);
- in = receiver.fetch(5 * DURATION_SEC);
+ in = receiver.fetch(5 * Duration::SECOND);
BOOST_CHECK_EQUAL(in.getContent(), m2.getContent());
fix.session.acknowledge();
}
@@ -834,17 +834,17 @@ QPID_AUTO_TEST_CASE(testTx)
send(sender2, 5, 1, "B");
ssn2.commit();
receive(receiver1, 5, 1, "B");//(only those from sender2 should be received)
- BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages
+ BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages
ssn1.rollback();
receive(receiver2, 5, 1, "B");
- BOOST_CHECK(!receiver2.fetch(in, 0));//check there are no more messages
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));//check there are no more messages
ssn2.rollback();
receive(receiver1, 5, 1, "B");
- BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages
+ BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages
ssn1.commit();
//check neither receiver gets any more messages:
- BOOST_CHECK(!receiver1.fetch(in, 0));
- BOOST_CHECK(!receiver2.fetch(in, 0));
+ BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 3ee8208a0b..994ef00960 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -96,8 +96,8 @@ struct Options : public qpid::Options
Duration getTimeout()
{
- if (forever) return INFINITE_DURATION;
- else return timeout*DURATION_SEC;
+ if (forever) return Duration::INFINITE;
+ else return Duration::SECOND*timeout;
}
bool parse(int argc, char** argv)
@@ -173,7 +173,7 @@ int main(int argc, char ** argv)
if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl;
if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
- if (msg.getTtl()) std::cout << "TTL: " << msg.getTtl() << std::endl;
+ if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
if (msg.isRedelivered()) std::cout << "Redelivered: true" << std::endl;
std::cout << "Headers: " << msg.getHeaders() << std::endl;
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index eabcbdae21..a75d21e0a5 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -192,7 +192,7 @@ int main(int argc, char ** argv)
Message msg;
msg.setDurable(opts.durable);
if (opts.ttl) {
- msg.setTtl(opts.ttl);
+ msg.setTtl(Duration(opts.ttl));
}
if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
if (!opts.userid.empty()) msg.setUserId(opts.userid);